Remove code smells and increase code coverage 60/2560/6
authorelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 24 Feb 2020 08:17:11 +0000 (09:17 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 2 Mar 2020 12:26:03 +0000 (13:26 +0100)
Added new test cases.

Change-Id: Ie0e726d7ff6ea185282010f66e842b973b4718c2
Issue-ID: NONRTRIC-142
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
33 files changed:
policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientFactory.java
policy-agent/src/main/java/org/oransc/policyagent/clients/JsonHelper.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOnapA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/StdA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapRequestMessage.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java
policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java [moved from policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java with 53% similarity]
policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientFactoryTest.java
policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientHelper.java
policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java [new file with mode: 0644]
policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java [new file with mode: 0644]
policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java
policy-agent/src/test/java/org/oransc/policyagent/utils/LoggingUtils.java

index a3c7448..ff9c8ff 100644 (file)
@@ -21,7 +21,6 @@
 package org.oransc.policyagent.clients;
 
 import java.util.List;
-
 import org.oransc.policyagent.repository.Policy;
 
 import reactor.core.publisher.Flux;
@@ -52,5 +51,4 @@ public interface A1Client {
     public Flux<String> deleteAllPolicies();
 
     public Mono<String> getPolicyStatus(Policy policy);
-
 }
index 0e7fee2..3e37017 100644 (file)
@@ -77,12 +77,12 @@ public class A1ClientFactory {
     private Mono<A1Client.A1ProtocolType> getProtocolVersion(Ric ric) {
         if (ric.getProtocolVersion() == A1ProtocolType.UNKNOWN) {
             return fetchVersion(createSdnrOnapA1Client(ric)) //
-                .onErrorResume(err -> fetchVersion(createSdncOscA1Client(ric))) //
-                .onErrorResume(err -> fetchVersion(createOscA1Client(ric))) //
-                .onErrorResume(err -> fetchVersion(createStdA1ClientImpl(ric))) //
-                .doOnNext(version -> ric.setProtocolVersion(version))
+                .onErrorResume(notUsed -> fetchVersion(createSdncOscA1Client(ric))) //
+                .onErrorResume(notUsed -> fetchVersion(createOscA1Client(ric))) //
+                .onErrorResume(notUsed -> fetchVersion(createStdA1ClientImpl(ric))) //
+                .doOnNext(ric::setProtocolVersion)
                 .doOnNext(version -> logger.debug("Recover ric: {}, protocol version:{}", ric.name(), version)) //
-                .doOnError(t -> logger.warn("Could not get protocol version from RIC: {}", ric.name())); //
+                .doOnError(notUsed -> logger.warn("Could not get protocol version from RIC: {}", ric.name())); //
         } else {
             return Mono.just(ric.getProtocolVersion());
         }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/JsonHelper.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/JsonHelper.java
new file mode 100644 (file)
index 0000000..4ce7b5a
--- /dev/null
@@ -0,0 +1,97 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.clients;
+
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class JsonHelper {
+    private static Gson gson = new GsonBuilder() //
+        .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES) //
+        .create();
+
+    private JsonHelper() {
+
+    }
+
+    public static Flux<String> parseJsonArrayOfString(String inputString) {
+        try {
+            List<String> arrayList = new ArrayList<>();
+            if (!inputString.isEmpty()) {
+                JSONArray jsonArray = new JSONArray(inputString);
+                for (int i = 0; i < jsonArray.length(); i++) {
+                    arrayList.add(jsonArray.getString(i));
+                }
+            }
+            return Flux.fromIterable(arrayList);
+        } catch (JSONException ex) { // invalid json
+            return Flux.error(ex);
+        }
+    }
+
+    public static <T> String createInputJsonString(T params) {
+        JSONObject inputJson = new JSONObject();
+        inputJson.put("input", gson.toJson(params));
+        return inputJson.toString();
+    }
+
+    public static Mono<String> getValueFromResponse(String response, String key) {
+        try {
+            JSONObject outputJson = new JSONObject(response);
+            JSONObject responseParams = outputJson.getJSONObject("output");
+            if (!responseParams.has(key)) {
+                return Mono.just("");
+            }
+            String value = responseParams.get(key).toString();
+            return Mono.just(value);
+        } catch (JSONException ex) { // invalid json
+            return Mono.error(ex);
+        }
+    }
+
+    public static Mono<String> extractPolicySchema(String inputString) {
+        try {
+            JSONObject jsonObject = new JSONObject(inputString);
+            JSONObject schemaObject = jsonObject.getJSONObject("policySchema");
+            String schemaString = schemaObject.toString();
+            return Mono.just(schemaString);
+        } catch (JSONException ex) { // invalid json
+            return Mono.error(ex);
+        }
+    }
+
+    public static Mono<String> validateJson(String inputString) {
+        try {
+            new JSONObject(inputString);
+            return Mono.just(inputString);
+        } catch (JSONException ex) { // invalid json
+            return Mono.error(ex);
+        }
+    }
+}
index 0efe14d..efb1d52 100644 (file)
 package org.oransc.policyagent.clients;
 
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
 import java.util.List;
-import org.json.JSONArray;
-import org.json.JSONException;
 import org.json.JSONObject;
 import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.repository.Policy;
@@ -123,12 +120,12 @@ public class OscA1Client implements A1Client {
 
     private Flux<String> getPolicyTypeIds() {
         return restClient.get(POLICY_TYPES) //
-            .flatMapMany(this::parseJsonArrayOfString);
+            .flatMapMany(JsonHelper::parseJsonArrayOfString);
     }
 
     private Flux<String> getPolicyIdentitiesByType(String typeId) {
         return restClient.get(POLICY_IDS_URI.buildAndExpand(typeId).toUriString()) //
-            .flatMapMany(this::parseJsonArrayOfString);
+            .flatMapMany(JsonHelper::parseJsonArrayOfString);
     }
 
     private Mono<String> getCreateSchema(String policyTypeResponse, String policyTypeId) {
@@ -152,18 +149,4 @@ public class OscA1Client implements A1Client {
         return getPolicyIdentitiesByType(typeId) //
             .flatMap(policyId -> deletePolicyById(typeId, policyId));
     }
-
-    private Flux<String> parseJsonArrayOfString(String inputString) {
-        try {
-            List<String> arrayList = new ArrayList<>();
-            JSONArray jsonArray = new JSONArray(inputString);
-            for (int i = 0; i < jsonArray.length(); i++) {
-                arrayList.add(jsonArray.getString(i));
-            }
-            logger.debug("A1 client: received list = {}", arrayList);
-            return Flux.fromIterable(arrayList);
-        } catch (JSONException ex) { // invalid json
-            return Flux.error(ex);
-        }
-    }
 }
index 44799c8..feaa017 100644 (file)
 
 package org.oransc.policyagent.clients;
 
-import com.google.gson.FieldNamingPolicy;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
 import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.repository.Policy;
 import org.slf4j.Logger;
@@ -47,10 +40,6 @@ public class SdncOnapA1Client implements A1Client {
     private final RicConfig ricConfig;
     private final AsyncRestClient restClient;
 
-    private static Gson gson = new GsonBuilder() //
-        .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES) //
-        .create();
-
     public SdncOnapA1Client(RicConfig ricConfig, String baseUrl, String username, String password) {
         this(ricConfig, username, password, new AsyncRestClient(baseUrl + "/restconf/operations"));
         if (logger.isDebugEnabled()) {
@@ -84,14 +73,14 @@ public class SdncOnapA1Client implements A1Client {
             .nearRtRicId(ricConfig.baseUrl()) //
             .policyTypeId(policyTypeId) //
             .build();
-        String inputJsonString = createInputJsonString(inputParams);
+        String inputJsonString = JsonHelper.createInputJsonString(inputParams);
         logger.debug("POST getPolicyType inputJsonString = {}", inputJsonString);
 
         return restClient
             .postWithAuthHeader(URL_PREFIX + "getPolicyType", inputJsonString, a1ControllerUsername,
                 a1ControllerPassword) //
-            .flatMap(response -> getValueFromResponse(response, "policy-type")) //
-            .flatMap(this::extractPolicySchema);
+            .flatMap(response -> JsonHelper.getValueFromResponse(response, "policy-type")) //
+            .flatMap(JsonHelper::extractPolicySchema);
     }
 
     @Override
@@ -104,7 +93,7 @@ public class SdncOnapA1Client implements A1Client {
             .properties(new ArrayList<String>()) //
             .build();
 
-        String inputJsonString = createInputJsonString(inputParams);
+        String inputJsonString = JsonHelper.createInputJsonString(inputParams);
         logger.debug("POST putPolicy inputJsonString = {}", inputJsonString);
 
         return restClient.postWithAuthHeader(URL_PREFIX + "createPolicyInstance", inputJsonString,
@@ -137,14 +126,14 @@ public class SdncOnapA1Client implements A1Client {
         SdncOnapAdapterInput inputParams = ImmutableSdncOnapAdapterInput.builder() //
             .nearRtRicId(ricConfig.baseUrl()) //
             .build();
-        String inputJsonString = createInputJsonString(inputParams);
+        String inputJsonString = JsonHelper.createInputJsonString(inputParams);
         logger.debug("POST getPolicyTypeIdentities inputJsonString = {}", inputJsonString);
 
         return restClient
             .postWithAuthHeader("/A1-ADAPTER-API:getPolicyTypes", inputJsonString, a1ControllerUsername,
                 a1ControllerPassword) //
-            .flatMap(response -> getValueFromResponse(response, "policy-type-id-list")) //
-            .flatMapMany(this::parseJsonArrayOfString);
+            .flatMap(response -> JsonHelper.getValueFromResponse(response, "policy-type-id-list")) //
+            .flatMapMany(JsonHelper::parseJsonArrayOfString);
     }
 
     private Flux<String> getPolicyIdentitiesByType(String policyTypeId) {
@@ -152,56 +141,14 @@ public class SdncOnapA1Client implements A1Client {
             .nearRtRicId(ricConfig.baseUrl()) //
             .policyTypeId(policyTypeId) //
             .build();
-        String inputJsonString = createInputJsonString(inputParams);
+        String inputJsonString = JsonHelper.createInputJsonString(inputParams);
         logger.debug("POST getPolicyIdentities inputJsonString = {}", inputJsonString);
 
         return restClient
             .postWithAuthHeader("/A1-ADAPTER-API:getPolicyInstances", inputJsonString, a1ControllerUsername,
                 a1ControllerPassword) //
-            .flatMap(response -> getValueFromResponse(response, "policy-instance-id-list")) //
-            .flatMapMany(this::parseJsonArrayOfString);
-    }
-
-    private Mono<String> getValueFromResponse(String response, String key) {
-        logger.debug("A1 client: response = {}", response);
-        try {
-            JSONObject outputJson = new JSONObject(response);
-            JSONObject responseParams = outputJson.getJSONObject("output");
-            if (!responseParams.has(key)) {
-                return Mono.just("");
-            }
-            String value = responseParams.get(key).toString();
-            return Mono.just(value);
-        } catch (JSONException ex) { // invalid json
-            return Mono.error(ex);
-        }
-    }
-
-    private Flux<String> parseJsonArrayOfString(String inputString) {
-        try {
-            List<String> arrayList = new ArrayList<>();
-            if (!inputString.isEmpty()) {
-                JSONArray jsonArray = new JSONArray(inputString);
-                for (int i = 0; i < jsonArray.length(); i++) {
-                    arrayList.add(jsonArray.getString(i));
-                }
-            }
-            logger.debug("A1 client: received list = {}", arrayList);
-            return Flux.fromIterable(arrayList);
-        } catch (JSONException ex) { // invalid json
-            return Flux.error(ex);
-        }
-    }
-
-    private Mono<String> extractPolicySchema(String inputString) {
-        try {
-            JSONObject jsonObject = new JSONObject(inputString);
-            JSONObject schemaObject = jsonObject.getJSONObject("policySchema");
-            String schemaString = schemaObject.toString();
-            return Mono.just(schemaString);
-        } catch (JSONException ex) { // invalid json
-            return Mono.error(ex);
-        }
+            .flatMap(response -> JsonHelper.getValueFromResponse(response, "policy-instance-id-list")) //
+            .flatMapMany(JsonHelper::parseJsonArrayOfString);
     }
 
     private Flux<String> deletePoliciesForType(String typeId) {
@@ -215,16 +162,10 @@ public class SdncOnapA1Client implements A1Client {
             .policyTypeId(policyTypeId) //
             .policyInstanceId(policyId) //
             .build();
-        String inputJsonString = createInputJsonString(inputParams);
+        String inputJsonString = JsonHelper.createInputJsonString(inputParams);
         logger.debug("POST deletePolicy inputJsonString = {}", inputJsonString);
 
         return restClient.postWithAuthHeader("/A1-ADAPTER-API:deletePolicyInstance", inputJsonString,
             a1ControllerUsername, a1ControllerPassword);
     }
-
-    private String createInputJsonString(SdncOnapAdapterInput params) {
-        JSONObject inputJson = new JSONObject();
-        inputJson.put("input", new JSONObject(gson.toJson(params)));
-        return inputJson.toString();
-    }
 }
index bbb3121..9541f7f 100644 (file)
 
 package org.oransc.policyagent.clients;
 
-import com.google.gson.FieldNamingPolicy;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
 import java.util.List;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
 import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.repository.Policy;
 import org.slf4j.Logger;
@@ -46,10 +39,6 @@ public class SdncOscA1Client implements A1Client {
     private final RicConfig ricConfig;
     private final AsyncRestClient restClient;
 
-    private static Gson gson = new GsonBuilder() //
-        .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES) //
-        .create(); //
-
     public SdncOscA1Client(RicConfig ricConfig, String baseUrl, String username, String password) {
         this(ricConfig, username, password, new AsyncRestClient(baseUrl + "/restconf/operations"));
         if (logger.isDebugEnabled()) {
@@ -69,14 +58,14 @@ public class SdncOscA1Client implements A1Client {
         SdncOscAdapterInput inputParams = ImmutableSdncOscAdapterInput.builder() //
             .nearRtRicUrl(ricConfig.baseUrl()) //
             .build();
-        String inputJsonString = createInputJsonString(inputParams);
+        String inputJsonString = JsonHelper.createInputJsonString(inputParams);
         logger.debug("POST getPolicyTypeIdentities inputJsonString = {}", inputJsonString);
 
         return restClient
             .postWithAuthHeader(URL_PREFIX + "getPolicyTypeIdentities", inputJsonString, a1ControllerUsername,
                 a1ControllerPassword) //
-            .flatMap(response -> getValueFromResponse(response, "policy-type-id-list")) //
-            .flatMapMany(this::parseJsonArrayOfString) //
+            .flatMap(response -> JsonHelper.getValueFromResponse(response, "policy-type-id-list")) //
+            .flatMapMany(JsonHelper::parseJsonArrayOfString) //
             .collectList();
     }
 
@@ -92,14 +81,14 @@ public class SdncOscA1Client implements A1Client {
             .nearRtRicUrl(ricConfig.baseUrl()) //
             .policyTypeId(policyTypeId) //
             .build();
-        String inputJsonString = createInputJsonString(inputParams);
+        String inputJsonString = JsonHelper.createInputJsonString(inputParams);
         logger.debug("POST getPolicyType inputJsonString = {}", inputJsonString);
 
         return restClient
             .postWithAuthHeader(URL_PREFIX + "getPolicyType", inputJsonString, a1ControllerUsername,
                 a1ControllerPassword) //
-            .flatMap(response -> getValueFromResponse(response, "policy-type")) //
-            .flatMap(this::extractPolicySchema);
+            .flatMap(response -> JsonHelper.getValueFromResponse(response, "policy-type")) //
+            .flatMap(JsonHelper::extractPolicySchema);
     }
 
     @Override
@@ -110,13 +99,13 @@ public class SdncOscA1Client implements A1Client {
             .policyId(policy.id()) //
             .policy(policy.json()) //
             .build();
-        String inputJsonString = createInputJsonString(inputParams);
+        String inputJsonString = JsonHelper.createInputJsonString(inputParams);
         logger.debug("POST putPolicy inputJsonString = {}", inputJsonString);
 
         return restClient
             .postWithAuthHeader(URL_PREFIX + "putPolicy", inputJsonString, a1ControllerUsername, a1ControllerPassword)
-            .flatMap(response -> getValueFromResponse(response, "returned-policy")) //
-            .flatMap(this::validateJson);
+            .flatMap(response -> JsonHelper.getValueFromResponse(response, "returned-policy")) //
+            .flatMap(JsonHelper::validateJson);
     }
 
     @Override
@@ -145,65 +134,14 @@ public class SdncOscA1Client implements A1Client {
         SdncOscAdapterInput inputParams = ImmutableSdncOscAdapterInput.builder() //
             .nearRtRicUrl(ricConfig.baseUrl()) //
             .build();
-        String inputJsonString = createInputJsonString(inputParams);
+        String inputJsonString = JsonHelper.createInputJsonString(inputParams);
         logger.debug("POST getPolicyIdentities inputJsonString = {}", inputJsonString);
 
         return restClient
             .postWithAuthHeader("/A1-ADAPTER-API:getPolicyIdentities", inputJsonString, a1ControllerUsername,
                 a1ControllerPassword) //
-            .flatMap(response -> getValueFromResponse(response, "policy-id-list")) //
-            .flatMapMany(this::parseJsonArrayOfString);
-    }
-
-    private Mono<String> getValueFromResponse(String response, String key) {
-        logger.debug("A1 client: response = {}", response);
-        try {
-            JSONObject outputJson = new JSONObject(response);
-            JSONObject responseParams = outputJson.getJSONObject("output");
-            if (!responseParams.has(key)) {
-                return Mono.just("");
-            }
-            String value = responseParams.get(key).toString();
-            return Mono.just(value);
-        } catch (JSONException ex) { // invalid json
-            return Mono.error(ex);
-        }
-    }
-
-    private Flux<String> parseJsonArrayOfString(String inputString) {
-        try {
-            List<String> arrayList = new ArrayList<>();
-            if (!inputString.isEmpty()) {
-                JSONArray jsonArray = new JSONArray(inputString);
-                for (int i = 0; i < jsonArray.length(); i++) {
-                    arrayList.add(jsonArray.getString(i));
-                }
-            }
-            logger.debug("A1 client: received list = {}", arrayList);
-            return Flux.fromIterable(arrayList);
-        } catch (JSONException ex) { // invalid json
-            return Flux.error(ex);
-        }
-    }
-
-    private Mono<String> extractPolicySchema(String inputString) {
-        try {
-            JSONObject jsonObject = new JSONObject(inputString);
-            JSONObject schemaObject = jsonObject.getJSONObject("policySchema");
-            String schemaString = schemaObject.toString();
-            return Mono.just(schemaString);
-        } catch (JSONException ex) { // invalid json
-            return Mono.error(ex);
-        }
-    }
-
-    private Mono<String> validateJson(String inputString) {
-        try {
-            new JSONObject(inputString);
-            return Mono.just(inputString);
-        } catch (JSONException ex) { // invalid json
-            return Mono.error(ex);
-        }
+            .flatMap(response -> JsonHelper.getValueFromResponse(response, "policy-id-list")) //
+            .flatMapMany(JsonHelper::parseJsonArrayOfString);
     }
 
     private Mono<String> deletePolicyById(String policyId) {
@@ -212,16 +150,10 @@ public class SdncOscA1Client implements A1Client {
             .policyId(policyId) //
             .build();
 
-        String inputJsonString = createInputJsonString(inputParams);
+        String inputJsonString = JsonHelper.createInputJsonString(inputParams);
         logger.debug("POST deletePolicy inputJsonString = {}", inputJsonString);
 
         return restClient.postWithAuthHeader(URL_PREFIX + "deletePolicy", inputJsonString, a1ControllerUsername,
             a1ControllerPassword);
     }
-
-    private String createInputJsonString(SdncOscAdapterInput params) {
-        JSONObject inputJson = new JSONObject();
-        inputJson.put("input", new JSONObject(gson.toJson(params)));
-        return inputJson.toString();
-    }
 }
index 8f4334c..1715d9d 100644 (file)
 
 package org.oransc.policyagent.clients;
 
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
 import java.util.List;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
 import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.repository.Policy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.web.util.UriComponentsBuilder;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -41,7 +34,6 @@ public class StdA1Client implements A1Client {
     private static final String POLICY_TYPE_ID = "policyTypeId";
 
     private static final String POLICIES_URI = "/policies";
-    private static final String POLICY_SCHEMA = "policySchema";
 
     private static final UriComponentsBuilder POLICY_TYPE_SCHEMA_URI =
         UriComponentsBuilder.fromPath("/policytypes/{policy-type-name}");
@@ -55,8 +47,6 @@ public class StdA1Client implements A1Client {
     private static final UriComponentsBuilder POLICY_STATUS_URI =
         UriComponentsBuilder.fromPath("/policies/{policy-id}/status");
 
-    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
     private final AsyncRestClient restClient;
 
     public StdA1Client(RicConfig ricConfig) {
@@ -78,13 +68,13 @@ public class StdA1Client implements A1Client {
     public Mono<String> putPolicy(Policy policy) {
         String uri = POLICY_URI.buildAndExpand(policy.id(), policy.type().name()).toUriString();
         return restClient.put(uri, policy.json()) //
-            .flatMap(this::validateJson);
+            .flatMap(JsonHelper::validateJson);
     }
 
     @Override
     public Mono<List<String>> getPolicyTypeIdentities() {
         return restClient.get(POLICY_TYPES_URI) //
-            .flatMapMany(this::parseJsonArrayOfString) //
+            .flatMapMany(JsonHelper::parseJsonArrayOfString) //
             .collectList();
     }
 
@@ -92,7 +82,7 @@ public class StdA1Client implements A1Client {
     public Mono<String> getPolicyTypeSchema(String policyTypeId) {
         String uri = POLICY_TYPE_SCHEMA_URI.buildAndExpand(policyTypeId).toUriString();
         return restClient.get(uri) //
-            .flatMap(this::extractPolicySchema);
+            .flatMap(JsonHelper::extractPolicySchema);
     }
 
     @Override
@@ -120,46 +110,11 @@ public class StdA1Client implements A1Client {
 
     private Flux<String> getPolicyIds() {
         return restClient.get(POLICIES_URI) //
-            .flatMapMany(this::parseJsonArrayOfString);
+            .flatMapMany(JsonHelper::parseJsonArrayOfString);
     }
 
     private Mono<String> deletePolicyById(String policyId) {
         String uri = POLICY_DELETE_URI.buildAndExpand(policyId).toUriString();
         return restClient.delete(uri);
     }
-
-    private Flux<String> parseJsonArrayOfString(String inputString) {
-        try {
-            List<String> arrayList = new ArrayList<>();
-            JSONArray jsonArray = new JSONArray(inputString);
-            for (int i = 0; i < jsonArray.length(); i++) {
-                arrayList.add(jsonArray.getString(i));
-            }
-            logger.debug("A1 client: received list = {}", arrayList);
-            return Flux.fromIterable(arrayList);
-        } catch (JSONException ex) { // invalid json
-            return Flux.error(ex);
-        }
-    }
-
-    private Mono<String> extractPolicySchema(String inputString) {
-        try {
-            JSONObject jsonObject = new JSONObject(inputString);
-            JSONObject schemaObject = jsonObject.getJSONObject(POLICY_SCHEMA);
-            String schemaString = schemaObject.toString();
-            return Mono.just(schemaString);
-        } catch (JSONException ex) { // invalid json
-            return Mono.error(ex);
-        }
-    }
-
-    private Mono<String> validateJson(String inputString) {
-        try {
-            new JSONObject(inputString);
-            return Mono.just(inputString);
-        } catch (JSONException ex) { // invalid json
-            return Mono.error(ex);
-        }
-    }
-
 }
index b102964..ea27524 100644 (file)
@@ -22,7 +22,6 @@ package org.oransc.policyagent.controllers;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -31,7 +30,6 @@ import io.swagger.annotations.ApiResponses;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-
 import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
index 38369a8..b95c196 100644 (file)
@@ -27,10 +27,9 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
-import java.util.Vector;
-
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Rics;
@@ -91,7 +90,7 @@ public class RicRepositoryController {
     public ResponseEntity<String> getRics(
         @RequestParam(name = "policyType", required = false) String supportingPolicyType) {
 
-        Vector<RicInfo> result = new Vector<>();
+        List<RicInfo> result = new ArrayList<>();
         synchronized (rics) {
             for (Ric ric : rics.getRics()) {
                 if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
index e68b82c..3f775a5 100644 (file)
@@ -28,9 +28,9 @@ import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Vector;
-
+import java.util.List;
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
@@ -67,10 +67,10 @@ public class ServiceController {
     @ApiOperation(value = "Returns service information")
     @ApiResponses(
         value = {@ApiResponse(code = 200, message = "OK", response = ServiceStatus.class, responseContainer = "List")})
-    public ResponseEntity<String> getServices( //
-        @RequestParam(name = "name", required = false) String name) {
+    public ResponseEntity<String> getServices(//
+        @RequestParam(name = "serviceName", required = false) String name) {
 
-        Collection<ServiceStatus> servicesStatus = new Vector<>();
+        Collection<ServiceStatus> servicesStatus = new ArrayList<>();
         synchronized (this.services) {
             for (Service s : this.services.getAll()) {
                 if (name == null || name.equals(s.getName())) {
@@ -80,7 +80,7 @@ public class ServiceController {
         }
 
         String res = gson.toJson(servicesStatus);
-        return new ResponseEntity<String>(res, HttpStatus.OK);
+        return new ResponseEntity<>(res, HttpStatus.OK);
     }
 
     private ServiceStatus toServiceStatus(Service s) {
@@ -90,29 +90,29 @@ public class ServiceController {
     @ApiOperation(value = "Register a service")
     @ApiResponses(value = {@ApiResponse(code = 200, message = "OK", response = String.class)})
     @PutMapping("/service")
-    public ResponseEntity<String> putService( //
+    public ResponseEntity<String> putService(//
         @RequestBody ServiceRegistrationInfo registrationInfo) {
         try {
             this.services.put(toService(registrationInfo));
-            return new ResponseEntity<String>("OK", HttpStatus.OK);
+            return new ResponseEntity<>("OK", HttpStatus.OK);
         } catch (Exception e) {
-            return new ResponseEntity<String>(e.getMessage(), HttpStatus.NO_CONTENT);
+            return new ResponseEntity<>(e.getMessage(), HttpStatus.NO_CONTENT);
         }
     }
 
     @ApiOperation(value = "Delete a service")
     @ApiResponses(value = {@ApiResponse(code = 200, message = "OK")})
     @DeleteMapping("/services")
-    public ResponseEntity<String> deleteService( //
+    public ResponseEntity<String> deleteService(//
         @RequestParam(name = "serviceName", required = true) String serviceName) {
         try {
             Service service = removeService(serviceName);
             // Remove the policies from the repo and let the consistency monitoring
             // do the rest.
             removePolicies(service);
-            return new ResponseEntity<String>("OK", HttpStatus.NO_CONTENT);
+            return new ResponseEntity<>("OK", HttpStatus.NO_CONTENT);
         } catch (Exception e) {
-            return new ResponseEntity<String>(e.getMessage(), HttpStatus.NO_CONTENT);
+            return new ResponseEntity<>(e.getMessage(), HttpStatus.NO_CONTENT);
         }
     }
 
@@ -121,13 +121,13 @@ public class ServiceController {
         value = {@ApiResponse(code = 200, message = "Policies timeout supervision refreshed"),
             @ApiResponse(code = 404, message = "The service is not found, needs re-registration")})
     @PostMapping("/services/keepalive")
-    public ResponseEntity<String> keepAliveService( //
+    public ResponseEntity<String> keepAliveService(//
         @RequestParam(name = "serviceName", required = true) String serviceName) {
         try {
             services.getService(serviceName).ping();
-            return new ResponseEntity<String>("OK", HttpStatus.OK);
+            return new ResponseEntity<>("OK", HttpStatus.OK);
         } catch (Exception e) {
-            return new ResponseEntity<String>(e.getMessage(), HttpStatus.NOT_FOUND);
+            return new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND);
         }
     }
 
@@ -141,7 +141,7 @@ public class ServiceController {
 
     private void removePolicies(Service service) {
         synchronized (this.policies) {
-            Vector<Policy> policyList = new Vector<>(this.policies.getForService(service.getName()));
+            List<Policy> policyList = new ArrayList<>(this.policies.getForService(service.getName()));
             for (Policy policy : policyList) {
                 this.policies.remove(policy);
             }
index 91f9ff2..625c459 100644 (file)
@@ -22,7 +22,6 @@ package org.oransc.policyagent.dmaap;
 
 import com.google.common.collect.Iterables;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Properties;
@@ -83,7 +82,7 @@ public class DmaapMessageConsumer implements Runnable {
         }
     }
 
-    private Iterable<String> fetchAllMessages() throws ServiceException, FileNotFoundException, IOException {
+    private Iterable<String> fetchAllMessages() throws ServiceException, IOException {
         Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
         MRConsumer consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
         MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
@@ -95,18 +94,18 @@ public class DmaapMessageConsumer implements Runnable {
         }
     }
 
-    private void processMsg(String msg) throws Exception {
+    private void processMsg(String msg) throws IOException {
         logger.debug("Message Reveived from DMAAP : {}", msg);
         createDmaapMessageHandler().handleDmaapMsg(msg);
     }
 
-    private DmaapMessageHandler createDmaapMessageHandler() throws FileNotFoundException, IOException {
+    private DmaapMessageHandler createDmaapMessageHandler() throws IOException {
         String agentBaseUrl = "http://localhost:" + this.localServerPort;
         AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
         Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
         MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
 
-        return new DmaapMessageHandler(producer, this.applicationConfig, agentClient);
+        return new DmaapMessageHandler(producer, agentClient);
     }
 
     private boolean sleep(Duration duration) {
index 9b0c809..cce4a92 100644 (file)
@@ -22,12 +22,9 @@ package org.oransc.policyagent.dmaap;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-
 import java.io.IOException;
-
 import org.onap.dmaap.mr.client.MRBatchingPublisher;
 import org.oransc.policyagent.clients.AsyncRestClient;
-import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,15 +42,14 @@ public class DmaapMessageHandler {
     private final MRBatchingPublisher dmaapClient;
     private final AsyncRestClient agentClient;
 
-    public DmaapMessageHandler(MRBatchingPublisher dmaapClient, ApplicationConfig applicationConfig,
-        AsyncRestClient agentClient) {
+    public DmaapMessageHandler(MRBatchingPublisher dmaapClient, AsyncRestClient agentClient) {
         this.agentClient = agentClient;
         this.dmaapClient = dmaapClient;
     }
 
     public void handleDmaapMsg(String msg) {
         this.createTask(msg) //
-            .subscribe(x -> logger.debug("handleDmaapMsg: " + x), //
+            .subscribe(message -> logger.debug("handleDmaapMsg: {}", message), //
                 throwable -> logger.warn("handleDmaapMsg failure ", throwable), //
                 () -> logger.debug("handleDmaapMsg complete"));
     }
@@ -73,9 +69,9 @@ public class DmaapMessageHandler {
     }
 
     private Mono<String> handleAgentCallError(Throwable t, DmaapRequestMessage dmaapRequestMessage) {
-        logger.debug("Agent call failed: " + t.getMessage());
+        logger.debug("Agent call failed: {}", t.getMessage());
         return sendDmaapResponse(t.toString(), dmaapRequestMessage, HttpStatus.NOT_FOUND) //
-            .flatMap(s -> Mono.empty());
+            .flatMap(notUsed -> Mono.empty());
     }
 
     private Mono<String> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
@@ -99,8 +95,8 @@ public class DmaapMessageHandler {
     private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage,
         HttpStatus status) {
         return getDmaapResponseMessage(dmaapRequestMessage, response, status) //
-            .flatMap(body -> sendToDmaap(body)) //
-            .onErrorResume(t -> handleResponseCallError(t, dmaapRequestMessage));
+            .flatMap(this::sendToDmaap) //
+            .onErrorResume(this::handleResponseCallError);
     }
 
     private Mono<String> sendToDmaap(String body) {
@@ -114,8 +110,8 @@ public class DmaapMessageHandler {
         }
     }
 
-    private Mono<String> handleResponseCallError(Throwable t, DmaapRequestMessage dmaapRequestMessage) {
-        logger.debug("Failed to respond: " + t.getMessage());
+    private Mono<String> handleResponseCallError(Throwable t) {
+        logger.debug("Failed to respond: {}", t.getMessage());
         return Mono.empty();
     }
 
@@ -135,5 +131,4 @@ public class DmaapMessageHandler {
         return Mono.just(str);
 
     }
-
 }
index fe48aec..9bcb3cb 100644 (file)
@@ -27,7 +27,7 @@ import org.immutables.value.Value;
 @Gson.TypeAdapters
 public interface DmaapRequestMessage {
 
-    public static enum Operation {
+    public enum Operation {
         PUT, GET, DELETE, POST
     }
 
index a279db5..c910dd5 100644 (file)
@@ -25,8 +25,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.Vector;
-
 import org.oransc.policyagent.exceptions.ServiceException;
 
 public class Policies {
@@ -35,9 +33,6 @@ public class Policies {
     private Map<String, Map<String, Policy>> policiesService = new HashMap<>();
     private Map<String, Map<String, Policy>> policiesType = new HashMap<>();
 
-    public Policies() {
-    }
-
     public synchronized void put(Policy policy) {
         policiesId.put(policy.id(), policy);
         multiMapPut(policiesRic, policy.ric().name(), policy);
@@ -46,12 +41,7 @@ public class Policies {
     }
 
     private void multiMapPut(Map<String, Map<String, Policy>> multiMap, String key, Policy value) {
-        Map<String, Policy> map = multiMap.get(key);
-        if (map == null) {
-            map = new HashMap<>();
-            multiMap.put(key, map);
-        }
-        map.put(value.id(), value);
+        multiMap.computeIfAbsent(key, k -> new HashMap<>()).put(value.id(), value);
     }
 
     private void multiMapRemove(Map<String, Map<String, Policy>> multiMap, String key, Policy value) {
@@ -67,7 +57,7 @@ public class Policies {
     private Collection<Policy> multiMapGet(Map<String, Map<String, Policy>> multiMap, String key) {
         Map<String, Policy> map = multiMap.get(key);
         if (map == null) {
-            return new Vector<Policy>();
+            return Collections.emptyList();
         }
         return Collections.unmodifiableCollection(map.values());
     }
index 7723983..7798231 100644 (file)
@@ -28,10 +28,7 @@ import java.util.Map;
 import org.oransc.policyagent.exceptions.ServiceException;
 
 public class PolicyTypes {
-    private Map<String, PolicyType> types = new HashMap<String, PolicyType>();
-
-    public PolicyTypes() {
-    }
+    private Map<String, PolicyType> types = new HashMap<>();
 
     public synchronized PolicyType getType(String name) throws ServiceException {
         PolicyType t = types.get(name);
index 505fce9..6eece5e 100644 (file)
@@ -161,7 +161,7 @@ public class Ric {
      */
     public enum RicState {
         /**
-         * The agent view of the agent may be inconsistent.
+         * The agent view of the Ric may be inconsistent.
          */
         UNDEFINED,
         /**
@@ -169,8 +169,8 @@ public class Ric {
          */
         IDLE,
         /**
-         * The Ric states are recovered.
+         * The agent is synchronizing the view of the Ric.
          */
-        RECOVERING
+        SYNCHRONIZING
     }
 }
index c6d2561..3b8e587 100644 (file)
@@ -30,18 +30,18 @@ import org.oransc.policyagent.exceptions.ServiceException;
  * Dynamic representation of all Rics in the system.
  */
 public class Rics {
-    Map<String, Ric> rics = new HashMap<>();
+    Map<String, Ric> registeredRics = new HashMap<>();
 
     public synchronized void put(Ric ric) {
-        rics.put(ric.name(), ric);
+        registeredRics.put(ric.name(), ric);
     }
 
     public synchronized Iterable<Ric> getRics() {
-        return rics.values();
+        return registeredRics.values();
     }
 
     public synchronized Ric getRic(String name) throws ServiceException {
-        Ric ric = rics.get(name);
+        Ric ric = registeredRics.get(name);
         if (ric == null) {
             throw new ServiceException("Could not find ric: " + name);
         }
@@ -49,23 +49,23 @@ public class Rics {
     }
 
     public synchronized Ric get(String name) {
-        return rics.get(name);
+        return registeredRics.get(name);
     }
 
     public synchronized void remove(String name) {
-        rics.remove(name);
+        registeredRics.remove(name);
     }
 
     public synchronized int size() {
-        return rics.size();
+        return registeredRics.size();
     }
 
     public synchronized void clear() {
-        this.rics.clear();
+        this.registeredRics.clear();
     }
 
     public synchronized Optional<Ric> lookupRicForManagedElement(String managedElementId) {
-        for (Ric ric : this.rics.values()) {
+        for (Ric ric : this.registeredRics.values()) {
             if (ric.getManagedElementIds().contains(managedElementId)) {
                 return Optional.of(ric);
             }
index 369b258..568f002 100644 (file)
@@ -30,13 +30,10 @@ import org.slf4j.LoggerFactory;
 public class Services {
     private static final Logger logger = LoggerFactory.getLogger(Services.class);
 
-    private Map<String, Service> services = new HashMap<>();
-
-    public Services() {
-    }
+    private Map<String, Service> registeredServices = new HashMap<>();
 
     public synchronized Service getService(String name) throws ServiceException {
-        Service s = services.get(name);
+        Service s = registeredServices.get(name);
         if (s == null) {
             throw new ServiceException("Could not find service: " + name);
         }
@@ -44,24 +41,27 @@ public class Services {
     }
 
     public synchronized Service get(String name) {
-        return services.get(name);
+        return registeredServices.get(name);
     }
 
     public synchronized void put(Service service) {
-        logger.debug("Put service: " + service.getName());
-        services.put(service.getName(), service);
+        logger.debug("Put service: {}", service.getName());
+        registeredServices.put(service.getName(), service);
     }
 
     public synchronized Iterable<Service> getAll() {
-        return services.values();
+        return registeredServices.values();
     }
 
     public synchronized void remove(String name) {
-        services.remove(name);
+        registeredServices.remove(name);
     }
 
     public synchronized int size() {
-        return services.size();
+        return registeredServices.size();
     }
 
+    public void clear() {
+        registeredServices.clear();
+    }
 }
index f5834e3..4080b37 100644 (file)
@@ -116,8 +116,9 @@ public class RefreshConfigTask {
         return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
     }
 
-    private <R> Mono<R> onErrorResume(Throwable trowable) {
-        logger.error("Could not refresh application configuration {}", trowable.toString());
+    private <R> Mono<R> onErrorResume(Throwable throwable) {
+        String errMsg = throwable.toString();
+        logger.error("Could not refresh application configuration. {}", errMsg);
         return Mono.empty();
     }
 
@@ -136,7 +137,7 @@ public class RefreshConfigTask {
     /**
      * Reads the configuration from file.
      */
-    public void loadConfigurationFromFile() {
+    void loadConfigurationFromFile() {
         String filepath = appConfig.getLocalConfigurationFilePath();
         if (filepath == null) {
             logger.debug("No localconfiguration file used");
@@ -147,9 +148,6 @@ public class RefreshConfigTask {
 
         try (InputStream inputStream = createInputStream(filepath)) {
             JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
-            if (rootObject == null) {
-                throw new JsonSyntaxException("Root is not a json object");
-            }
             ApplicationConfigParser appParser = new ApplicationConfigParser();
             appParser.parse(rootObject);
             appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapPublisherConfig(),
index d95272b..667a701 100644 (file)
@@ -41,7 +41,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Regularly checks the exisiting rics towards the local repository to keep it consistent.
+ * Regularly checks the existing rics towards the local repository to keep it consistent.
  */
 @Component
 @EnableScheduling
@@ -65,21 +65,21 @@ public class RepositorySupervision {
     }
 
     /**
-     * Regularly contacts all Rics to check if they are alive.
+     * Regularly contacts all Rics to check if they are alive and synchronized.
      */
     @Scheduled(fixedRate = 1000 * 60)
     public void checkAllRics() {
         logger.debug("Checking Rics starting");
-        createTask().subscribe(this::onRicChecked, this::onError, this::onComplete);
+        createTask().subscribe(this::onRicChecked, null, this::onComplete);
     }
 
     private Flux<RicData> createTask() {
         synchronized (this.rics) {
             return Flux.fromIterable(rics.getRics()) //
-                .flatMap(ric -> createRicData(ric)) //
-                .flatMap(ricData -> checkRicState(ricData)) //
-                .flatMap(ricData -> checkRicPolicies(ricData)) //
-                .flatMap(ricData -> checkRicPolicyTypes(ricData));
+                .flatMap(this::createRicData) //
+                .flatMap(this::checkRicState) //
+                .flatMap(this::checkRicPolicies) //
+                .flatMap(this::checkRicPolicyTypes);
         }
     }
 
@@ -101,8 +101,8 @@ public class RepositorySupervision {
 
     private Mono<RicData> checkRicState(RicData ric) {
         if (ric.ric.getState() == RicState.UNDEFINED) {
-            return startRecovery(ric);
-        } else if (ric.ric.getState() == RicState.RECOVERING) {
+            return startSynchronization(ric);
+        } else if (ric.ric.getState() == RicState.SYNCHRONIZING) {
             return Mono.empty();
         } else {
             return Mono.just(ric);
@@ -118,12 +118,12 @@ public class RepositorySupervision {
     private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
         synchronized (this.policies) {
             if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
-                return startRecovery(ric);
+                return startSynchronization(ric);
             }
         }
         for (String policyId : ricPolicies) {
             if (!policies.containsPolicy(policyId)) {
-                return startRecovery(ric);
+                return startSynchronization(ric);
             }
         }
         return Mono.just(ric);
@@ -131,40 +131,38 @@ public class RepositorySupervision {
 
     private Mono<RicData> checkRicPolicyTypes(RicData ric) {
         return ric.a1Client.getPolicyTypeIdentities() //
-            .onErrorResume(t -> {
-                return Mono.empty();
-            }) //
+            .onErrorResume(notUsed -> Mono.empty()) //
             .flatMap(ricTypes -> validateTypes(ricTypes, ric));
     }
 
     private Mono<RicData> validateTypes(Collection<String> ricTypes, RicData ric) {
         if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) {
-            return startRecovery(ric);
+            return startSynchronization(ric);
         }
         for (String typeName : ricTypes) {
             if (!ric.ric.isSupportingType(typeName)) {
-                return startRecovery(ric);
+                return startSynchronization(ric);
             }
         }
         return Mono.just(ric);
     }
 
-    private Mono<RicData> startRecovery(RicData ric) {
-        RicRecoveryTask recovery = new RicRecoveryTask(a1ClientFactory, policyTypes, policies, services);
+    private Mono<RicData> startSynchronization(RicData ric) {
+        RicSynchronizationTask recovery = createSynchronizationTask();
         recovery.run(ric.ric);
         return Mono.empty();
     }
 
+    @SuppressWarnings("squid:S2629")
     private void onRicChecked(RicData ric) {
-        logger.info("Ric: " + ric.ric.name() + " checked");
-    }
-
-    private void onError(Throwable t) {
-        logger.error("Rics supervision failed", t);
+        logger.debug("Ric: {} checked", ric.ric.name());
     }
 
     private void onComplete() {
         logger.debug("Checking Rics completed");
     }
 
-}
+    RicSynchronizationTask createSynchronizationTask() {
+        return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
+    }
+}
\ No newline at end of file
 
 package org.oransc.policyagent.tasks;
 
+import static org.oransc.policyagent.repository.Ric.RicState;
+
+import java.util.Collection;
 import java.util.Vector;
 
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.clients.AsyncRestClient;
-import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.ImmutablePolicyType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
@@ -41,23 +43,23 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Recovery handling of RIC.
+ * Synchronizes the content of a RIC with the content in the repository.
  * This means:
  * - load all policy types
  * - send all policy instances to the RIC
  * --- if that fails remove all policy instances
  * - Notify subscribing services
  */
-public class RicRecoveryTask {
+public class RicSynchronizationTask {
 
-    private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class);
+    private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class);
 
     private final A1ClientFactory a1ClientFactory;
     private final PolicyTypes policyTypes;
     private final Policies policies;
     private final Services services;
 
-    public RicRecoveryTask(A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Policies policies,
+    public RicSynchronizationTask(A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Policies policies,
         Services services) {
         this.a1ClientFactory = a1ClientFactory;
         this.policyTypes = policyTypes;
@@ -65,35 +67,41 @@ public class RicRecoveryTask {
         this.services = services;
     }
 
+    @SuppressWarnings("squid:S2629")
     public void run(Ric ric) {
         logger.debug("Handling ric: {}", ric.getConfig().name());
 
         synchronized (ric) {
-            if (ric.getState() == Ric.RicState.RECOVERING) {
-                logger.debug("Recovery ric: {} is already running", ric.getConfig().name());
+            if (ric.getState() == RicState.SYNCHRONIZING) {
+                logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
                 return;
             }
-            ric.setState(Ric.RicState.RECOVERING);
+            ric.setState(RicState.SYNCHRONIZING);
         }
         this.a1ClientFactory.createA1Client(ric)//
-            .flatMapMany(client -> startRecover(ric, client)) //
-            .subscribe(x -> logger.debug("Recover: " + x), //
-                throwable -> onRecoveryError(ric, throwable), //
-                () -> onRecoveryComplete(ric));
+            .flatMapMany(client -> startSynchronization(ric, client)) //
+            .subscribe(x -> logger.debug("Synchronize: {}", x), //
+                throwable -> onSynchronizationError(ric, throwable), //
+                () -> onSynchronizationComplete(ric));
     }
 
-    private Flux<Object> startRecover(Ric ric, A1Client a1Client) {
-        Flux<PolicyType> recoverTypes = recoverPolicyTypes(ric, a1Client);
-        Flux<?> deletePoliciesInRic = a1Client.deleteAllPolicies();
-        Flux<?> recreatePoliciesInRic = recreateAllPoliciesInRic(ric, a1Client);
-
-        return Flux.concat(recoverTypes, deletePoliciesInRic, recreatePoliciesInRic);
+    private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
+        Flux<PolicyType> recoverTypes = synchronizePolicyTypes(ric, a1Client);
+        Collection<Policy> policiesForRic = policies.getForRic(ric.name());
+        Flux<?> policiesDeletedInRic = Flux.empty();
+        Flux<?> policiesRecreatedInRic = Flux.empty();
+        if (!policiesForRic.isEmpty()) {
+            policiesDeletedInRic = a1Client.deleteAllPolicies();
+            policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
+        }
+        return Flux.concat(recoverTypes, policiesDeletedInRic, policiesRecreatedInRic);
     }
 
-    private void onRecoveryComplete(Ric ric) {
-        logger.debug("Recovery completed for:" + ric.name());
-        ric.setState(Ric.RicState.IDLE);
-        notifyAllServices("Recovery completed for:" + ric.name());
+    @SuppressWarnings("squid:S2629")
+    private void onSynchronizationComplete(Ric ric) {
+        logger.debug("Synchronization completed for: {}", ric.name());
+        ric.setState(RicState.IDLE);
+        notifyAllServices("Synchronization completed for:" + ric.name());
     }
 
     private void notifyAllServices(String body) {
@@ -101,56 +109,57 @@ public class RicRecoveryTask {
             for (Service service : services.getAll()) {
                 String url = service.getCallbackUrl();
                 if (service.getCallbackUrl().length() > 0) {
-                    createClient(url) //
+                    createNotificationClient(url) //
                         .put("", body) //
-                        .subscribe(rsp -> logger.debug("Service called"),
-                            throwable -> logger.warn("Service called failed", throwable),
-                            () -> logger.debug("Service called complete"));
+                        .subscribe(
+                            notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
+                                .warn("Service notification failed for service: {}", service.getName(), throwable),
+                            () -> logger.debug("All services notified"));
                 }
             }
         }
     }
 
-    private void onRecoveryError(Ric ric, Throwable t) {
-        logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
+    @SuppressWarnings("squid:S2629")
+    private void onSynchronizationError(Ric ric, Throwable t) {
+        logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
+        deleteAllPoliciesInRepository(ric);
+
+        Flux<PolicyType> typesRecoveredForRic = this.a1ClientFactory.createA1Client(ric) //
+            .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client));
+
         // If recovery fails, try to remove all instances
-        deleteAllPolicies(ric);
-        Flux<PolicyType> recoverTypes = this.a1ClientFactory.createA1Client(ric) //
-            .flatMapMany(a1Client -> recoverPolicyTypes(ric, a1Client));
-        Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
-            .flatMapMany(a1Client -> a1Client.deleteAllPolicies());
-
-        Flux.merge(recoverTypes, deletePoliciesInRic) //
-            .subscribe(x -> logger.debug("Brute recover: " + x), //
-                throwable -> onRemoveAllError(ric, throwable), //
-                () -> onRecoveryComplete(ric));
+        Flux<?> policiesDeletedInRic = this.a1ClientFactory.createA1Client(ric) //
+            .flatMapMany(A1Client::deleteAllPolicies);
+
+        Flux.merge(typesRecoveredForRic, policiesDeletedInRic) //
+            .subscribe(x -> logger.debug("Brute recover: {}", x), //
+                throwable -> onRecoveryError(ric, throwable), //
+                () -> onSynchronizationComplete(ric));
     }
 
-    private void onRemoveAllError(Ric ric, Throwable t) {
-        logger.warn("Remove all failed for: {}, reason: {}", ric.name(), t.getMessage());
-        ric.setState(Ric.RicState.UNDEFINED);
+    @SuppressWarnings("squid:S2629")
+    private void onRecoveryError(Ric ric, Throwable t) {
+        logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage());
+        ric.setState(RicState.UNDEFINED);
     }
 
-    private AsyncRestClient createClient(final String url) {
+    AsyncRestClient createNotificationClient(final String url) {
         return new AsyncRestClient(url);
     }
 
-    private Flux<PolicyType> recoverPolicyTypes(Ric ric, A1Client a1Client) {
+    private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
         ric.clearSupportedPolicyTypes();
         return a1Client.getPolicyTypeIdentities() //
-            .flatMapMany(types -> Flux.fromIterable(types)) //
+            .flatMapMany(Flux::fromIterable) //
             .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
-            .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId, a1Client)) //
-            .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
+            .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) //
+            .doOnNext(ric::addSupportedPolicyType); //
     }
 
-    private Mono<PolicyType> getPolicyType(Ric ric, String policyTypeId, A1Client a1Client) {
+    private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
         if (policyTypes.contains(policyTypeId)) {
-            try {
-                return Mono.just(policyTypes.getType(policyTypeId));
-            } catch (ServiceException e) {
-                return Mono.error(e);
-            }
+            return Mono.just(policyTypes.get(policyTypeId));
         }
         return a1Client.getPolicyTypeSchema(policyTypeId) //
             .flatMap(schema -> createPolicyType(policyTypeId, schema));
@@ -162,7 +171,7 @@ public class RicRecoveryTask {
         return Mono.just(pt);
     }
 
-    private void deleteAllPolicies(Ric ric) {
+    private void deleteAllPoliciesInRepository(Ric ric) {
         synchronized (policies) {
             for (Policy policy : policies.getForRic(ric.name())) {
                 this.policies.remove(policy);
@@ -175,7 +184,7 @@ public class RicRecoveryTask {
             return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
                 .doOnNext(
                     policy -> logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name()))
-                .flatMap(policy -> a1Client.putPolicy(policy));
+                .flatMap(a1Client::putPolicy);
         }
     }
 
index d4b32e0..626a9b6 100644 (file)
@@ -35,6 +35,11 @@ import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+/**
+ * Periodically checks that services with a keepAliveInterval set are alive. If a service is deemed not alive,
+ * all the service's policies are deleted, both in the repository and in the affected Rics, and the service is
+ * removed from the repository. This means that the service needs to register again after this.
+ */
 @Component
 @EnableScheduling
 public class ServiceSupervision {
@@ -53,15 +58,12 @@ public class ServiceSupervision {
     @Scheduled(fixedRate = 1000 * 60)
     public void checkAllServices() {
         logger.debug("Checking services starting");
-        createTask().subscribe(this::onPolicyDeleted, this::onError, this::onComplete);
+        createTask().subscribe(this::onPolicyDeleted, null, this::onComplete);
     }
 
+    @SuppressWarnings("squid:S2629")
     private void onPolicyDeleted(Policy policy) {
-        logger.info("Policy deleted due to inactivity: " + policy.id() + ", service: " + policy.ownerServiceName());
-    }
-
-    private void onError(Throwable t) {
-        logger.error("Service supervision failed", t);
+        logger.debug("Policy deleted due to inactivity: {}, service: {}", policy.id(), policy.ownerServiceName());
     }
 
     private void onComplete() {
@@ -71,15 +73,16 @@ public class ServiceSupervision {
     private Flux<Policy> createTask() {
         synchronized (services) {
             return Flux.fromIterable(services.getAll()) //
-                .filter(service -> service.isExpired()) //
-                .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
-                .flatMap(service -> getAllPolicies(service)) //
-                .doOnNext(policy -> this.policies.remove(policy)) //
-                .flatMap(policy -> deletePolicyInRic(policy));
+                .filter(Service::isExpired) //
+                .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
+                .doOnNext(service -> services.remove(service.getName())) //
+                .flatMap(this::getAllPoliciesForService) //
+                .doOnNext(policies::remove) //
+                .flatMap(this::deletePolicyInRic);
         }
     }
 
-    private Flux<Policy> getAllPolicies(Service service) {
+    private Flux<Policy> getAllPoliciesForService(Service service) {
         synchronized (policies) {
             return Flux.fromIterable(policies.getForService(service.getName()));
         }
@@ -89,9 +92,10 @@ public class ServiceSupervision {
         return a1ClientFactory.createA1Client(policy.ric()) //
             .flatMap(client -> client.deletePolicy(policy) //
                 .onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) //
-                .map((nothing) -> policy));
+                .map(nothing -> policy));
     }
 
+    @SuppressWarnings("squid:S2629")
     private Mono<String> handleDeleteFromRicFailure(Policy policy, Throwable e) {
         logger.warn("Could not delete policy: {} from ric: {}", policy.id(), policy.ric().name(), e);
         return Mono.empty();
index 54fd79f..8552162 100644 (file)
@@ -84,12 +84,13 @@ public class StartupService implements ApplicationConfig.Observer {
                 || event.equals(ApplicationConfig.RicConfigUpdate.CHANGED)) {
                 Ric ric = new Ric(ricConfig);
                 rics.put(ric);
-                RicRecoveryTask recoveryTask = new RicRecoveryTask(a1ClientFactory, policyTypes, policies, services);
+                RicSynchronizationTask recoveryTask =
+                    new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
                 recoveryTask.run(ric);
             } else if (event.equals(ApplicationConfig.RicConfigUpdate.REMOVED)) {
                 rics.remove(ricConfig.name());
             } else {
-                logger.debug("Unhandled event :" + event);
+                logger.debug("Unhandled event: {}", event);
             }
         }
     }
index fc4030f..9d45f05 100644 (file)
@@ -22,20 +22,16 @@ package org.oransc.policyagent;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Vector;
-
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.oransc.policyagent.configuration.ApplicationConfig;
@@ -163,14 +159,6 @@ public class ApplicationTest {
         }
     }
 
-    private void reset() {
-        rics.clear();
-        policies.clear();
-        policyTypes.clear();
-        assertThat(policies.size()).isEqualTo(0);
-        restTemplate.setErrorHandler(new RestTemplateResponseErrorHandler());
-    }
-
     @Test
     public void testGetRics() throws Exception {
         reset();
@@ -188,58 +176,88 @@ public class ApplicationTest {
     @Test
     public void testRecovery() throws Exception {
         reset();
-        Policy policy2 = addPolicy("policyId2", "typeName", "service", "ric");
+        String ricName = "ric";
+        Policy policy2 = addPolicy("policyId2", "typeName", "service", ricName);
 
-        getA1Client("ric").putPolicy(policy2); // put it in the RIC
+        getA1Client(ricName).putPolicy(policy2); // put it in the RIC
         policies.remove(policy2); // Remove it from the repo -> should be deleted in the RIC
 
-        Policy policy = addPolicy("policyId", "typeName", "service", "ric"); // This should be created in the RIC
+        String policyId = "policyId";
+        Policy policy = addPolicy(policyId, "typeName", "service", ricName); // This should be created in the RIC
         supervision.checkAllRics(); // The created policy should be put in the RIC
-        await().untilAsserted(() -> RicState.RECOVERING.equals(rics.getRic("ric").getState()));
-        await().untilAsserted(() -> RicState.IDLE.equals(rics.getRic("ric").getState()));
+        await().untilAsserted(() -> RicState.SYNCHRONIZING.equals(rics.getRic(ricName).getState()));
+        await().untilAsserted(() -> RicState.IDLE.equals(rics.getRic(ricName).getState()));
 
-        Policies ricPolicies = getA1Client("ric").getPolicies();
+        Policies ricPolicies = getA1Client(ricName).getPolicies();
         assertThat(ricPolicies.size()).isEqualTo(1);
-        Policy ricPolicy = ricPolicies.get("policyId");
+        Policy ricPolicy = ricPolicies.get(policyId);
         assertThat(ricPolicy.json()).isEqualTo(policy.json());
     }
 
-    MockA1Client getA1Client(String ricName) throws ServiceException {
-        return a1ClientFactory.getOrCreateA1Client(ricName);
+    @Test
+    public void testGetRicForManagedElement_thenReturnCorrectRic() throws Exception {
+        reset();
+        addRic("notCorrectRic1");
+        addRic("notCorrectRic2");
+        addRic("notCorrectRic3");
+        addRic("notCorrectRic4");
+        addRic("notCorrectRic5");
+        addRic("notCorrectRic6");
+
+        String ricName = "ric1";
+        Ric ric = addRic(ricName);
+        String managedElementId = "kista_1";
+        ric.addManagedElement(managedElementId);
+
+        String url = baseUrl() + "/ric?managedElementId=" + managedElementId;
+        String rsp = this.restTemplate.getForObject(url, String.class);
+
+        assertThat(rsp).isEqualTo(ricName);
     }
 
     @Test
-    public void testGetRic() throws Exception {
+    public void testGetRicForManagedElementThatDoesNotExist_thenReturnEmpty() throws Exception {
         reset();
-        Ric ric = addRic("ric1");
-        ric.addManagedElement("kista_1");
-        String url = baseUrl() + "/ric?managedElementId=kista_1";
+        addRic("notCorrectRic1");
+        addRic("notCorrectRic2");
+        addRic("notCorrectRic3");
+        addRic("notCorrectRic4");
+        addRic("notCorrectRic5");
+        addRic("notCorrectRic6");
 
+        String url = baseUrl() + "/ric?managedElementId=kista_1";
         String rsp = this.restTemplate.getForObject(url, String.class);
-        System.out.println(rsp);
-        assertThat(rsp).isEqualTo("ric1");
+
+        assertThat(rsp).isNull();
     }
 
     @Test
     public void testPutPolicy() throws Exception {
         reset();
-        putService("service1");
-        this.addRic("ric1").setState(Ric.RicState.IDLE);
-        addPolicyType("type1", "ric1");
+        String serviceName = "service1";
+        String ricName = "ric1";
+        String policyTypeName = "type1";
+        String policyInstanceId = "instance1";
+
+        putService(serviceName);
+        addPolicyType(policyTypeName, ricName);
 
+        String url = baseUrl() + "/policy?type=" + policyTypeName + "&instance=" + policyInstanceId + "&ric=" + ricName
+            + "&service=" + serviceName;
         final String json = jsonString();
-        String url = baseUrl() + "/policy?type=type1&instance=instance1&ric=ric1&service=service1";
+        this.rics.getRic(ricName).setState(Ric.RicState.IDLE);
+
         this.restTemplate.put(url, createJsonHttpEntity(json));
-        Policy policy = policies.getPolicy("instance1");
 
+        Policy policy = policies.getPolicy(policyInstanceId);
         assertThat(policy).isNotNull();
-        assertThat(policy.id()).isEqualTo("instance1");
-        assertThat(policy.ownerServiceName()).isEqualTo("service1");
+        assertThat(policy.id()).isEqualTo(policyInstanceId);
+        assertThat(policy.ownerServiceName()).isEqualTo(serviceName);
         assertThat(policy.ric().name()).isEqualTo("ric1");
 
         url = baseUrl() + "/policies";
         String rsp = this.restTemplate.getForObject(url, String.class);
-        System.out.println(rsp);
+        assertThat(rsp.contains(policyInstanceId)).isTrue();
 
     }
 
@@ -258,75 +276,6 @@ public class ApplicationTest {
         assertThat(policy.ric().name()).isEqualTo("ric1"); // Not changed
     }
 
-    private PolicyType addPolicyType(String policyTypeName, String ricName) {
-        PolicyType type = ImmutablePolicyType.builder() //
-            .name(policyTypeName) //
-            .schema("{\"title\":\"" + policyTypeName + "\"}") //
-            .build();
-
-        policyTypes.put(type);
-        addRic(ricName).addSupportedPolicyType(type);
-        return type;
-    }
-
-    private Ric addRic(String ricName) {
-        if (rics.get(ricName) != null) {
-            return rics.get(ricName);
-        }
-        Vector<String> mes = new Vector<>();
-        RicConfig conf = ImmutableRicConfig.builder() //
-            .name(ricName) //
-            .baseUrl(ricName) //
-            .managedElementIds(mes) //
-            .build();
-        Ric ric = new Ric(conf);
-        this.rics.put(ric);
-        return ric;
-    }
-
-    private String createServiceJson(String name) {
-        ServiceRegistrationInfo service = new ServiceRegistrationInfo(name, 1, "callbackUrl");
-
-        String json = gson.toJson(service);
-        return json;
-    }
-
-    HttpEntity<String> createJsonHttpEntity(String content) {
-        HttpHeaders headers = new HttpHeaders();
-        headers.setContentType(MediaType.APPLICATION_JSON);
-        return new HttpEntity<String>(content, headers);
-    }
-
-    private void putService(String name) {
-        String url = baseUrl() + "/service";
-        HttpEntity<String> entity = createJsonHttpEntity(createServiceJson(name));
-        this.restTemplate.put(url, entity);
-    }
-
-    private String jsonString() {
-        return "{\n  \"servingCellNrcgi\": \"1\"\n }";
-    }
-
-    private Policy addPolicy(String id, String typeName, String service, String ric) throws ServiceException {
-        addRic(ric);
-        Policy p = ImmutablePolicy.builder().id(id) //
-            .json(jsonString()) //
-            .ownerServiceName(service) //
-            .ric(rics.getRic(ric)) //
-            .type(addPolicyType(typeName, ric)) //
-            .lastModified("lastModified").build();
-        policies.put(p);
-        return p;
-    }
-
-    private Policy addPolicy(String id, String typeName, String service) throws ServiceException {
-        return addPolicy(id, typeName, service, "ric");
-    }
-
-    private String baseUrl() {
-        return "http://localhost:" + port;
-    }
-
     @Test
     public void testGetPolicy() throws Exception {
         String url = baseUrl() + "/policy?instance=id";
@@ -368,13 +317,13 @@ public class ApplicationTest {
         assertThat(rsp).contains("[{\"title\":\"type2\"}");
 
         List<String> info = parseSchemas(rsp);
-        assertEquals(2, info.size());
+        assertThat(info.size()).isEqualTo(2);
 
         url = baseUrl() + "/policy_schemas?ric=ric1";
         rsp = this.restTemplate.getForObject(url, String.class);
         assertThat(rsp).contains("type1");
         info = parseSchemas(rsp);
-        assertEquals(1, info.size());
+        assertThat(info.size()).isEqualTo(1);
     }
 
     @Test
@@ -407,6 +356,7 @@ public class ApplicationTest {
 
     @Test
     public void testGetPolicies() throws Exception {
+        reset();
         String url = baseUrl() + "/policies";
         addPolicy("id1", "type1", "service1");
 
@@ -432,14 +382,14 @@ public class ApplicationTest {
         System.out.println(rsp);
         assertThat(rsp).contains("id1");
         assertThat(rsp).contains("id2");
-        assertFalse(rsp.contains("id3"));
+        assertThat(rsp.contains("id3")).isFalse();
 
         url = baseUrl() + "/policies?type=type1&service=service2";
         rsp = this.restTemplate.getForObject(url, String.class);
         System.out.println(rsp);
-        assertFalse(rsp.contains("id1"));
+        assertThat(rsp.contains("id1")).isFalse();
         assertThat(rsp).contains("id2");
-        assertFalse(rsp.contains("id3"));
+        assertThat(rsp.contains("id3")).isFalse();
     }
 
     @Test
@@ -449,35 +399,35 @@ public class ApplicationTest {
         putService("name");
 
         // GET
-        String url = baseUrl() + "/services?name=name";
+        String url = baseUrl() + "/services?serviceName=name";
         String rsp = this.restTemplate.getForObject(url, String.class);
         List<ServiceStatus> info = parseList(rsp, ServiceStatus.class);
-        assertThat(info.size() == 1);
+        assertThat(info.size()).isEqualTo(1);
         ServiceStatus status = info.iterator().next();
-        assertThat(status.keepAliveIntervalSeconds == 1);
-        assertThat(status.serviceName.equals("name"));
+        assertThat(status.keepAliveIntervalSeconds).isEqualTo(1);
+        assertThat(status.serviceName).isEqualTo("name");
 
         // GET (all)
         url = baseUrl() + "/services";
         rsp = this.restTemplate.getForObject(url, String.class);
-        assertThat(rsp.contains("name"));
+        assertThat(rsp.contains("name")).isTrue();
         System.out.println(rsp);
 
         // Keep alive
-        url = baseUrl() + "/services/keepalive?name=name";
+        url = baseUrl() + "/services/keepalive?serviceName=name";
         rsp = this.restTemplate.postForObject(url, null, String.class);
-        assertThat(rsp.contains("OK"));
+        assertThat(rsp.contains("OK")).isTrue();
 
         // DELETE
-        assertThat(services.size() == 1);
-        url = baseUrl() + "/services?name=name";
+        assertThat(services.size()).isEqualTo(1);
+        url = baseUrl() + "/services?serviceName=name";
         this.restTemplate.delete(url);
-        assertThat(services.size() == 0);
+        assertThat(services.size()).isEqualTo(0);
 
         // Keep alive, no registerred service
-        url = baseUrl() + "/services/keepalive?name=nameXXX";
+        url = baseUrl() + "/services/keepalive?serviceName=nameXXX";
         ResponseEntity<String> entity = this.restTemplate.postForEntity(url, null, String.class);
-        assertThat(entity.getStatusCode().equals(HttpStatus.NOT_FOUND));
+        assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
     }
 
     @Test
@@ -489,7 +439,89 @@ public class ApplicationTest {
 
         String url = baseUrl() + "/policy_status?instance=id";
         String rsp = this.restTemplate.getForObject(url, String.class);
-        assertThat(rsp.equals("OK"));
+        assertThat(rsp.equals("OK")).isTrue();
+    }
+
+    private PolicyType addPolicyType(String policyTypeName, String ricName) {
+        PolicyType type = ImmutablePolicyType.builder() //
+            .name(policyTypeName) //
+            .schema("{\"title\":\"" + policyTypeName + "\"}") //
+            .build();
+
+        policyTypes.put(type);
+        addRic(ricName).addSupportedPolicyType(type);
+        return type;
+    }
+
+    private Ric addRic(String ricName) {
+        if (rics.get(ricName) != null) {
+            return rics.get(ricName);
+        }
+        Vector<String> mes = new Vector<>();
+        RicConfig conf = ImmutableRicConfig.builder() //
+            .name(ricName) //
+            .baseUrl(ricName) //
+            .managedElementIds(mes) //
+            .build();
+        Ric ric = new Ric(conf);
+        this.rics.put(ric);
+        return ric;
+    }
+
+    private Policy addPolicy(String id, String typeName, String service, String ric) throws ServiceException {
+        addRic(ric);
+        Policy p = ImmutablePolicy.builder().id(id) //
+            .json(jsonString()) //
+            .ownerServiceName(service) //
+            .ric(rics.getRic(ric)) //
+            .type(addPolicyType(typeName, ric)) //
+            .lastModified("lastModified").build();
+        policies.put(p);
+        return p;
+    }
+
+    private Policy addPolicy(String id, String typeName, String service) throws ServiceException {
+        return addPolicy(id, typeName, service, "ric");
+    }
+
+    private String createServiceJson(String name) {
+        ServiceRegistrationInfo service = new ServiceRegistrationInfo(name, 1, "callbackUrl");
+
+        String json = gson.toJson(service);
+        return json;
+    }
+
+    private void putService(String name) {
+        String url = baseUrl() + "/service";
+        HttpEntity<String> entity = createJsonHttpEntity(createServiceJson(name));
+        this.restTemplate.put(url, entity);
+    }
+
+    private String baseUrl() {
+        return "http://localhost:" + port;
+    }
+
+    private void reset() {
+        rics.clear();
+        policies.clear();
+        policyTypes.clear();
+        services.clear();
+        assertThat(policies.size()).isEqualTo(0);
+        restTemplate.setErrorHandler(new RestTemplateResponseErrorHandler());
+    }
+
+    private String jsonString() {
+        return "{\n  \"servingCellNrcgi\": \"1\"\n }";
+    }
+
+    private MockA1Client getA1Client(String ricName) throws ServiceException {
+        return a1ClientFactory.getOrCreateA1Client(ricName);
+    }
+
+    private HttpEntity<String> createJsonHttpEntity(String content) {
+        HttpHeaders headers = new HttpHeaders();
+        headers.setContentType(MediaType.APPLICATION_JSON);
+        return new HttpEntity<String>(content, headers);
     }
 
     private static <T> List<T> parseList(String jsonString, Class<T> clazz) {
index 7162478..21894c2 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oransc.policyagent.clients;
 
+import static ch.qos.logback.classic.Level.WARN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -28,7 +29,6 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
 
@@ -140,14 +140,14 @@ public class A1ClientFactoryTest {
         whenGetProtocolVersionOscA1ClientThrowException();
         whenGetProtocolVersionStdA1ClientThrowException();
 
-        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(A1ClientFactory.class);
+        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(A1ClientFactory.class, WARN);
         StepVerifier.create(factoryUnderTest.createA1Client(ric)) //
             .expectSubscription() //
             .expectErrorMatches(
                 throwable -> throwable instanceof Exception && throwable.getMessage().equals(EXCEPTION_MESSAGE))
             .verify();
 
-        assertEquals(Level.WARN, logAppender.list.get(0).getLevel(), "Warning not logged");
+        assertEquals(WARN, logAppender.list.get(0).getLevel(), "Warning not logged");
         assertTrue(logAppender.list.toString().contains("Could not get protocol version from RIC: " + RIC_NAME),
             "Correct message not logged");
 
index 4fd8405..c5a6b4c 100644 (file)
@@ -40,14 +40,14 @@ import reactor.core.publisher.Mono;
 public class A1ClientHelper {
     private static Gson gson = new GsonBuilder() //
         .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES) //
-        .create(); //
+        .create();
 
     private A1ClientHelper() {
     }
 
     protected static <T> String createInputJsonString(T inputParams) {
         JSONObject inputJson = new JSONObject();
-        inputJson.put("input", new JSONObject(gson.toJson(inputParams)));
+        inputJson.put("input", gson.toJson(inputParams));
         return inputJson.toString();
     }
 
index 71c8500..09dbf92 100644 (file)
@@ -46,6 +46,9 @@ import reactor.test.StepVerifier;
 
 public class DmaapMessageHandlerTest {
 
+    private static final String URL = "url";
+    private static final String PAYLOAD = "payload";
+
     private ApplicationConfig appConfig = mock(ApplicationConfig.class);
     private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class);
     private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
@@ -56,20 +59,20 @@ public class DmaapMessageHandlerTest {
 
     @BeforeEach
     private void setUp() throws Exception {
-        testedObject = spy(new DmaapMessageHandler(dmaapClient, appConfig, agentClient));
+        testedObject = spy(new DmaapMessageHandler(dmaapClient, agentClient));
     }
 
-    ImmutableDmaapRequestMessage dmaapRequestMessage(Operation operation) {
+    DmaapRequestMessage dmaapRequestMessage(Operation operation) {
         return ImmutableDmaapRequestMessage.builder().apiVersion("apiVersion") //
             .correlationId("correlationId") //
             .operation(operation) //
             .originatorId("originatorId") //
-            .payload("payload") //
+            .payload(PAYLOAD) //
             .requestId("requestId") //
             .target("target") //
             .timestamp("timestamp") //
             .type("type") //
-            .url("url") //
+            .url(URL) //
             .build();
     }
 
@@ -78,8 +81,8 @@ public class DmaapMessageHandlerTest {
     }
 
     @Test
-    public void successfulCase() throws IOException {
-        doReturn(Mono.just("OK")).when(agentClient).delete("url");
+    public void successfulDelete() throws IOException {
+        doReturn(Mono.just("OK")).when(agentClient).delete(anyString());
         doReturn(1).when(dmaapClient).send(anyString());
         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
 
@@ -89,7 +92,67 @@ public class DmaapMessageHandlerTest {
             .expectNext("OK") //
             .verifyComplete(); //
 
-        verify(agentClient, times(1)).delete("url");
+        verify(agentClient, times(1)).delete(URL);
+        verifyNoMoreInteractions(agentClient);
+
+        verify(dmaapClient, times(1)).send(anyString());
+        verify(dmaapClient, times(1)).sendBatchWithResponse();
+        verifyNoMoreInteractions(dmaapClient);
+    }
+
+    @Test
+    public void successfulGet() throws IOException {
+        doReturn(Mono.just("OK")).when(agentClient).get(anyString());
+        doReturn(1).when(dmaapClient).send(anyString());
+        doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+        StepVerifier //
+            .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
+            .expectSubscription() //
+            .expectNext("OK") //
+            .verifyComplete(); //
+
+        verify(agentClient, times(1)).get(URL);
+        verifyNoMoreInteractions(agentClient);
+
+        verify(dmaapClient, times(1)).send(anyString());
+        verify(dmaapClient, times(1)).sendBatchWithResponse();
+        verifyNoMoreInteractions(dmaapClient);
+    }
+
+    @Test
+    public void successfulPut() throws IOException {
+        doReturn(Mono.just("OK")).when(agentClient).put(anyString(), anyString());
+        doReturn(1).when(dmaapClient).send(anyString());
+        doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+        StepVerifier //
+            .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
+            .expectSubscription() //
+            .expectNext("OK") //
+            .verifyComplete(); //
+
+        verify(agentClient, times(1)).put(URL, PAYLOAD);
+        verifyNoMoreInteractions(agentClient);
+
+        verify(dmaapClient, times(1)).send(anyString());
+        verify(dmaapClient, times(1)).sendBatchWithResponse();
+        verifyNoMoreInteractions(dmaapClient);
+    }
+
+    @Test
+    public void successfulPost() throws IOException {
+        doReturn(Mono.just("OK")).when(agentClient).post(anyString(), anyString());
+        doReturn(1).when(dmaapClient).send(anyString());
+        doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+        StepVerifier //
+            .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
+            .expectSubscription() //
+            .expectNext("OK") //
+            .verifyComplete(); //
+
+        verify(agentClient, times(1)).post(URL, PAYLOAD);
         verifyNoMoreInteractions(agentClient);
 
         verify(dmaapClient, times(1)).send(anyString());
@@ -99,7 +162,7 @@ public class DmaapMessageHandlerTest {
 
     @Test
     public void errorCase() throws IOException {
-        doReturn(Mono.error(new Exception("Refused"))).when(agentClient).put("url", "payload");
+        doReturn(Mono.error(new Exception("Refused"))).when(agentClient).put(anyString(), anyString());
         doReturn(1).when(dmaapClient).send(anyString());
         doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
         StepVerifier //
@@ -107,7 +170,7 @@ public class DmaapMessageHandlerTest {
             .expectSubscription() //
             .verifyComplete(); //
 
-        verify(agentClient, times(1)).put("url", "payload");
+        verify(agentClient, times(1)).put(URL, PAYLOAD);
         verifyNoMoreInteractions(agentClient);
 
         // Error response
index 90b3847..859708a 100644 (file)
 package org.oransc.policyagent.tasks;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
-
 import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
 import com.google.gson.JsonIOException;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonSyntaxException;
-
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -47,8 +45,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.Vector;
-
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -61,9 +57,7 @@ import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.configuration.ApplicationConfigParser;
 import org.oransc.policyagent.configuration.ImmutableRicConfig;
 import org.oransc.policyagent.configuration.RicConfig;
-import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.utils.LoggingUtils;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -71,6 +65,7 @@ import reactor.test.StepVerifier;
 @ExtendWith(MockitoExtension.class)
 public class RefreshConfigTaskTest {
 
+
     private RefreshConfigTask refreshTaskUnderTest;
 
     @Spy
@@ -79,8 +74,9 @@ public class RefreshConfigTaskTest {
     @Mock
     CbsClient cbsClient;
 
+    private static final String RIC_1_NAME = "ric1";
     public static final ImmutableRicConfig CORRECT_RIC_CONIFG = ImmutableRicConfig.builder() //
-        .name("ric1") //
+        .name(RIC_1_NAME) //
         .baseUrl("http://localhost:8080/") //
         .managedElementIds(new Vector<String>(Arrays.asList("kista_1", "kista_2"))) //
         .build();
@@ -95,7 +91,7 @@ public class RefreshConfigTaskTest {
     }
 
     @Test
-    public void whenTheConfigurationFits() throws IOException, ServiceException {
+    public void whenTheConfigurationFits_thenConfiguredRicsArePutInRepository() throws Exception {
         refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
         refreshTaskUnderTest.systemEnvironment = new Properties();
         // When
@@ -113,7 +109,7 @@ public class RefreshConfigTaskTest {
     }
 
     @Test
-    public void whenFileIsExistsButJsonIsIncorrect() throws IOException, ServiceException {
+    public void whenFileExistsButJsonIsIncorrect_thenNoRicsArePutInRepository() throws Exception {
         refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
         refreshTaskUnderTest.systemEnvironment = new Properties();
 
@@ -124,11 +120,11 @@ public class RefreshConfigTaskTest {
 
         // Then
         verify(refreshTaskUnderTest, times(1)).loadConfigurationFromFile();
-        Assertions.assertEquals(0, appConfig.getRicConfigs().size());
+        assertThat(appConfig.getRicConfigs().size()).isEqualTo(0);
     }
 
     @Test
-    public void whenPeriodicConfigRefreshNoEnvironmentVariables() {
+    public void whenPeriodicConfigRefreshNoEnvironmentVariables_thenErrorIsLogged() {
         refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
         refreshTaskUnderTest.systemEnvironment = new Properties();
 
@@ -137,11 +133,12 @@ public class RefreshConfigTaskTest {
 
         StepVerifier.create(task).expectSubscription().verifyComplete();
 
-        assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
+        assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.ERROR);
+        assertThat(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined")).isTrue();
     }
 
     @Test
-    public void whenPeriodicConfigRefreshNoConsul() {
+    public void whenPeriodicConfigRefreshNoConsul_thenErrorIsLogged() {
         refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
         refreshTaskUnderTest.systemEnvironment = new Properties();
 
@@ -160,12 +157,14 @@ public class RefreshConfigTaskTest {
             .expectSubscription() //
             .verifyComplete();
 
-        assertTrue(
-            logAppender.list.toString().contains("Could not refresh application configuration java.io.IOException"));
+        assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.ERROR);
+        assertThat(
+            logAppender.list.toString().contains("Could not refresh application configuration. java.io.IOException"))
+                .isTrue();
     }
 
     @Test
-    public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
+    public void whenPeriodicConfigRefreshSuccess_thenNewConfigIsCreated() throws Exception {
         refreshTaskUnderTest = spy(new RefreshConfigTask(appConfig));
         refreshTaskUnderTest.systemEnvironment = new Properties();
 
@@ -173,7 +172,10 @@ public class RefreshConfigTaskTest {
         doReturn(Mono.just(props)).when(refreshTaskUnderTest).getEnvironment(any());
         doReturn(Mono.just(cbsClient)).when(refreshTaskUnderTest).createCbsClient(props);
 
-        Flux<JsonObject> json = Flux.just(getJsonRootObject());
+        JsonObject configAsJson = getJsonRootObject();
+        String newBaseUrl = "newBaseUrl";
+        modifyTheRicConfiguration(configAsJson, newBaseUrl);
+        Flux<JsonObject> json = Flux.just(configAsJson);
         doReturn(json).when(cbsClient).updates(any(), any(), any());
 
         Flux<ApplicationConfig> task = refreshTaskUnderTest.createRefreshTask();
@@ -184,7 +186,13 @@ public class RefreshConfigTaskTest {
             .expectNext(appConfig) //
             .verifyComplete();
 
-        Assertions.assertNotNull(appConfig.getRicConfigs());
+        assertThat(appConfig.getRicConfigs()).isNotNull();
+        assertThat(appConfig.getRic(RIC_1_NAME).baseUrl()).isEqualTo(newBaseUrl);
+    }
+
+    private void modifyTheRicConfiguration(JsonObject configAsJson, String newBaseUrl) {
+        ((JsonObject) configAsJson.getAsJsonObject("config").getAsJsonArray("ric").get(0)).addProperty("baseUrl",
+            newBaseUrl);
     }
 
     private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
index fb8d46f..7166c18 100644 (file)
 
 package org.oransc.policyagent.tasks;
 
-import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Vector;
-
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -49,83 +49,264 @@ import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Ric.RicState;
 import org.oransc.policyagent.repository.Rics;
-import org.oransc.policyagent.repository.Services;
-
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 @ExtendWith(MockitoExtension.class)
 public class RepositorySupervisionTest {
+    private static final String POLICY_TYPE_1_NAME = "type1";
+    private static final PolicyType POLICY_TYPE_1 = ImmutablePolicyType.builder() //
+        .name(POLICY_TYPE_1_NAME) //
+        .schema("") //
+        .build();
+
+    private static final Ric RIC_1 = new Ric(ImmutableRicConfig.builder() //
+        .name("ric1") //
+        .baseUrl("baseUrl1") //
+        .managedElementIds(new Vector<String>(Arrays.asList("kista_1", "kista_2"))) //
+        .build());
+
+    private static final String POLICY_1_ID = "policyId1";
+    private static final Policy POLICY_1 = ImmutablePolicy.builder() //
+        .id(POLICY_1_ID) //
+        .json("") //
+        .ownerServiceName("service") //
+        .ric(RIC_1) //
+        .type(POLICY_TYPE_1) //
+        .lastModified("now") //
+        .build();
+
+    private static final Policy POLICY_2 = ImmutablePolicy.builder() //
+        .id("policyId2") //
+        .json("") //
+        .ownerServiceName("service") //
+        .ric(RIC_1) //
+        .type(POLICY_TYPE_1) //
+        .lastModified("now") //
+        .build();
+
     @Mock
-    A1Client a1ClientMock;
+    private A1Client a1ClientMock;
 
     @Mock
-    A1ClientFactory a1ClientFactory;
+    private A1ClientFactory a1ClientFactory;
+
+    @Mock
+    private RicSynchronizationTask recoveryTaskMock;
+
+    private PolicyTypes types;
+    private Policies policies;
+    private Rics rics;
 
     @BeforeEach
     public void init() {
-        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any());
+        doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
+        types = new PolicyTypes();
+        policies = new Policies();
+        rics = new Rics();
+        RIC_1.setState(RicState.UNDEFINED);
+        RIC_1.clearSupportedPolicyTypes();
+    }
+
+    @Test
+    public void whenRicIdleAndNoChangedPoliciesOrPolicyTypes_thenNoRecovery() {
+        RIC_1.setState(RicState.IDLE);
+        RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
+        rics.put(RIC_1);
+
+        types.put(POLICY_TYPE_1);
+
+        policies.put(POLICY_1);
+
+        setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID)));
+        setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME)));
+
+        RepositorySupervision supervisorUnderTest =
+            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+
+        supervisorUnderTest.checkAllRics();
+
+        verify(supervisorUnderTest).checkAllRics();
+        verifyNoMoreInteractions(supervisorUnderTest);
+    }
+
+    @Test
+    public void whenRicUndefined_thenRecovery() {
+        RIC_1.setState(RicState.UNDEFINED);
+        rics.put(RIC_1);
+
+        RepositorySupervision supervisorUnderTest =
+            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+
+        doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
+
+        supervisorUnderTest.checkAllRics();
+
+        verify(supervisorUnderTest).checkAllRics();
+        verify(supervisorUnderTest).createSynchronizationTask();
+        verify(recoveryTaskMock).run(RIC_1);
+        verifyNoMoreInteractions(supervisorUnderTest);
     }
 
     @Test
-    public void test() {
-        Ric ric1 = new Ric(ImmutableRicConfig.builder() //
-            .name("ric1") //
-            .baseUrl("baseUrl1") //
-            .managedElementIds(new Vector<String>(Arrays.asList("kista_1", "kista_2"))) //
-            .build());
-        ric1.setState(Ric.RicState.IDLE);
-        Ric ric2 = new Ric(ImmutableRicConfig.builder() //
-            .name("ric2") //
-            .baseUrl("baseUrl2") //
-            .managedElementIds(new Vector<String>(Arrays.asList("kista_3", "kista_4"))) //
-            .build());
-        ric2.setState(Ric.RicState.UNDEFINED);
-        Ric ric3 = new Ric(ImmutableRicConfig.builder() //
-            .name("ric3") //
-            .baseUrl("baseUrl3") //
-            .managedElementIds(new Vector<String>(Arrays.asList("kista_5"))) //
-            .build());
-        Rics rics = new Rics();
-        rics.put(ric1);
-        rics.put(ric2);
-        rics.put(ric3);
-
-        PolicyType policyType = ImmutablePolicyType.builder() //
-            .name("type") //
+    public void whenRicRecovering_thenNoRecovery() {
+        RIC_1.setState(RicState.SYNCHRONIZING);
+        rics.put(RIC_1);
+
+        RepositorySupervision supervisorUnderTest =
+            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+
+        supervisorUnderTest.checkAllRics();
+
+        verify(supervisorUnderTest).checkAllRics();
+        verifyNoMoreInteractions(supervisorUnderTest);
+    }
+
+    @Test
+    public void whenRicIdleAndErrorGettingPolicyIdentities_thenNoRecovery() {
+        RIC_1.setState(RicState.IDLE);
+        RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
+        rics.put(RIC_1);
+
+        setUpGetPolicyIdentitiesToReturn(new Exception("Failed"));
+
+        RepositorySupervision supervisorUnderTest =
+            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+        supervisorUnderTest.checkAllRics();
+
+        verify(supervisorUnderTest).checkAllRics();
+        verifyNoMoreInteractions(supervisorUnderTest);
+    }
+
+    @Test
+    public void whenRicIdleAndNotSameAmountOfPolicies_thenRecovery() {
+        RIC_1.setState(RicState.IDLE);
+        rics.put(RIC_1);
+
+        policies.put(POLICY_1);
+        policies.put(POLICY_2);
+
+        setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID)));
+
+        RepositorySupervision supervisorUnderTest =
+            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+
+        doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
+
+        supervisorUnderTest.checkAllRics();
+
+        verify(supervisorUnderTest).checkAllRics();
+        verify(supervisorUnderTest).createSynchronizationTask();
+        verify(recoveryTaskMock).run(RIC_1);
+        verifyNoMoreInteractions(supervisorUnderTest);
+    }
+
+    @Test
+    public void whenRicIdleAndSameAmountOfPoliciesButNotSamePolicies_thenRecovery() {
+        RIC_1.setState(RicState.IDLE);
+        rics.put(RIC_1);
+
+        policies.put(POLICY_1);
+        policies.put(POLICY_2);
+
+        setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID, "Another_policy")));
+
+        RepositorySupervision supervisorUnderTest =
+            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+
+        doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
+
+        supervisorUnderTest.checkAllRics();
+
+        verify(supervisorUnderTest).checkAllRics();
+        verify(supervisorUnderTest).createSynchronizationTask();
+        verify(recoveryTaskMock).run(RIC_1);
+        verifyNoMoreInteractions(supervisorUnderTest);
+    }
+
+    @Test
+    public void whenRicIdleAndErrorGettingPolicyTypes_thenNoRecovery() {
+        RIC_1.setState(RicState.IDLE);
+        RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
+        rics.put(RIC_1);
+
+        setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
+        setUpGetPolicyTypeIdentitiesToReturn(new Exception("Failed"));
+
+        RepositorySupervision supervisorUnderTest =
+            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+        supervisorUnderTest.checkAllRics();
+
+        verify(supervisorUnderTest).checkAllRics();
+        verifyNoMoreInteractions(supervisorUnderTest);
+    }
+
+    @Test
+    public void whenRicIdleAndNotSameAmountOfPolicyTypes_thenRecovery() {
+        RIC_1.setState(RicState.IDLE);
+        RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
+        rics.put(RIC_1);
+
+        types.put(POLICY_TYPE_1);
+
+        setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
+        setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME, "another_policy_type")));
+
+        RepositorySupervision supervisorUnderTest =
+            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+
+        doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
+
+        supervisorUnderTest.checkAllRics();
+
+        verify(supervisorUnderTest).checkAllRics();
+        verify(supervisorUnderTest).createSynchronizationTask();
+        verify(recoveryTaskMock).run(RIC_1);
+        verifyNoMoreInteractions(supervisorUnderTest);
+    }
+
+    @Test
+    public void whenRicIdleAndSameAmountOfPolicyTypesButNotSameTypes_thenRecovery() {
+        PolicyType policyType2 = ImmutablePolicyType.builder() //
+            .name("policyType2") //
             .schema("") //
             .build();
-        Policy policy1 = ImmutablePolicy.builder() //
-            .id("policyId1") //
-            .json("") //
-            .ownerServiceName("service") //
-            .ric(ric1) //
-            .type(policyType) //
-            .lastModified("now") //
-            .build();
-        Policies policies = new Policies();
-        policies.put(policy1);
-        PolicyTypes types = new PolicyTypes();
-        Services services = new Services();
 
-        Mono<List<String>> policyIds = Mono.just(Arrays.asList("policyId1", "policyId2"));
+        RIC_1.setState(RicState.IDLE);
+        RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
+        RIC_1.addSupportedPolicyType(policyType2);
+        rics.put(RIC_1);
 
-        doReturn(policyIds).when(a1ClientMock).getPolicyTypeIdentities();
-        doReturn(policyIds).when(a1ClientMock).getPolicyIdentities();
-        doReturn(Mono.just("schema")).when(a1ClientMock).getPolicyTypeSchema(anyString());
-        doReturn(Mono.just("OK")).when(a1ClientMock).putPolicy(any());
-        doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies();
+        setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
+        setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME, "another_policy_type")));
 
         RepositorySupervision supervisorUnderTest =
-            new RepositorySupervision(rics, policies, a1ClientFactory, types, services);
+            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+
+        doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
 
         supervisorUnderTest.checkAllRics();
 
-        await().untilAsserted(() -> RicState.IDLE.equals(ric1.getState()));
-        await().untilAsserted(() -> RicState.IDLE.equals(ric2.getState()));
-        await().untilAsserted(() -> RicState.IDLE.equals(ric3.getState()));
+        verify(supervisorUnderTest).checkAllRics();
+        verify(supervisorUnderTest).createSynchronizationTask();
+        verify(recoveryTaskMock).run(RIC_1);
+        verifyNoMoreInteractions(supervisorUnderTest);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void setUpGetPolicyIdentitiesToReturn(Object returnValue) {
+        if (returnValue instanceof List<?>) {
+            when(a1ClientMock.getPolicyIdentities()).thenReturn(Mono.just((List<String>) returnValue));
+        } else if (returnValue instanceof Exception) {
+            when(a1ClientMock.getPolicyIdentities()).thenReturn(Mono.error((Exception) returnValue));
+        }
+    }
 
-        verify(a1ClientMock, times(3)).deleteAllPolicies();
-        verifyNoMoreInteractions(a1ClientMock);
+    @SuppressWarnings("unchecked")
+    private void setUpGetPolicyTypeIdentitiesToReturn(Object returnValue) {
+        if (returnValue instanceof List<?>) {
+            when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(Mono.just((List<String>) returnValue));
+        } else if (returnValue instanceof Exception) {
+            when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(Mono.error((Exception) returnValue));
+        }
     }
-}
+}
\ No newline at end of file
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java
new file mode 100644 (file)
index 0000000..a969f3f
--- /dev/null
@@ -0,0 +1,327 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.tasks;
+
+import static ch.qos.logback.classic.Level.WARN;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.oransc.policyagent.clients.A1Client;
+import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.clients.AsyncRestClient;
+import org.oransc.policyagent.configuration.ImmutableRicConfig;
+import org.oransc.policyagent.repository.ImmutablePolicy;
+import org.oransc.policyagent.repository.ImmutablePolicyType;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.Policy;
+import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Ric.RicState;
+import org.oransc.policyagent.repository.Service;
+import org.oransc.policyagent.repository.Services;
+import org.oransc.policyagent.utils.LoggingUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@ExtendWith(MockitoExtension.class)
+public class RicSynchronizationTaskTest {
+    private static final String POLICY_TYPE_1_NAME = "type1";
+    private static final PolicyType POLICY_TYPE_1 = ImmutablePolicyType.builder() //
+        .name(POLICY_TYPE_1_NAME) //
+        .schema("") //
+        .build();
+
+    private static final String RIC_1_NAME = "ric1";
+    private static final Ric RIC_1 = new Ric(ImmutableRicConfig.builder() //
+        .name(RIC_1_NAME) //
+        .baseUrl("baseUrl1") //
+        .managedElementIds(Collections.emptyList()) //
+        .build());
+
+    private static final Policy POLICY_1 = ImmutablePolicy.builder() //
+        .id("policyId1") //
+        .json("") //
+        .ownerServiceName("service") //
+        .ric(RIC_1) //
+        .type(POLICY_TYPE_1) //
+        .lastModified("now") //
+        .build();
+
+    private static final String SERVICE_1_NAME = "service1";
+    private static final String SERVICE_1_CALLBACK_URL = "callbackUrl";
+    private static final Service SERVICE_1 = new Service(SERVICE_1_NAME, Duration.ofSeconds(1), SERVICE_1_CALLBACK_URL);
+
+    @Mock
+    private A1Client a1ClientMock;
+
+    @Mock
+    private A1ClientFactory a1ClientFactoryMock;
+
+    private PolicyTypes policyTypes;
+    private Policies policies;
+    private Services services;
+
+    @BeforeEach
+    public void init() {
+        policyTypes = new PolicyTypes();
+        policies = new Policies();
+        services = new Services();
+        RIC_1.setState(RicState.UNDEFINED);
+        RIC_1.clearSupportedPolicyTypes();
+    }
+
+    @Test
+    public void ricAlreadySynchronizing_thenNoSynchronization() {
+        RIC_1.setState(RicState.SYNCHRONIZING);
+        RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
+
+        policyTypes.put(POLICY_TYPE_1);
+        policies.put(POLICY_1);
+
+        RicSynchronizationTask synchronizerUnderTest =
+            new RicSynchronizationTask(a1ClientFactoryMock, policyTypes, policies, services);
+
+        synchronizerUnderTest.run(RIC_1);
+
+        verifyNoInteractions(a1ClientMock);
+
+        assertThat(policyTypes.size()).isEqualTo(1);
+        assertThat(policies.size()).isEqualTo(1);
+        assertThat(RIC_1.getState()).isEqualTo(RicState.SYNCHRONIZING);
+        assertThat(RIC_1.getSupportedPolicyTypeNames().size()).isEqualTo(1);
+    }
+
+    @Test
+    public void ricIdlePolicyTypeInRepo_thenSynchronizationWithReuseOfTypeFromRepoAndCorrectServiceNotified() {
+        RIC_1.setState(RicState.IDLE);
+
+        policyTypes.put(POLICY_TYPE_1);
+
+        services.put(SERVICE_1);
+        Service serviceWithoutCallbackUrlShouldNotBeNotified = new Service("service2", Duration.ofSeconds(1), "");
+        services.put(serviceWithoutCallbackUrlShouldNotBeNotified);
+
+        setUpCreationOfA1Client();
+        simulateRicWithOnePolicyType();
+
+        RicSynchronizationTask synchronizerUnderTest =
+            spy(new RicSynchronizationTask(a1ClientFactoryMock, policyTypes, policies, services));
+
+        AsyncRestClient restClientMock = setUpCreationOfAsyncRestClient(synchronizerUnderTest);
+        when(restClientMock.put(anyString(), anyString())).thenReturn(Mono.just("Ok"));
+
+        synchronizerUnderTest.run(RIC_1);
+
+        verify(a1ClientMock).getPolicyTypeIdentities();
+        verifyNoMoreInteractions(a1ClientMock);
+
+        verify(synchronizerUnderTest).run(RIC_1);
+        verify(synchronizerUnderTest).createNotificationClient(SERVICE_1_CALLBACK_URL);
+        verifyNoMoreInteractions(synchronizerUnderTest);
+
+        verify(restClientMock).put("", "Synchronization completed for:" + RIC_1_NAME);
+        verifyNoMoreInteractions(restClientMock);
+
+        assertThat(policyTypes.size()).isEqualTo(1);
+        assertThat(policies.size()).isEqualTo(0);
+        assertThat(RIC_1.getState()).isEqualTo(RicState.IDLE);
+    }
+
+    @Test
+    public void ricIdlePolicyTypeNotInRepo_thenSynchronizationWithTypeFromRic() throws Exception {
+        RIC_1.setState(RicState.IDLE);
+
+        setUpCreationOfA1Client();
+        simulateRicWithOnePolicyType();
+        String typeSchema = "schema";
+        when(a1ClientMock.getPolicyTypeSchema(POLICY_TYPE_1_NAME)).thenReturn(Mono.just(typeSchema));
+
+        RicSynchronizationTask synchronizerUnderTest =
+            new RicSynchronizationTask(a1ClientFactoryMock, policyTypes, policies, services);
+
+        synchronizerUnderTest.run(RIC_1);
+
+        verify(a1ClientMock).getPolicyTypeIdentities();
+        verifyNoMoreInteractions(a1ClientMock);
+
+        assertThat(policyTypes.size()).isEqualTo(1);
+        assertThat(policyTypes.getType(POLICY_TYPE_1_NAME).schema()).isEqualTo(typeSchema);
+        assertThat(policies.size()).isEqualTo(0);
+        assertThat(RIC_1.getState()).isEqualTo(RicState.IDLE);
+    }
+
+    @Test
+    public void ricIdleAndHavePolicies_thenSynchronizationWithRecreationOfPolicies() {
+        RIC_1.setState(RicState.IDLE);
+
+        policies.put(POLICY_1);
+
+        setUpCreationOfA1Client();
+        simulateRicWithNoPolicyTypes();
+
+        when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.just("OK"));
+        when(a1ClientMock.putPolicy(any(Policy.class))).thenReturn(Mono.just("OK"));
+
+        RicSynchronizationTask synchronizerUnderTest =
+            new RicSynchronizationTask(a1ClientFactoryMock, policyTypes, policies, services);
+
+        synchronizerUnderTest.run(RIC_1);
+
+        verify(a1ClientMock).deleteAllPolicies();
+        verify(a1ClientMock).putPolicy(POLICY_1);
+        verifyNoMoreInteractions(a1ClientMock);
+
+        assertThat(policyTypes.size()).isEqualTo(0);
+        assertThat(policies.size()).isEqualTo(1);
+        assertThat(RIC_1.getState()).isEqualTo(RicState.IDLE);
+    }
+
+    @Test
+    public void ricIdleAndErrorDeletingPoliciesFirstTime_thenSynchronizationWithDeletionOfPolicies() {
+        RIC_1.setState(RicState.IDLE);
+
+        policies.put(POLICY_1);
+
+        setUpCreationOfA1Client();
+        simulateRicWithNoPolicyTypes();
+
+        when(a1ClientMock.deleteAllPolicies()) //
+            .thenReturn(Flux.error(new Exception("Exception"))) //
+            .thenReturn(Flux.just("OK"));
+
+        RicSynchronizationTask synchronizerUnderTest =
+            new RicSynchronizationTask(a1ClientFactoryMock, policyTypes, policies, services);
+
+        synchronizerUnderTest.run(RIC_1);
+
+        verify(a1ClientMock, times(2)).deleteAllPolicies();
+        verifyNoMoreInteractions(a1ClientMock);
+
+        assertThat(policyTypes.size()).isEqualTo(0);
+        assertThat(policies.size()).isEqualTo(0);
+        assertThat(RIC_1.getState()).isEqualTo(RicState.IDLE);
+    }
+
+    @Test
+    public void ricIdleAndErrorDeletingPoliciesAllTheTime_thenSynchronizationWithFailedRecovery() {
+        RIC_1.setState(RicState.IDLE);
+
+        policies.put(POLICY_1);
+
+        setUpCreationOfA1Client();
+        simulateRicWithNoPolicyTypes();
+
+        String originalErrorMessage = "Exception";
+        when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(new Exception(originalErrorMessage)));
+
+        RicSynchronizationTask synchronizerUnderTest =
+            new RicSynchronizationTask(a1ClientFactoryMock, policyTypes, policies, services);
+
+        final ListAppender<ILoggingEvent> logAppender =
+            LoggingUtils.getLogListAppender(RicSynchronizationTask.class, WARN);
+
+        synchronizerUnderTest.run(RIC_1);
+
+        verifyCorrectLogMessage(0, logAppender,
+            "Synchronization failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
+        verifyCorrectLogMessage(1, logAppender,
+            "Synchronization failure recovery failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
+
+        verify(a1ClientMock, times(2)).deleteAllPolicies();
+        verifyNoMoreInteractions(a1ClientMock);
+
+        assertThat(policyTypes.size()).isEqualTo(0);
+        assertThat(policies.size()).isEqualTo(0);
+        assertThat(RIC_1.getState()).isEqualTo(RicState.UNDEFINED);
+    }
+
+    @Test
+    public void ricIdlePolicyTypeInRepo_thenSynchronizationWithErrorOnServiceNotificationErrorLogged() {
+        RIC_1.setState(RicState.IDLE);
+
+        policyTypes.put(POLICY_TYPE_1);
+
+        services.put(SERVICE_1);
+
+        setUpCreationOfA1Client();
+        simulateRicWithOnePolicyType();
+
+        final ListAppender<ILoggingEvent> logAppender =
+            LoggingUtils.getLogListAppender(RicSynchronizationTask.class, WARN);
+
+        RicSynchronizationTask synchronizerUnderTest =
+            spy(new RicSynchronizationTask(a1ClientFactoryMock, policyTypes, policies, services));
+
+        AsyncRestClient restClientMock = setUpCreationOfAsyncRestClient(synchronizerUnderTest);
+        String originalErrorMessage = "Exception";
+        when(restClientMock.put(anyString(), anyString())).thenReturn(Mono.error(new Exception(originalErrorMessage)));
+
+        synchronizerUnderTest.run(RIC_1);
+
+        ILoggingEvent loggingEvent = logAppender.list.get(0);
+        assertThat(loggingEvent.getThrowableProxy().getMessage()).isEqualTo(originalErrorMessage);
+        verifyCorrectLogMessage(0, logAppender, "Service notification failed for service: " + SERVICE_1_NAME);
+    }
+
+    private void setUpCreationOfA1Client() {
+        when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock));
+    }
+
+    private AsyncRestClient setUpCreationOfAsyncRestClient(RicSynchronizationTask synchronizerUnderTest) {
+        AsyncRestClient restClientMock = mock(AsyncRestClient.class);
+        doReturn(restClientMock).when(synchronizerUnderTest).createNotificationClient(anyString());
+        return restClientMock;
+    }
+
+    private void simulateRicWithOnePolicyType() {
+        when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)));
+    }
+
+    private void simulateRicWithNoPolicyTypes() {
+        when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(Mono.just(Collections.emptyList()));
+    }
+
+    private void verifyCorrectLogMessage(int messageIndex, ListAppender<ILoggingEvent> logAppender,
+        String expectedMessage) {
+        ILoggingEvent loggingEvent = logAppender.list.get(messageIndex);
+        assertThat(loggingEvent.toString().contains(expectedMessage)).isTrue();
+    }
+}
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java
new file mode 100644 (file)
index 0000000..381c2d1
--- /dev/null
@@ -0,0 +1,181 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.tasks;
+
+import static ch.qos.logback.classic.Level.WARN;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import java.time.Duration;
+import java.util.Collections;
+import org.awaitility.Durations;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.oransc.policyagent.clients.A1Client;
+import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.configuration.ImmutableRicConfig;
+import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.ImmutablePolicy;
+import org.oransc.policyagent.repository.ImmutablePolicyType;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.Policy;
+import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Service;
+import org.oransc.policyagent.repository.Services;
+import org.oransc.policyagent.utils.LoggingUtils;
+import reactor.core.publisher.Mono;
+
+@ExtendWith(MockitoExtension.class)
+public class ServiceSupervisionTest {
+
+    private static final String SERVICE_NAME = "Service name";
+    private static final String RIC_NAME = "name";
+    private static final String POLICY_ID = "policy";
+
+    @Mock
+    A1ClientFactory a1ClientFactoryMock;
+    @Mock
+    A1Client a1ClientMock;
+
+    private Services services;
+    private Service service;
+    private Policies policies;
+    private RicConfig ricConfig = ImmutableRicConfig.builder() //
+        .name(RIC_NAME) //
+        .baseUrl("baseUrl") //
+        .managedElementIds(Collections.emptyList()) //
+        .build();
+    private Ric ric = new Ric(ricConfig);
+    private PolicyType policyType = ImmutablePolicyType.builder() //
+        .name("plicyTypeName") //
+        .schema("schema") //
+        .build();
+    private Policy policy = ImmutablePolicy.builder() //
+        .id(POLICY_ID) //
+        .json("json") //
+        .ownerServiceName(SERVICE_NAME) //
+        .ric(ric) //
+        .type(policyType) //
+        .lastModified("lastModified") //
+        .build();
+
+    @Test
+    public void serviceExpired_policyAndServiceAreDeletedInRepoAndPolicyIsDeletedInRic() {
+        setUpRepositoryWithKeepAliveInterval(Duration.ofSeconds(2));
+
+        setUpCreationOfA1Client();
+        when(a1ClientMock.deletePolicy(any(Policy.class))).thenReturn(Mono.just("Policy deleted"));
+
+        ServiceSupervision serviceSupervisionUnderTest =
+            new ServiceSupervision(services, policies, a1ClientFactoryMock);
+
+        await().atMost(Durations.FIVE_SECONDS).with().pollInterval(Durations.ONE_SECOND).until(service::isExpired);
+
+        serviceSupervisionUnderTest.checkAllServices();
+
+        assertThat(policies.size()).isEqualTo(0);
+        assertThat(services.size()).isEqualTo(0);
+
+        verify(a1ClientMock).deletePolicy(policy);
+        verifyNoMoreInteractions(a1ClientMock);
+    }
+
+    @Test
+    public void serviceExpiredButDeleteInRicFails_policyAndServiceAreDeletedInRepoAndErrorLoggedForRic() {
+        setUpRepositoryWithKeepAliveInterval(Duration.ofSeconds(2));
+
+        setUpCreationOfA1Client();
+        String originalErrorMessage = "Failed";
+        when(a1ClientMock.deletePolicy(any(Policy.class))).thenReturn(Mono.error(new Exception(originalErrorMessage)));
+
+        ServiceSupervision serviceSupervisionUnderTest =
+            new ServiceSupervision(services, policies, a1ClientFactoryMock);
+
+        await().atMost(Durations.FIVE_SECONDS).with().pollInterval(Durations.ONE_SECOND).until(service::isExpired);
+
+        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ServiceSupervision.class, WARN);
+
+        serviceSupervisionUnderTest.checkAllServices();
+
+        assertThat(policies.size()).isEqualTo(0);
+        assertThat(services.size()).isEqualTo(0);
+
+        ILoggingEvent loggingEvent = logAppender.list.get(0);
+        assertThat(loggingEvent.getThrowableProxy().getMessage()).isEqualTo(originalErrorMessage);
+        String expectedLogMessage = "Could not delete policy: " + POLICY_ID + " from ric: " + RIC_NAME;
+        assertThat(loggingEvent.toString().contains(expectedLogMessage)).isTrue();
+    }
+
+    @Test
+    public void serviceNotExpired_shouldNotBeChecked() {
+        setUpRepositoryWithKeepAliveInterval(Duration.ofSeconds(2));
+
+        ServiceSupervision serviceSupervisionUnderTest =
+            new ServiceSupervision(services, policies, a1ClientFactoryMock);
+
+        serviceSupervisionUnderTest.checkAllServices();
+
+        assertThat(policies.size()).isEqualTo(1);
+        assertThat(services.size()).isEqualTo(1);
+
+        verifyNoInteractions(a1ClientFactoryMock);
+        verifyNoInteractions(a1ClientMock);
+    }
+
+    @Test
+    public void serviceWithoutKeepAliveInterval_shouldNotBeChecked() {
+        setUpRepositoryWithKeepAliveInterval(Duration.ofSeconds(0));
+
+        ServiceSupervision serviceSupervisionUnderTest =
+            new ServiceSupervision(services, policies, a1ClientFactoryMock);
+
+        serviceSupervisionUnderTest.checkAllServices();
+
+        assertThat(policies.size()).isEqualTo(1);
+        assertThat(services.size()).isEqualTo(1);
+
+        verifyNoInteractions(a1ClientFactoryMock);
+        verifyNoInteractions(a1ClientMock);
+    }
+
+    private void setUpCreationOfA1Client() {
+        when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock));
+    }
+
+    private void setUpRepositoryWithKeepAliveInterval(Duration keepAliveInterval) {
+        services = new Services();
+        service = new Service(SERVICE_NAME, keepAliveInterval, "callbackUrl");
+        services.put(service);
+
+        policies = new Policies();
+        policies.put(policy);
+    }
+}
index 8bf705c..ae9385f 100644 (file)
@@ -29,12 +29,12 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.oransc.policyagent.repository.Ric.RicState.IDLE;
 
+import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Vector;
 
@@ -48,6 +48,8 @@ import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.configuration.ImmutableRicConfig;
 import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.Policy;
+import org.oransc.policyagent.repository.PolicyType;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Ric.RicState;
@@ -86,12 +88,11 @@ public class StartupServiceTest {
     }
 
     @Test
-    public void startup_allOk() {
+    public void startupAndAllOk_thenRicsAreConfiguredInRepository() {
         Mono<List<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
         Mono<List<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
         doReturn(policyTypes1, policyTypes2).when(a1ClientMock).getPolicyTypeIdentities();
         doReturn(Mono.just("Schema")).when(a1ClientMock).getPolicyTypeSchema(anyString());
-        doReturn(Flux.just("OK")).when(a1ClientMock).deleteAllPolicies();
 
         Rics rics = new Rics();
         PolicyTypes policyTypes = new PolicyTypes();
@@ -108,8 +109,6 @@ public class StartupServiceTest {
 
         await().untilAsserted(() -> assertThat(policyTypes.size()).isEqualTo(2));
 
-        verify(a1ClientMock, times(2)).deleteAllPolicies();
-
         assertTrue(policyTypes.contains(POLICY_TYPE_1_NAME), POLICY_TYPE_1_NAME + " not added to PolicyTypes.");
         assertTrue(policyTypes.contains(POLICY_TYPE_2_NAME), POLICY_TYPE_2_NAME + " not added to PolicyTypes.");
         assertEquals(2, rics.size(), "Correct number of Rics not added to Rics");
@@ -143,7 +142,7 @@ public class StartupServiceTest {
     }
 
     @Test
-    public void startup_unableToConnectToGetTypes() {
+    public void startupAndUnableToConnectToGetTypes_thenRicStateSetToUndefined() {
         Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
         doReturn(error, error).when(a1ClientMock).getPolicyTypeIdentities();
 
@@ -161,7 +160,7 @@ public class StartupServiceTest {
     }
 
     @Test
-    public void startup_unableToConnectToDeleteAllPolicies() {
+    public void startupAndUnableToConnectToDeleteAllPolicies_thenRicStateSetToUndefined() {
 
         Mono<List<String>> policyTypes = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
         when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(policyTypes);
@@ -169,16 +168,28 @@ public class StartupServiceTest {
         Flux<?> error = Flux.error(new Exception("Unable to contact ric."));
         doReturn(error).when(a1ClientMock).deleteAllPolicies();
 
-        Rics rics = new Rics();
-        StartupService serviceUnderTest = new StartupService(appConfigMock, refreshTaskMock, rics, new PolicyTypes(),
-            a1ClientFactory, new Policies(), new Services());
+        RicConfig ricConfig = mock(RicConfig.class);
+        when(ricConfig.name()).thenReturn(FIRST_RIC_NAME);
+        when(ricConfig.managedElementIds()).thenReturn(ImmutableList.copyOf(Collections.emptyList()));
+        Ric ric = new Ric(ricConfig);
+
+        PolicyType policyType = mock(PolicyType.class);
+        when(policyType.name()).thenReturn(POLICY_TYPE_1_NAME);
+
+        Policy policy = mock(Policy.class);
+        when(policy.ric()).thenReturn(ric);
+        when(policy.type()).thenReturn(policyType);
+        Policies policies = new Policies();
+        policies.put(policy);
+
+        StartupService serviceUnderTest = new StartupService(appConfigMock, refreshTaskMock, new Rics(),
+            new PolicyTypes(), a1ClientFactory, policies, new Services());
 
         serviceUnderTest.startup();
         serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A),
             ApplicationConfig.RicConfigUpdate.ADDED);
 
-        assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).getState(),
-            "Not correct state for " + FIRST_RIC_NAME);
+        assertEquals(RicState.UNDEFINED, ric.getState(), "Not correct state for " + FIRST_RIC_NAME);
     }
 
     @SafeVarargs
index a822bb3..a594091 100644 (file)
@@ -30,23 +30,27 @@ import org.slf4j.LoggerFactory;
 public class LoggingUtils {
 
     /**
-     * Returns a ListAppender that contains all logging events. Call this method at the very beginning of the test
+     * Returns a ListAppender that contains all logging events. Call this method right before calling the tested
+     * method.
+     *
+     * @return the log list appender for the given class.
      */
     public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass) {
-        return getLogListAppender(logClass, false);
+        return getLogListAppender(logClass, Level.ALL);
     }
 
     /**
-     * Returns a ListAppender that contains all logging events. Call this method at the very beginning of the test
+     * Returns a ListAppender that contains events for the given level. Call this method right before calling the tested
+     * method.
      *
      * @param logClass class whose appender is wanted.
-     * @param allLevels true if all log levels should be activated.
+     * @param level the log level to log at.
+     *
+     * @return the log list appender for the given class logging on the given level.
      */
-    public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass, boolean allLevels) {
+    public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass, Level level) {
         Logger logger = (Logger) LoggerFactory.getLogger(logClass);
-        if (allLevels) {
-            logger.setLevel(Level.ALL);
-        }
+        logger.setLevel(level);
         ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
         listAppender.start();
         logger.addAppender(listAppender);