Recovery handling 93/2193/1
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 10 Jan 2020 06:49:31 +0000 (07:49 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Fri, 10 Jan 2020 12:49:36 +0000 (13:49 +0100)
Supervision of NearRT RIC policy types and instances.
When there is a difference between the view in the NonRT RIC
and the NearRT RIC, all policies will be deleted and the
types will be refreshed.

Introduced that policies will be created and deleted in the NonRT RIC
when created/deleted from the agent NBI.

Added Mock mode, run by command:
mvn -Dtest=MockPolicyAgent test

Issue-ID: NONRTRIC-84
Change-Id: Ica621c990f352cd69efa0dca51451d5f3c755b68
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
20 files changed:
policy-agent/README.md [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientImpl.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/RicInfo.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java
policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientImplTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java

diff --git a/policy-agent/README.md b/policy-agent/README.md
new file mode 100644 (file)
index 0000000..be04907
--- /dev/null
@@ -0,0 +1,34 @@
+# O-RAN-SC NonRT RIC Dashboard Web Application
+
+The O-RAN NonRT RIC PolicyAgent provides a REST API for management of 
+policices. It provides support for 
+-Policy configuration. This includes
+ -One REST API towards all RICs in the network
+ -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc.
+ -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC 
+-Supervision of clients (R-APPs) to eliminate stray policies in case of failure
+-Consistency monitoring of the SMO view of policies and the actual situation in the RICs
+-Consistency monitoring of RIC capabilities (policy types)
+
+The agent can be run stand alone in a simulated test mode. Then it 
+simulates RICs. 
+The REST API is published on port 8081 and it is started by command:
+mvn -Dtest=MockPolicyAgent test
+
+The backend server publishes live API documentation at the
+URL `http://your-host-name-here:8080/swagger-ui.html`
+
+## License
+
+Copyright (C) 2019 Nordix Foundation. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
index ddb71af..bc6d7cd 100644 (file)
 
 package org.oransc.policyagent.clients;
 
-import reactor.core.publisher.Flux;
+import java.util.Collection;
+
 import reactor.core.publisher.Mono;
 
 public interface A1Client {
 
-    public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl);
+    public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl);
 
-    public Flux<String> getPolicyIdentities(String nearRtRicUrl);
+    public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl);
 
     public Mono<String> getPolicyType(String nearRtRicUrl, String policyTypeId);
 
index f621566..b3773b8 100644 (file)
@@ -22,6 +22,7 @@ package org.oransc.policyagent.clients;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.json.JSONArray;
@@ -29,8 +30,6 @@ import org.json.JSONException;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class A1ClientImpl implements A1Client {
@@ -45,19 +44,19 @@ public class A1ClientImpl implements A1Client {
     }
 
     @Override
-    public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl) {
+    public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
         logger.debug("getPolicyTypeIdentities nearRtRicUrl = {}", nearRtRicUrl);
         AsyncRestClient client = createClient(nearRtRicUrl);
-        Mono<String> response = client.get("/policytypes/identities");
-        return response.flatMapMany(this::createFlux);
+        return client.get("/policytypes/identities") //
+            .flatMap(this::parseJsonArrayOfString);
     }
 
     @Override
-    public Flux<String> getPolicyIdentities(String nearRtRicUrl) {
+    public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
         logger.debug("getPolicyIdentities nearRtRicUrl = {}", nearRtRicUrl);
         AsyncRestClient client = createClient(nearRtRicUrl);
-        Mono<String> response = client.get("/policies/identities");
-        return response.flatMapMany(this::createFlux);
+        return client.get("/policies/identities") //
+            .flatMap(this::parseJsonArrayOfString);
     }
 
     @Override
@@ -84,7 +83,7 @@ public class A1ClientImpl implements A1Client {
         return client.delete("/policies/" + policyId);
     }
 
-    private Flux<String> createFlux(String inputString) {
+    private Mono<Collection<String>> parseJsonArrayOfString(String inputString) {
         try {
             List<String> arrayList = new ArrayList<>();
             JSONArray jsonArray = new JSONArray(inputString);
@@ -92,9 +91,9 @@ public class A1ClientImpl implements A1Client {
                 arrayList.add(jsonArray.getString(i));
             }
             logger.debug("A1 client: received list = {}", arrayList);
-            return Flux.fromIterable(arrayList);
+            return Mono.just(arrayList);
         } catch (JSONException ex) { // invalid json
-            return Flux.error(ex);
+            return Mono.error(ex);
         }
     }
 
index e41f55e..00e5223 100644 (file)
@@ -109,7 +109,7 @@ public class ApplicationConfig {
         loadConfigurationFromFile(this.filepath);
 
         refreshConfigTask = createRefreshTask() //
-            .subscribe(e -> logger.info("Refreshed configuration data"),
+            .subscribe(notUsed -> logger.info("Refreshed configuration data"),
                 throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
                 () -> logger.error("Configuration refresh terminated"));
     }
index e29a4e9..6d849b1 100644 (file)
@@ -31,6 +31,7 @@ import io.swagger.annotations.ApiResponses;
 import java.util.Collection;
 import java.util.Vector;
 
+import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.ImmutablePolicy;
@@ -49,6 +50,7 @@ import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
 
 @RestController
 @Api(value = "Policy Management API")
@@ -57,15 +59,18 @@ public class PolicyController {
     private final Rics rics;
     private final PolicyTypes policyTypes;
     private final Policies policies;
+    private final A1Client a1Client;
+
     private static Gson gson = new GsonBuilder() //
         .serializeNulls() //
         .create(); //
 
     @Autowired
-    PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics) {
+    PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics, A1Client a1Client) {
         this.policyTypes = types;
         this.policies = policies;
         this.rics = rics;
+        this.a1Client = a1Client;
     }
 
     @GetMapping("/policy_schemas")
@@ -132,11 +137,48 @@ public class PolicyController {
     @DeleteMapping("/policy")
     @ApiOperation(value = "Deletes the policy")
     @ApiResponses(value = {@ApiResponse(code = 204, message = "Policy deleted")})
-    public ResponseEntity<Void> deletePolicy( //
-        @RequestParam(name = "instance", required = true) String instance) {
+    public Mono<ResponseEntity<Void>> deletePolicy( //
+        @RequestParam(name = "instance", required = true) String id) {
+        Policy policy = policies.get(id);
+        if (policy != null && policy.ric().state().equals(Ric.RicState.ACTIVE)) {
+            return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) //
+                .doOnEach(notUsed -> policies.removeId(id)) //
+                .flatMap(notUsed -> {
+                    return Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT));
+                });
+        } else {
+            return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
+        }
+    }
 
-        policies.removeId(instance);
-        return new ResponseEntity<>(HttpStatus.NO_CONTENT);
+    @PutMapping(path = "/policy")
+    @ApiOperation(value = "Create the policy")
+    @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")})
+    public Mono<ResponseEntity<String>> putPolicy( //
+        @RequestParam(name = "type", required = true) String typeName, //
+        @RequestParam(name = "instance", required = true) String instanceId, //
+        @RequestParam(name = "ric", required = true) String ricName, //
+        @RequestParam(name = "service", required = true) String service, //
+        @RequestBody String jsonBody) {
+
+        Ric ric = rics.get(ricName);
+        PolicyType type = policyTypes.get(typeName);
+        if (ric != null && type != null && ric.state().equals(Ric.RicState.ACTIVE)) {
+            Policy policy = ImmutablePolicy.builder() //
+                .id(instanceId) //
+                .json(jsonBody) //
+                .type(type) //
+                .ric(ric) //
+                .ownerServiceName(service) //
+                .lastModified(getTimeStampUTC()) //
+                .build();
+            return a1Client.putPolicy(policy.ric().getConfig().baseUrl(), policy.id(), policy.json()) //
+                .doOnNext(notUsed -> policies.put(policy)) //
+                .flatMap(notUsed -> {
+                    return Mono.just(new ResponseEntity<>(HttpStatus.CREATED));
+                });
+        }
+        return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
     }
 
     @GetMapping("/policies")
@@ -226,32 +268,4 @@ public class PolicyController {
         return java.time.Instant.now().toString();
     }
 
-    @PutMapping(path = "/policy")
-    @ApiOperation(value = "Create the policy")
-    @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")})
-    public ResponseEntity<String> putPolicy( //
-        @RequestParam(name = "type", required = true) String type, //
-        @RequestParam(name = "instance", required = true) String instanceId, //
-        @RequestParam(name = "ric", required = true) String ric, //
-        @RequestParam(name = "service", required = true) String service, //
-        @RequestBody String jsonBody) {
-
-        try {
-            // services.getService(service).ping();
-            Ric ricObj = rics.getRic(ric);
-            Policy policy = ImmutablePolicy.builder() //
-                .id(instanceId) //
-                .json(jsonBody) //
-                .type(policyTypes.getType(type)) //
-                .ric(ricObj) //
-                .ownerServiceName(service) //
-                .lastModified(getTimeStampUTC()) //
-                .build();
-            policies.put(policy);
-            return new ResponseEntity<String>(HttpStatus.CREATED);
-        } catch (ServiceException e) {
-            return new ResponseEntity<String>(e.getMessage(), HttpStatus.NOT_FOUND);
-        }
-    }
-
 }
index db9a4e5..cbb205c 100644 (file)
@@ -31,7 +31,7 @@ interface RicInfo {
 
     public String name();
 
-    public Collection<String> nodeNames();
+    public Collection<String> managedElementIds();
 
     public Collection<String> policyTypes();
 }
index 0075fb7..797ec71 100644 (file)
@@ -99,7 +99,7 @@ public class RicRepositoryController {
             if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
                 result.add(ImmutableRicInfo.builder() //
                     .name(ric.name()) //
-                    .nodeNames(ric.getManagedNodes()) //
+                    .managedElementIds(ric.getManagedElementIds()) //
                     .policyTypes(ric.getSupportedPolicyTypeNames()) //
                     .build());
             }
index 2a4eb5a..58c91b3 100644 (file)
@@ -75,6 +75,10 @@ public class Policies {
         return policiesId.containsKey(id);
     }
 
+    public synchronized Policy get(String id) {
+        return policiesId.get(id);
+    }
+
     public synchronized Policy getPolicy(String id) throws ServiceException {
         Policy p = policiesId.get(id);
         if (p == null) {
index 0d66e7d..9dee6f9 100644 (file)
@@ -40,6 +40,10 @@ public class PolicyTypes {
         return t;
     }
 
+    public synchronized PolicyType get(String name) {
+        return types.get(name);
+    }
+
     public synchronized void put(PolicyType type) {
         types.put(type.name(), type);
     }
index 98c497f..82d84f1 100644 (file)
@@ -65,38 +65,38 @@ public class Ric {
      *
      * @return a vector containing the nodes managed by this Ric.
      */
-    public Vector<String> getManagedNodes() {
+    public Vector<String> getManagedElementIds() {
         return ricConfig.managedElementIds();
     }
 
     /**
      * Determines if the given node is managed by this Ric.
      *
-     * @param nodeName the node name to check.
+     * @param managedElementId the node name to check.
      * @return true if the given node is managed by this Ric.
      */
-    public boolean isManaging(String nodeName) {
-        return ricConfig.managedElementIds().contains(nodeName);
+    public boolean isManaging(String managedElementId) {
+        return ricConfig.managedElementIds().contains(managedElementId);
     }
 
     /**
      * Adds the given node as managed by this Ric.
      *
-     * @param nodeName the node to add.
+     * @param managedElementId the node to add.
      */
-    public void addManagedNode(String nodeName) {
-        if (!ricConfig.managedElementIds().contains(nodeName)) {
-            ricConfig.managedElementIds().add(nodeName);
+    public void addManagedElement(String managedElementId) {
+        if (!ricConfig.managedElementIds().contains(managedElementId)) {
+            ricConfig.managedElementIds().add(managedElementId);
         }
     }
 
     /**
      * Removes the given node as managed by this Ric.
      *
-     * @param nodeName the node to remove.
+     * @param managedElementId the node to remove.
      */
-    public void removeManagedNode(String nodeName) {
-        ricConfig.managedElementIds().remove(nodeName);
+    public void removeManagedElement(String managedElementId) {
+        ricConfig.managedElementIds().remove(managedElementId);
     }
 
     /**
@@ -122,23 +122,10 @@ public class Ric {
     }
 
     /**
-     * Adds policy types as supported by this Ric.
-     *
-     * @param types the policy types to support.
-     */
-    public void addSupportedPolicyTypes(Collection<PolicyType> types) {
-        for (PolicyType type : types) {
-            addSupportedPolicyType(type);
-        }
-    }
-
-    /**
-     * Removes a policy type as supported by this Ric.
-     *
-     * @param type the policy type to remove as supported by this Ric.
+     * Removes all policy type as supported by this Ric.
      */
-    public void removeSupportedPolicyType(PolicyType type) {
-        supportedPolicyTypes.remove(type.name());
+    public void clearSupportedPolicyTypes() {
+        supportedPolicyTypes.clear();
     }
 
     /**
@@ -173,6 +160,6 @@ public class Ric {
         /**
          * The Ric cannot be contacted.
          */
-        NOT_REACHABLE
+        NOT_REACHABLE, RECOVERING
     }
 }
index 153d193..6b8138f 100644 (file)
@@ -32,7 +32,7 @@ import org.oransc.policyagent.exceptions.ServiceException;
 public class Rics {
     Map<String, Ric> rics = new HashMap<>();
 
-    public void put(Ric ric) {
+    public synchronized void put(Ric ric) {
         rics.put(ric.name(), ric);
     }
 
index 512d065..81ef7ff 100644 (file)
@@ -35,11 +35,11 @@ public class Service {
         ping();
     }
 
-    public String getName() {
+    public synchronized String getName() {
         return this.name;
     }
 
-    public Duration getKeepAliveInterval() {
+    public synchronized Duration getKeepAliveInterval() {
         return this.keepAliveInterval;
     }
 
index 7984a62..9ac9d70 100644 (file)
 
 package org.oransc.policyagent.tasks;
 
+import java.util.Collection;
+
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Ric.RicState;
 import org.oransc.policyagent.repository.Rics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,13 +47,15 @@ public class RepositorySupervision {
 
     private final Rics rics;
     private final Policies policies;
+    private final PolicyTypes policyTypes;
     private final A1Client a1Client;
 
     @Autowired
-    public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client) {
+    public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client, PolicyTypes policyTypes) {
         this.rics = rics;
         this.policies = policies;
         this.a1Client = a1Client;
+        this.policyTypes = policyTypes;
     }
 
     /**
@@ -66,45 +70,59 @@ public class RepositorySupervision {
 
     private Flux<Ric> createTask() {
         return Flux.fromIterable(rics.getRics()) //
-            .groupBy(ric -> ric.state()) //
-            .flatMap(fluxGroup -> handleGroup(fluxGroup.key(), fluxGroup));
+            .flatMap(ric -> checkInstances(ric)) //
+            .flatMap(ric -> checkTypes(ric));
     }
 
-    private Flux<Ric> handleGroup(Ric.RicState key, Flux<Ric> fluxGroup) {
-        logger.debug("Handling group {}", key);
-        switch (key) {
-            case ACTIVE:
-                return fluxGroup.flatMap(this::checkActive);
+    private Mono<Ric> checkInstances(Ric ric) {
 
-            case NOT_REACHABLE:
-                return fluxGroup.flatMap(this::checkNotReachable);
+        return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
+            .onErrorResume(t -> Mono.empty()) //
+            .flatMap(ricP -> validateInstances(ricP, ric));
+    }
 
-            default:
-                // If not initiated, leave it to the StartupService.
-                return Flux.empty();
-        }
+    private Flux<Ric> junk() {
+        return Flux.empty();
     }
 
-    private Mono<Ric> checkActive(Ric ric) {
-        logger.debug("Handling active ric {}", ric);
-        a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
-            .filter(policyId -> !policies.containsPolicy(policyId)) //
-            .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId))
-            .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
-            .subscribe();
+    private Mono<Ric> validateInstances(Collection<String> ricPolicies, Ric ric) {
+        if (ricPolicies.size() != policies.getForRic(ric.name()).size()) {
+            return startRecovery(ric);
+        }
+        for (String policyId : ricPolicies) {
+            if (!policies.containsPolicy(policyId)) {
+                return startRecovery(ric);
+            }
+        }
         return Mono.just(ric);
     }
 
-    private Mono<Ric> checkNotReachable(Ric ric) {
-        logger.debug("Handling not reachable ric {}", ric);
-        a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
-            .filter(policyId -> !policies.containsPolicy(policyId)) //
-            .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId))
-            .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
-            .subscribe(null, null, () -> ric.setState(RicState.ACTIVE));
+    private Mono<Ric> checkTypes(Ric ric) {
+        return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
+            .onErrorResume(t -> {
+                return Mono.empty();
+            }) //
+            .flatMap(ricTypes -> validateTypes(ricTypes, ric));
+    }
+
+    private Mono<Ric> validateTypes(Collection<String> ricTypes, Ric ric) {
+        if (ricTypes.size() != ric.getSupportedPolicyTypes().size()) {
+            return startRecovery(ric);
+        }
+        for (String typeName : ricTypes) {
+            if (!ric.isSupportingType(typeName)) {
+                return startRecovery(ric);
+            }
+        }
         return Mono.just(ric);
     }
 
+    private Mono<Ric> startRecovery(Ric ric) {
+        RicRecoveryTask recovery = new RicRecoveryTask(a1Client, policyTypes, policies);
+        recovery.run(ric);
+        return Mono.empty();
+    }
+
     private void onRicChecked(Ric ric) {
         logger.info("Ric: " + ric.name() + " checked");
     }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java
new file mode 100644 (file)
index 0000000..3883585
--- /dev/null
@@ -0,0 +1,130 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.tasks;
+
+import java.util.Collection;
+import java.util.Vector;
+
+import org.oransc.policyagent.clients.A1Client;
+import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.repository.ImmutablePolicyType;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.Policy;
+import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Ric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Loads information about RealTime-RICs at startup.
+ */
+public class RicRecoveryTask {
+
+    private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class);
+
+    private final A1Client a1Client;
+    private final PolicyTypes policyTypes;
+    private final Policies policies;
+
+    public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies) {
+        this.a1Client = a1Client;
+        this.policyTypes = policyTypes;
+        this.policies = policies;
+    }
+
+    public void run(Collection<Ric> rics) {
+        for (Ric ric : rics) {
+            run(ric);
+        }
+    }
+
+    public void run(Ric ric) {
+        logger.debug("Handling ric: {}", ric.getConfig().name());
+
+        synchronized (ric) {
+            if (ric.state().equals(Ric.RicState.RECOVERING)) {
+                return; // Already running
+            }
+            ric.setState(Ric.RicState.RECOVERING);
+        }
+        Flux<PolicyType> recoveredTypes = recoverPolicyTypes(ric);
+        Flux<?> deletedPolicies = deletePolicies(ric);
+
+        Flux.merge(recoveredTypes, deletedPolicies) //
+            .subscribe(x -> logger.debug("Recover: " + x), //
+                throwable -> onError(ric, throwable), //
+                () -> onComplete(ric));
+    }
+
+    private void onComplete(Ric ric) {
+        logger.debug("Recovery completed for:" + ric.name());
+        ric.setState(Ric.RicState.ACTIVE);
+
+    }
+
+    private void onError(Ric ric, Throwable t) {
+        logger.debug("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
+        ric.setState(Ric.RicState.NOT_REACHABLE);
+    }
+
+    private Flux<PolicyType> recoverPolicyTypes(Ric ric) {
+        ric.clearSupportedPolicyTypes();
+        return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
+            .flatMapMany(types -> Flux.fromIterable(types)) //
+            .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
+            .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) //
+            .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
+    }
+
+    private Mono<PolicyType> getPolicyType(Ric ric, String policyTypeId) {
+        if (policyTypes.contains(policyTypeId)) {
+            try {
+                return Mono.just(policyTypes.getType(policyTypeId));
+            } catch (ServiceException e) {
+                return Mono.error(e);
+            }
+        }
+        return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) //
+            .flatMap(schema -> createPolicyType(policyTypeId, schema));
+    }
+
+    private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
+        PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
+        policyTypes.put(pt);
+        return Mono.just(pt);
+    }
+
+    private Flux<String> deletePolicies(Ric ric) {
+        Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
+        for (Policy policy : ricPolicies) {
+            this.policies.remove(policy);
+        }
+
+        return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
+            .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
+            .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
+            .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); //
+    }
+}
index 283e8ea..d2356ea 100644 (file)
@@ -22,21 +22,16 @@ package org.oransc.policyagent.tasks;
 
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.exceptions.ServiceException;
-import org.oransc.policyagent.repository.ImmutablePolicyType;
-import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Ric.RicState;
 import org.oransc.policyagent.repository.Rics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
 /**
  * Loads information about RealTime-RICs at startup.
  */
@@ -57,78 +52,29 @@ public class StartupService {
     @Autowired
     private A1Client a1Client;
 
-    StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client) {
+    @Autowired
+    private Policies policies;
+
+    StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client,
+        Policies policies) {
         this.applicationConfig = appConfig;
         this.rics = rics;
         this.policyTypes = policyTypes;
         this.a1Client = a1Client;
+        this.policies = policies;
     }
 
     /**
      * Reads the configured Rics and performs the service discovery. The result is put into the repository.
      */
     public void startup() {
+        logger.debug("Starting up");
         applicationConfig.initialize();
-        Flux.fromIterable(applicationConfig.getRicConfigs()) //
-            .map(ricConfig -> new Ric(ricConfig)) //
-            .doOnNext(ric -> logger.debug("Handling ric: {}", ric.getConfig().name())) //
-            .flatMap(this::addPolicyTypesForRic) //
-            .flatMap(this::deletePoliciesForRic) //
-            .doOnNext(rics::put) //
-            .subscribe();
-    }
-
-    private Mono<Ric> addPolicyTypesForRic(Ric ric) {
-        a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
-            .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
-            .flatMap((policyTypeId) -> addTypeToRepo(ric, policyTypeId)) //
-            .flatMap(type -> addTypeToRic(ric, type)) //
-            .subscribe(null, cause -> setRicToNotReachable(ric, cause), () -> setRicToActive(ric));
-        return Mono.just(ric);
-    }
-
-    private Mono<PolicyType> addTypeToRepo(Ric ric, String policyTypeId) {
-        if (policyTypes.contains(policyTypeId)) {
-            try {
-                return Mono.just(policyTypes.getType(policyTypeId));
-            } catch (ServiceException e) {
-                return Mono.error(e);
-            }
+        for (RicConfig ricConfig : applicationConfig.getRicConfigs()) {
+            rics.put(new Ric(ricConfig));
         }
-        return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) //
-            .flatMap(schema -> createPolicyType(policyTypeId, schema));
-    }
-
-    private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
-        PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
-        policyTypes.put(pt);
-        return Mono.just(pt);
-    }
-
-    private Mono<PolicyType> addTypeToRic(Ric ric, PolicyType policyType) {
-        ric.addSupportedPolicyType(policyType);
-        return Mono.just(policyType);
-    }
-
-    private Mono<Ric> deletePoliciesForRic(Ric ric) {
-        if (!Ric.RicState.NOT_REACHABLE.equals(ric.state())) {
-            a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
-                .doOnNext(
-                    policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
-                .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
-                .subscribe(null, cause -> setRicToNotReachable(ric, cause), null);
-        }
-
-        return Mono.just(ric);
-    }
-
-    private void setRicToNotReachable(Ric ric, Throwable t) {
-        ric.setState(Ric.RicState.NOT_REACHABLE);
-        logger.info("Unable to connect to ric {}. Cause: {}", ric.name(), t.getMessage());
-    }
-
-    private void setRicToActive(Ric ric) {
-        ric.setState(RicState.ACTIVE);
+        RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies);
+        recoveryTask.run(rics.getRics()); // recover all Rics
     }
 
 }
index 8a6523c..365d418 100644 (file)
@@ -29,11 +29,13 @@ import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
 
 import java.net.URL;
+import java.util.Collection;
 import java.util.List;
 import java.util.Vector;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.configuration.ImmutableRicConfig;
 import org.oransc.policyagent.configuration.RicConfig;
@@ -61,6 +63,7 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 import org.springframework.web.client.RestTemplate;
+import reactor.core.publisher.Mono;
 
 @ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@@ -89,25 +92,81 @@ public class ApplicationTest {
         }
     }
 
+    static class A1ClientMock implements A1Client {
+        private final Policies policies;
+        private final PolicyTypes policyTypes;
+
+        A1ClientMock(Policies policies, PolicyTypes policyTypes) {
+            this.policies = policies;
+            this.policyTypes = policyTypes;
+        }
+
+        @Override
+        public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
+            Vector<String> result = new Vector<>();
+            for (PolicyType p : this.policyTypes.getAll()) {
+                result.add(p.name());
+            }
+            return Mono.just(result);
+        }
+
+        @Override
+        public Mono<String> getPolicyType(String nearRtRicUrl, String policyTypeId) {
+            try {
+                return Mono.just(this.policies.get(policyTypeId).json());
+            } catch (Exception e) {
+                return Mono.error(e);
+            }
+        }
+
+        @Override
+        public Mono<String> putPolicy(String nearRtRicUrl, String policyId, String policyString) {
+            return Mono.just("OK");
+        }
+
+        @Override
+        public Mono<String> deletePolicy(String nearRtRicUrl, String policyId) {
+            return Mono.just("OK");
+        }
+
+        @Override
+        public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
+            return Mono.empty(); // problem is that a recovery will start
+        }
+    }
+
     /**
      * Overrides the BeanFactory.
      */
     @TestConfiguration
     static class TestBeanFactory {
+        private final Rics rics = new Rics();
+        private final Policies policies = new Policies();
+        private final PolicyTypes policyTypes = new PolicyTypes();
 
         @Bean
         public ApplicationConfig getApplicationConfig() {
             return new MockApplicationConfig();
         }
 
+        @Bean
+        A1Client getA1Client() {
+            return new A1ClientMock(this.policies, this.policyTypes);
+        }
+
+        @Bean
+        public Policies getPolicies() {
+            return this.policies;
+        }
+
+        @Bean
+        public PolicyTypes getPolicyTypes() {
+            return this.policyTypes;
+        }
+
         @Bean
         public Rics getRics() {
-            Rics rics = new Rics();
-            rics.put(new Ric(ImmutableRicConfig.builder().name("kista_1").baseUrl("kista_url")
-                .managedElementIds(new Vector<>()).build()));
-            rics.put(new Ric(ImmutableRicConfig.builder().name("ric1").baseUrl("ric_url")
-                .managedElementIds(new Vector<>()).build()));
-            return rics;
+            return this.rics;
         }
     }
 
@@ -146,8 +205,6 @@ public class ApplicationTest {
         assertThat(rsp).isEqualTo("ric1");
     }
 
-    // managedElmentId -> nodeName
-
     @Test
     public void testPutPolicy() throws Exception {
         putService("service1");
@@ -155,6 +212,7 @@ public class ApplicationTest {
         String url = baseUrl() + "/policy?type=type1&instance=instance1&ric=ric1&service=service1";
         String json = "{}";
         addPolicyType("type1", "ric1");
+        this.rics.getRic("ric1").setState(Ric.RicState.ACTIVE);
 
         this.restTemplate.put(url, json);
 
@@ -234,7 +292,8 @@ public class ApplicationTest {
     public void testDeletePolicy() throws Exception {
         reset();
         String url = baseUrl() + "/policy?instance=id";
-        addPolicy("id", "typeName", "service1", "ric1");
+        Policy policy = addPolicy("id", "typeName", "service1", "ric1");
+        policy.ric().setState(Ric.RicState.ACTIVE);
         assertThat(policies.size()).isEqualTo(1);
 
         this.restTemplate.delete(url);
index 743fa66..dd57710 100644 (file)
@@ -27,12 +27,13 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Files;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Vector;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.repository.ImmutablePolicyType;
@@ -46,12 +47,10 @@ import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.web.server.LocalServerPort;
 import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import reactor.core.publisher.Flux;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
 import reactor.core.publisher.Mono;
 
-@RunWith(SpringRunner.class)
+@ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
 public class MockPolicyAgent {
 
@@ -80,7 +79,7 @@ public class MockPolicyAgent {
             getPolicies(nearRtRicUrl).put(policyId, policyString);
         }
 
-        public Iterable<String> getPolicyIdentities(String nearRtRicUrl) {
+        public Collection<String> getPolicyIdentities(String nearRtRicUrl) {
             return getPolicies(nearRtRicUrl).keySet();
         }
 
@@ -106,18 +105,18 @@ public class MockPolicyAgent {
         }
 
         @Override
-        public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl) {
+        public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
             Vector<String> result = new Vector<>();
             for (PolicyType p : this.policyTypes.getAll()) {
                 result.add(p.name());
             }
-            return Flux.fromIterable(result);
+            return Mono.just(result);
         }
 
         @Override
-        public Flux<String> getPolicyIdentities(String nearRtRicUrl) {
-            Iterable<String> result = policies.getPolicyIdentities(nearRtRicUrl);
-            return Flux.fromIterable(result);
+        public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
+            Collection<String> result = policies.getPolicyIdentities(nearRtRicUrl);
+            return Mono.just(result);
         }
 
         @Override
index 7b11b9a..f9a93c8 100644 (file)
@@ -36,7 +36,6 @@ import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -74,10 +73,9 @@ public class A1ClientImplTest {
         Mono<String> policyTypeIds = Mono.just(Arrays.toString(new String[] {POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME}));
         when(asyncRestClientMock.get(POLICYTYPES_IDENTITIES_URL)).thenReturn(policyTypeIds);
 
-        Flux<String> policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL);
+        Mono<?> policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL);
         verify(asyncRestClientMock).get(POLICYTYPES_IDENTITIES_URL);
-        StepVerifier.create(policyTypeIdsFlux).expectNext(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME).expectComplete()
-            .verify();
+        StepVerifier.create(policyTypeIdsFlux).expectNextCount(1).expectComplete().verify();
     }
 
     @Test
@@ -85,9 +83,9 @@ public class A1ClientImplTest {
         Mono<String> policyIds = Mono.just(Arrays.toString(new String[] {POLICY_1_ID, POLICY_2_ID}));
         when(asyncRestClientMock.get(POLICIES_IDENTITIES_URL)).thenReturn(policyIds);
 
-        Flux<String> policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL);
+        Mono<?> policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL);
         verify(asyncRestClientMock).get(POLICIES_IDENTITIES_URL);
-        StepVerifier.create(policyIdsFlux).expectNext(POLICY_1_ID, POLICY_2_ID).expectComplete().verify();
+        StepVerifier.create(policyIdsFlux).expectNextCount(1).expectComplete().verify();
     }
 
     @Test
index 195a486..791acee 100644 (file)
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Vector;
 
 import org.junit.jupiter.api.Test;
@@ -42,11 +43,10 @@ import org.oransc.policyagent.repository.ImmutablePolicyType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Ric.RicState;
 import org.oransc.policyagent.repository.Rics;
-
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 @ExtendWith(MockitoExtension.class)
@@ -93,19 +93,23 @@ public class RepositorySupervisionTest {
             .build();
         Policies policies = new Policies();
         policies.put(policy1);
+        PolicyTypes types = new PolicyTypes();
 
-        RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock);
+        RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock, types);
 
-        when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(Flux.just("policyId1", "policyId2"));
+        Mono<Collection<String>> policyIds = Mono.just(Arrays.asList("policyId1", "policyId2"));
+        when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policyIds);
         when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.empty());
+        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyIds);
+        when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("schema"));
 
         supervisorUnderTest.checkAllRics();
 
-        await().untilAsserted(() -> RicState.ACTIVE.equals(rics.getRic("ric2").state()));
+        await().untilAsserted(() -> RicState.ACTIVE.equals(ric1.state()));
+        await().untilAsserted(() -> RicState.ACTIVE.equals(ric2.state()));
+        await().untilAsserted(() -> RicState.ACTIVE.equals(ric3.state()));
 
-        verify(a1ClientMock).getPolicyIdentities("baseUrl1");
         verify(a1ClientMock).deletePolicy("baseUrl1", "policyId2");
-        verify(a1ClientMock).getPolicyIdentities("baseUrl2");
         verify(a1ClientMock).deletePolicy("baseUrl2", "policyId2");
         verifyNoMoreInteractions(a1ClientMock);
     }
index 5cfabd9..0c254d5 100644 (file)
@@ -32,6 +32,8 @@ import static org.mockito.Mockito.when;
 import static org.oransc.policyagent.repository.Ric.RicState.ACTIVE;
 import static org.oransc.policyagent.repository.Ric.RicState.NOT_REACHABLE;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Vector;
 
 import org.junit.jupiter.api.Test;
@@ -44,11 +46,10 @@ import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.configuration.ImmutableRicConfig;
 import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Rics;
-
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 @ExtendWith(MockitoExtension.class)
@@ -80,18 +81,18 @@ public class StartupServiceTest {
         ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
         when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
 
-        Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
-        Flux<String> fluxType2 = Flux.just(POLICY_TYPE_2_NAME);
-        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1)
-            .thenReturn(fluxType1.concatWith(fluxType2));
-        Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
+        Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+        Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
+        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
+        Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
         when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policies);
         when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
         when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
 
         Rics rics = new Rics();
         PolicyTypes policyTypes = new PolicyTypes();
-        StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+        StartupService serviceUnderTest =
+            new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
 
         serviceUnderTest.startup();
 
@@ -117,7 +118,8 @@ public class StartupServiceTest {
             "Not correct no of types supported for ric " + FIRST_RIC_NAME);
         assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME),
             POLICY_TYPE_1_NAME + " not supported by ric " + FIRST_RIC_NAME);
-        assertEquals(1, firstRic.getManagedNodes().size(), "Not correct no of managed nodes for ric " + FIRST_RIC_NAME);
+        assertEquals(1, firstRic.getManagedElementIds().size(),
+            "Not correct no of managed nodes for ric " + FIRST_RIC_NAME);
         assertTrue(firstRic.isManaging(MANAGED_NODE_A), MANAGED_NODE_A + " not managed by ric " + FIRST_RIC_NAME);
 
         Ric secondRic = rics.get(SECOND_RIC_NAME);
@@ -130,7 +132,7 @@ public class StartupServiceTest {
             POLICY_TYPE_1_NAME + " not supported by ric " + SECOND_RIC_NAME);
         assertTrue(secondRic.isSupportingType(POLICY_TYPE_2_NAME),
             POLICY_TYPE_2_NAME + " not supported by ric " + SECOND_RIC_NAME);
-        assertEquals(2, secondRic.getManagedNodes().size(),
+        assertEquals(2, secondRic.getManagedElementIds().size(),
             "Not correct no of managed nodes for ric " + SECOND_RIC_NAME);
         assertTrue(secondRic.isManaging(MANAGED_NODE_B), MANAGED_NODE_B + " not managed by ric " + SECOND_RIC_NAME);
         assertTrue(secondRic.isManaging(MANAGED_NODE_C), MANAGED_NODE_C + " not managed by ric " + SECOND_RIC_NAME);
@@ -143,18 +145,19 @@ public class StartupServiceTest {
         ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
         when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
 
-        Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
-        doReturn(Flux.error(new Exception("Unable to contact ric.")), fluxType1).when(a1ClientMock)
-            .getPolicyTypeIdentities(anyString());
+        Mono<Collection<String>> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+        Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
+        doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString());
 
-        Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
+        Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
         doReturn(policies).when(a1ClientMock).getPolicyIdentities(anyString());
         when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
         when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
 
         Rics rics = new Rics();
         PolicyTypes policyTypes = new PolicyTypes();
-        StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+        StartupService serviceUnderTest =
+            new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
 
         serviceUnderTest.startup();
 
@@ -173,19 +176,19 @@ public class StartupServiceTest {
         ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
         when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
 
-        Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
-        Flux<String> fluxType2 = Flux.just(POLICY_TYPE_2_NAME);
-        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1)
-            .thenReturn(fluxType1.concatWith(fluxType2));
+        Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+        Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
+        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
         when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
-        Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
-        doReturn(Flux.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock)
+        Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
+        doReturn(Mono.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock)
             .getPolicyIdentities(anyString());
         when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
 
         Rics rics = new Rics();
         PolicyTypes policyTypes = new PolicyTypes();
-        StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+        StartupService serviceUnderTest =
+            new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
 
         serviceUnderTest.startup();
 
@@ -197,14 +200,19 @@ public class StartupServiceTest {
         assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
     }
 
-    private RicConfig getRicConfig(String name, String baseUrl, String... nodeNames) {
-        Vector<String> managedNodes = new Vector<String>(1);
-        for (String nodeName : nodeNames) {
-            managedNodes.add(nodeName);
+    @SafeVarargs
+    private <T> Vector<T> toVector(T... objs) {
+        Vector<T> result = new Vector<>();
+        for (T o : objs) {
+            result.add(o);
         }
+        return result;
+    }
+
+    private RicConfig getRicConfig(String name, String baseUrl, String... managedElementIds) {
         ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() //
             .name(name) //
-            .managedElementIds(managedNodes) //
+            .managedElementIds(toVector(managedElementIds)) //
             .baseUrl(baseUrl) //
             .build();
         return ricConfig;