Merge "Recovery handling"
authorHenrik Andersson <henrik.b.andersson@est.tech>
Tue, 14 Jan 2020 07:33:22 +0000 (07:33 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Tue, 14 Jan 2020 07:33:22 +0000 (07:33 +0000)
20 files changed:
policy-agent/README.md [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientImpl.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/RicInfo.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java
policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientImplTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java

diff --git a/policy-agent/README.md b/policy-agent/README.md
new file mode 100644 (file)
index 0000000..be04907
--- /dev/null
@@ -0,0 +1,34 @@
+# O-RAN-SC NonRT RIC Dashboard Web Application
+
+The O-RAN NonRT RIC PolicyAgent provides a REST API for management of 
+policices. It provides support for 
+-Policy configuration. This includes
+ -One REST API towards all RICs in the network
+ -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc.
+ -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC 
+-Supervision of clients (R-APPs) to eliminate stray policies in case of failure
+-Consistency monitoring of the SMO view of policies and the actual situation in the RICs
+-Consistency monitoring of RIC capabilities (policy types)
+
+The agent can be run stand alone in a simulated test mode. Then it 
+simulates RICs. 
+The REST API is published on port 8081 and it is started by command:
+mvn -Dtest=MockPolicyAgent test
+
+The backend server publishes live API documentation at the
+URL `http://your-host-name-here:8080/swagger-ui.html`
+
+## License
+
+Copyright (C) 2019 Nordix Foundation. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
index ddb71af..bc6d7cd 100644 (file)
 
 package org.oransc.policyagent.clients;
 
-import reactor.core.publisher.Flux;
+import java.util.Collection;
+
 import reactor.core.publisher.Mono;
 
 public interface A1Client {
 
-    public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl);
+    public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl);
 
-    public Flux<String> getPolicyIdentities(String nearRtRicUrl);
+    public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl);
 
     public Mono<String> getPolicyType(String nearRtRicUrl, String policyTypeId);
 
index f621566..b3773b8 100644 (file)
@@ -22,6 +22,7 @@ package org.oransc.policyagent.clients;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.json.JSONArray;
@@ -29,8 +30,6 @@ import org.json.JSONException;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class A1ClientImpl implements A1Client {
@@ -45,19 +44,19 @@ public class A1ClientImpl implements A1Client {
     }
 
     @Override
-    public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl) {
+    public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
         logger.debug("getPolicyTypeIdentities nearRtRicUrl = {}", nearRtRicUrl);
         AsyncRestClient client = createClient(nearRtRicUrl);
-        Mono<String> response = client.get("/policytypes/identities");
-        return response.flatMapMany(this::createFlux);
+        return client.get("/policytypes/identities") //
+            .flatMap(this::parseJsonArrayOfString);
     }
 
     @Override
-    public Flux<String> getPolicyIdentities(String nearRtRicUrl) {
+    public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
         logger.debug("getPolicyIdentities nearRtRicUrl = {}", nearRtRicUrl);
         AsyncRestClient client = createClient(nearRtRicUrl);
-        Mono<String> response = client.get("/policies/identities");
-        return response.flatMapMany(this::createFlux);
+        return client.get("/policies/identities") //
+            .flatMap(this::parseJsonArrayOfString);
     }
 
     @Override
@@ -84,7 +83,7 @@ public class A1ClientImpl implements A1Client {
         return client.delete("/policies/" + policyId);
     }
 
-    private Flux<String> createFlux(String inputString) {
+    private Mono<Collection<String>> parseJsonArrayOfString(String inputString) {
         try {
             List<String> arrayList = new ArrayList<>();
             JSONArray jsonArray = new JSONArray(inputString);
@@ -92,9 +91,9 @@ public class A1ClientImpl implements A1Client {
                 arrayList.add(jsonArray.getString(i));
             }
             logger.debug("A1 client: received list = {}", arrayList);
-            return Flux.fromIterable(arrayList);
+            return Mono.just(arrayList);
         } catch (JSONException ex) { // invalid json
-            return Flux.error(ex);
+            return Mono.error(ex);
         }
     }
 
index e41f55e..00e5223 100644 (file)
@@ -109,7 +109,7 @@ public class ApplicationConfig {
         loadConfigurationFromFile(this.filepath);
 
         refreshConfigTask = createRefreshTask() //
-            .subscribe(e -> logger.info("Refreshed configuration data"),
+            .subscribe(notUsed -> logger.info("Refreshed configuration data"),
                 throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
                 () -> logger.error("Configuration refresh terminated"));
     }
index e29a4e9..6d849b1 100644 (file)
@@ -31,6 +31,7 @@ import io.swagger.annotations.ApiResponses;
 import java.util.Collection;
 import java.util.Vector;
 
+import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.ImmutablePolicy;
@@ -49,6 +50,7 @@ import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
 
 @RestController
 @Api(value = "Policy Management API")
@@ -57,15 +59,18 @@ public class PolicyController {
     private final Rics rics;
     private final PolicyTypes policyTypes;
     private final Policies policies;
+    private final A1Client a1Client;
+
     private static Gson gson = new GsonBuilder() //
         .serializeNulls() //
         .create(); //
 
     @Autowired
-    PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics) {
+    PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics, A1Client a1Client) {
         this.policyTypes = types;
         this.policies = policies;
         this.rics = rics;
+        this.a1Client = a1Client;
     }
 
     @GetMapping("/policy_schemas")
@@ -132,11 +137,48 @@ public class PolicyController {
     @DeleteMapping("/policy")
     @ApiOperation(value = "Deletes the policy")
     @ApiResponses(value = {@ApiResponse(code = 204, message = "Policy deleted")})
-    public ResponseEntity<Void> deletePolicy( //
-        @RequestParam(name = "instance", required = true) String instance) {
+    public Mono<ResponseEntity<Void>> deletePolicy( //
+        @RequestParam(name = "instance", required = true) String id) {
+        Policy policy = policies.get(id);
+        if (policy != null && policy.ric().state().equals(Ric.RicState.ACTIVE)) {
+            return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) //
+                .doOnEach(notUsed -> policies.removeId(id)) //
+                .flatMap(notUsed -> {
+                    return Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT));
+                });
+        } else {
+            return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
+        }
+    }
 
-        policies.removeId(instance);
-        return new ResponseEntity<>(HttpStatus.NO_CONTENT);
+    @PutMapping(path = "/policy")
+    @ApiOperation(value = "Create the policy")
+    @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")})
+    public Mono<ResponseEntity<String>> putPolicy( //
+        @RequestParam(name = "type", required = true) String typeName, //
+        @RequestParam(name = "instance", required = true) String instanceId, //
+        @RequestParam(name = "ric", required = true) String ricName, //
+        @RequestParam(name = "service", required = true) String service, //
+        @RequestBody String jsonBody) {
+
+        Ric ric = rics.get(ricName);
+        PolicyType type = policyTypes.get(typeName);
+        if (ric != null && type != null && ric.state().equals(Ric.RicState.ACTIVE)) {
+            Policy policy = ImmutablePolicy.builder() //
+                .id(instanceId) //
+                .json(jsonBody) //
+                .type(type) //
+                .ric(ric) //
+                .ownerServiceName(service) //
+                .lastModified(getTimeStampUTC()) //
+                .build();
+            return a1Client.putPolicy(policy.ric().getConfig().baseUrl(), policy.id(), policy.json()) //
+                .doOnNext(notUsed -> policies.put(policy)) //
+                .flatMap(notUsed -> {
+                    return Mono.just(new ResponseEntity<>(HttpStatus.CREATED));
+                });
+        }
+        return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
     }
 
     @GetMapping("/policies")
@@ -226,32 +268,4 @@ public class PolicyController {
         return java.time.Instant.now().toString();
     }
 
-    @PutMapping(path = "/policy")
-    @ApiOperation(value = "Create the policy")
-    @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")})
-    public ResponseEntity<String> putPolicy( //
-        @RequestParam(name = "type", required = true) String type, //
-        @RequestParam(name = "instance", required = true) String instanceId, //
-        @RequestParam(name = "ric", required = true) String ric, //
-        @RequestParam(name = "service", required = true) String service, //
-        @RequestBody String jsonBody) {
-
-        try {
-            // services.getService(service).ping();
-            Ric ricObj = rics.getRic(ric);
-            Policy policy = ImmutablePolicy.builder() //
-                .id(instanceId) //
-                .json(jsonBody) //
-                .type(policyTypes.getType(type)) //
-                .ric(ricObj) //
-                .ownerServiceName(service) //
-                .lastModified(getTimeStampUTC()) //
-                .build();
-            policies.put(policy);
-            return new ResponseEntity<String>(HttpStatus.CREATED);
-        } catch (ServiceException e) {
-            return new ResponseEntity<String>(e.getMessage(), HttpStatus.NOT_FOUND);
-        }
-    }
-
 }
index db9a4e5..cbb205c 100644 (file)
@@ -31,7 +31,7 @@ interface RicInfo {
 
     public String name();
 
-    public Collection<String> nodeNames();
+    public Collection<String> managedElementIds();
 
     public Collection<String> policyTypes();
 }
index 0075fb7..797ec71 100644 (file)
@@ -99,7 +99,7 @@ public class RicRepositoryController {
             if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
                 result.add(ImmutableRicInfo.builder() //
                     .name(ric.name()) //
-                    .nodeNames(ric.getManagedNodes()) //
+                    .managedElementIds(ric.getManagedElementIds()) //
                     .policyTypes(ric.getSupportedPolicyTypeNames()) //
                     .build());
             }
index 2a4eb5a..58c91b3 100644 (file)
@@ -75,6 +75,10 @@ public class Policies {
         return policiesId.containsKey(id);
     }
 
+    public synchronized Policy get(String id) {
+        return policiesId.get(id);
+    }
+
     public synchronized Policy getPolicy(String id) throws ServiceException {
         Policy p = policiesId.get(id);
         if (p == null) {
index 0d66e7d..9dee6f9 100644 (file)
@@ -40,6 +40,10 @@ public class PolicyTypes {
         return t;
     }
 
+    public synchronized PolicyType get(String name) {
+        return types.get(name);
+    }
+
     public synchronized void put(PolicyType type) {
         types.put(type.name(), type);
     }
index 98c497f..82d84f1 100644 (file)
@@ -65,38 +65,38 @@ public class Ric {
      *
      * @return a vector containing the nodes managed by this Ric.
      */
-    public Vector<String> getManagedNodes() {
+    public Vector<String> getManagedElementIds() {
         return ricConfig.managedElementIds();
     }
 
     /**
      * Determines if the given node is managed by this Ric.
      *
-     * @param nodeName the node name to check.
+     * @param managedElementId the node name to check.
      * @return true if the given node is managed by this Ric.
      */
-    public boolean isManaging(String nodeName) {
-        return ricConfig.managedElementIds().contains(nodeName);
+    public boolean isManaging(String managedElementId) {
+        return ricConfig.managedElementIds().contains(managedElementId);
     }
 
     /**
      * Adds the given node as managed by this Ric.
      *
-     * @param nodeName the node to add.
+     * @param managedElementId the node to add.
      */
-    public void addManagedNode(String nodeName) {
-        if (!ricConfig.managedElementIds().contains(nodeName)) {
-            ricConfig.managedElementIds().add(nodeName);
+    public void addManagedElement(String managedElementId) {
+        if (!ricConfig.managedElementIds().contains(managedElementId)) {
+            ricConfig.managedElementIds().add(managedElementId);
         }
     }
 
     /**
      * Removes the given node as managed by this Ric.
      *
-     * @param nodeName the node to remove.
+     * @param managedElementId the node to remove.
      */
-    public void removeManagedNode(String nodeName) {
-        ricConfig.managedElementIds().remove(nodeName);
+    public void removeManagedElement(String managedElementId) {
+        ricConfig.managedElementIds().remove(managedElementId);
     }
 
     /**
@@ -122,23 +122,10 @@ public class Ric {
     }
 
     /**
-     * Adds policy types as supported by this Ric.
-     *
-     * @param types the policy types to support.
-     */
-    public void addSupportedPolicyTypes(Collection<PolicyType> types) {
-        for (PolicyType type : types) {
-            addSupportedPolicyType(type);
-        }
-    }
-
-    /**
-     * Removes a policy type as supported by this Ric.
-     *
-     * @param type the policy type to remove as supported by this Ric.
+     * Removes all policy type as supported by this Ric.
      */
-    public void removeSupportedPolicyType(PolicyType type) {
-        supportedPolicyTypes.remove(type.name());
+    public void clearSupportedPolicyTypes() {
+        supportedPolicyTypes.clear();
     }
 
     /**
@@ -173,6 +160,6 @@ public class Ric {
         /**
          * The Ric cannot be contacted.
          */
-        NOT_REACHABLE
+        NOT_REACHABLE, RECOVERING
     }
 }
index 153d193..6b8138f 100644 (file)
@@ -32,7 +32,7 @@ import org.oransc.policyagent.exceptions.ServiceException;
 public class Rics {
     Map<String, Ric> rics = new HashMap<>();
 
-    public void put(Ric ric) {
+    public synchronized void put(Ric ric) {
         rics.put(ric.name(), ric);
     }
 
index 512d065..81ef7ff 100644 (file)
@@ -35,11 +35,11 @@ public class Service {
         ping();
     }
 
-    public String getName() {
+    public synchronized String getName() {
         return this.name;
     }
 
-    public Duration getKeepAliveInterval() {
+    public synchronized Duration getKeepAliveInterval() {
         return this.keepAliveInterval;
     }
 
index 7984a62..9ac9d70 100644 (file)
 
 package org.oransc.policyagent.tasks;
 
+import java.util.Collection;
+
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Ric.RicState;
 import org.oransc.policyagent.repository.Rics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,13 +47,15 @@ public class RepositorySupervision {
 
     private final Rics rics;
     private final Policies policies;
+    private final PolicyTypes policyTypes;
     private final A1Client a1Client;
 
     @Autowired
-    public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client) {
+    public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client, PolicyTypes policyTypes) {
         this.rics = rics;
         this.policies = policies;
         this.a1Client = a1Client;
+        this.policyTypes = policyTypes;
     }
 
     /**
@@ -66,45 +70,59 @@ public class RepositorySupervision {
 
     private Flux<Ric> createTask() {
         return Flux.fromIterable(rics.getRics()) //
-            .groupBy(ric -> ric.state()) //
-            .flatMap(fluxGroup -> handleGroup(fluxGroup.key(), fluxGroup));
+            .flatMap(ric -> checkInstances(ric)) //
+            .flatMap(ric -> checkTypes(ric));
     }
 
-    private Flux<Ric> handleGroup(Ric.RicState key, Flux<Ric> fluxGroup) {
-        logger.debug("Handling group {}", key);
-        switch (key) {
-            case ACTIVE:
-                return fluxGroup.flatMap(this::checkActive);
+    private Mono<Ric> checkInstances(Ric ric) {
 
-            case NOT_REACHABLE:
-                return fluxGroup.flatMap(this::checkNotReachable);
+        return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
+            .onErrorResume(t -> Mono.empty()) //
+            .flatMap(ricP -> validateInstances(ricP, ric));
+    }
 
-            default:
-                // If not initiated, leave it to the StartupService.
-                return Flux.empty();
-        }
+    private Flux<Ric> junk() {
+        return Flux.empty();
     }
 
-    private Mono<Ric> checkActive(Ric ric) {
-        logger.debug("Handling active ric {}", ric);
-        a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
-            .filter(policyId -> !policies.containsPolicy(policyId)) //
-            .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId))
-            .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
-            .subscribe();
+    private Mono<Ric> validateInstances(Collection<String> ricPolicies, Ric ric) {
+        if (ricPolicies.size() != policies.getForRic(ric.name()).size()) {
+            return startRecovery(ric);
+        }
+        for (String policyId : ricPolicies) {
+            if (!policies.containsPolicy(policyId)) {
+                return startRecovery(ric);
+            }
+        }
         return Mono.just(ric);
     }
 
-    private Mono<Ric> checkNotReachable(Ric ric) {
-        logger.debug("Handling not reachable ric {}", ric);
-        a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
-            .filter(policyId -> !policies.containsPolicy(policyId)) //
-            .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId))
-            .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
-            .subscribe(null, null, () -> ric.setState(RicState.ACTIVE));
+    private Mono<Ric> checkTypes(Ric ric) {
+        return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
+            .onErrorResume(t -> {
+                return Mono.empty();
+            }) //
+            .flatMap(ricTypes -> validateTypes(ricTypes, ric));
+    }
+
+    private Mono<Ric> validateTypes(Collection<String> ricTypes, Ric ric) {
+        if (ricTypes.size() != ric.getSupportedPolicyTypes().size()) {
+            return startRecovery(ric);
+        }
+        for (String typeName : ricTypes) {
+            if (!ric.isSupportingType(typeName)) {
+                return startRecovery(ric);
+            }
+        }
         return Mono.just(ric);
     }
 
+    private Mono<Ric> startRecovery(Ric ric) {
+        RicRecoveryTask recovery = new RicRecoveryTask(a1Client, policyTypes, policies);
+        recovery.run(ric);
+        return Mono.empty();
+    }
+
     private void onRicChecked(Ric ric) {
         logger.info("Ric: " + ric.name() + " checked");
     }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java
new file mode 100644 (file)
index 0000000..3883585
--- /dev/null
@@ -0,0 +1,130 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.tasks;
+
+import java.util.Collection;
+import java.util.Vector;
+
+import org.oransc.policyagent.clients.A1Client;
+import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.repository.ImmutablePolicyType;
+import org.oransc.policyagent.repository.Policies;
+import org.oransc.policyagent.repository.Policy;
+import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Ric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Loads information about RealTime-RICs at startup.
+ */
+public class RicRecoveryTask {
+
+    private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class);
+
+    private final A1Client a1Client;
+    private final PolicyTypes policyTypes;
+    private final Policies policies;
+
+    public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies) {
+        this.a1Client = a1Client;
+        this.policyTypes = policyTypes;
+        this.policies = policies;
+    }
+
+    public void run(Collection<Ric> rics) {
+        for (Ric ric : rics) {
+            run(ric);
+        }
+    }
+
+    public void run(Ric ric) {
+        logger.debug("Handling ric: {}", ric.getConfig().name());
+
+        synchronized (ric) {
+            if (ric.state().equals(Ric.RicState.RECOVERING)) {
+                return; // Already running
+            }
+            ric.setState(Ric.RicState.RECOVERING);
+        }
+        Flux<PolicyType> recoveredTypes = recoverPolicyTypes(ric);
+        Flux<?> deletedPolicies = deletePolicies(ric);
+
+        Flux.merge(recoveredTypes, deletedPolicies) //
+            .subscribe(x -> logger.debug("Recover: " + x), //
+                throwable -> onError(ric, throwable), //
+                () -> onComplete(ric));
+    }
+
+    private void onComplete(Ric ric) {
+        logger.debug("Recovery completed for:" + ric.name());
+        ric.setState(Ric.RicState.ACTIVE);
+
+    }
+
+    private void onError(Ric ric, Throwable t) {
+        logger.debug("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
+        ric.setState(Ric.RicState.NOT_REACHABLE);
+    }
+
+    private Flux<PolicyType> recoverPolicyTypes(Ric ric) {
+        ric.clearSupportedPolicyTypes();
+        return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
+            .flatMapMany(types -> Flux.fromIterable(types)) //
+            .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
+            .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) //
+            .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
+    }
+
+    private Mono<PolicyType> getPolicyType(Ric ric, String policyTypeId) {
+        if (policyTypes.contains(policyTypeId)) {
+            try {
+                return Mono.just(policyTypes.getType(policyTypeId));
+            } catch (ServiceException e) {
+                return Mono.error(e);
+            }
+        }
+        return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) //
+            .flatMap(schema -> createPolicyType(policyTypeId, schema));
+    }
+
+    private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
+        PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
+        policyTypes.put(pt);
+        return Mono.just(pt);
+    }
+
+    private Flux<String> deletePolicies(Ric ric) {
+        Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
+        for (Policy policy : ricPolicies) {
+            this.policies.remove(policy);
+        }
+
+        return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
+            .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
+            .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
+            .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); //
+    }
+}
index 283e8ea..d2356ea 100644 (file)
@@ -22,21 +22,16 @@ package org.oransc.policyagent.tasks;
 
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.exceptions.ServiceException;
-import org.oransc.policyagent.repository.ImmutablePolicyType;
-import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Ric.RicState;
 import org.oransc.policyagent.repository.Rics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
 /**
  * Loads information about RealTime-RICs at startup.
  */
@@ -57,78 +52,29 @@ public class StartupService {
     @Autowired
     private A1Client a1Client;
 
-    StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client) {
+    @Autowired
+    private Policies policies;
+
+    StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client,
+        Policies policies) {
         this.applicationConfig = appConfig;
         this.rics = rics;
         this.policyTypes = policyTypes;
         this.a1Client = a1Client;
+        this.policies = policies;
     }
 
     /**
      * Reads the configured Rics and performs the service discovery. The result is put into the repository.
      */
     public void startup() {
+        logger.debug("Starting up");
         applicationConfig.initialize();
-        Flux.fromIterable(applicationConfig.getRicConfigs()) //
-            .map(ricConfig -> new Ric(ricConfig)) //
-            .doOnNext(ric -> logger.debug("Handling ric: {}", ric.getConfig().name())) //
-            .flatMap(this::addPolicyTypesForRic) //
-            .flatMap(this::deletePoliciesForRic) //
-            .doOnNext(rics::put) //
-            .subscribe();
-    }
-
-    private Mono<Ric> addPolicyTypesForRic(Ric ric) {
-        a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
-            .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
-            .flatMap((policyTypeId) -> addTypeToRepo(ric, policyTypeId)) //
-            .flatMap(type -> addTypeToRic(ric, type)) //
-            .subscribe(null, cause -> setRicToNotReachable(ric, cause), () -> setRicToActive(ric));
-        return Mono.just(ric);
-    }
-
-    private Mono<PolicyType> addTypeToRepo(Ric ric, String policyTypeId) {
-        if (policyTypes.contains(policyTypeId)) {
-            try {
-                return Mono.just(policyTypes.getType(policyTypeId));
-            } catch (ServiceException e) {
-                return Mono.error(e);
-            }
+        for (RicConfig ricConfig : applicationConfig.getRicConfigs()) {
+            rics.put(new Ric(ricConfig));
         }
-        return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) //
-            .flatMap(schema -> createPolicyType(policyTypeId, schema));
-    }
-
-    private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
-        PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
-        policyTypes.put(pt);
-        return Mono.just(pt);
-    }
-
-    private Mono<PolicyType> addTypeToRic(Ric ric, PolicyType policyType) {
-        ric.addSupportedPolicyType(policyType);
-        return Mono.just(policyType);
-    }
-
-    private Mono<Ric> deletePoliciesForRic(Ric ric) {
-        if (!Ric.RicState.NOT_REACHABLE.equals(ric.state())) {
-            a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
-                .doOnNext(
-                    policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
-                .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
-                .subscribe(null, cause -> setRicToNotReachable(ric, cause), null);
-        }
-
-        return Mono.just(ric);
-    }
-
-    private void setRicToNotReachable(Ric ric, Throwable t) {
-        ric.setState(Ric.RicState.NOT_REACHABLE);
-        logger.info("Unable to connect to ric {}. Cause: {}", ric.name(), t.getMessage());
-    }
-
-    private void setRicToActive(Ric ric) {
-        ric.setState(RicState.ACTIVE);
+        RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies);
+        recoveryTask.run(rics.getRics()); // recover all Rics
     }
 
 }
index 8a6523c..365d418 100644 (file)
@@ -29,11 +29,13 @@ import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
 
 import java.net.URL;
+import java.util.Collection;
 import java.util.List;
 import java.util.Vector;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.configuration.ImmutableRicConfig;
 import org.oransc.policyagent.configuration.RicConfig;
@@ -61,6 +63,7 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 import org.springframework.web.client.RestTemplate;
+import reactor.core.publisher.Mono;
 
 @ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@@ -89,25 +92,81 @@ public class ApplicationTest {
         }
     }
 
+    static class A1ClientMock implements A1Client {
+        private final Policies policies;
+        private final PolicyTypes policyTypes;
+
+        A1ClientMock(Policies policies, PolicyTypes policyTypes) {
+            this.policies = policies;
+            this.policyTypes = policyTypes;
+        }
+
+        @Override
+        public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
+            Vector<String> result = new Vector<>();
+            for (PolicyType p : this.policyTypes.getAll()) {
+                result.add(p.name());
+            }
+            return Mono.just(result);
+        }
+
+        @Override
+        public Mono<String> getPolicyType(String nearRtRicUrl, String policyTypeId) {
+            try {
+                return Mono.just(this.policies.get(policyTypeId).json());
+            } catch (Exception e) {
+                return Mono.error(e);
+            }
+        }
+
+        @Override
+        public Mono<String> putPolicy(String nearRtRicUrl, String policyId, String policyString) {
+            return Mono.just("OK");
+        }
+
+        @Override
+        public Mono<String> deletePolicy(String nearRtRicUrl, String policyId) {
+            return Mono.just("OK");
+        }
+
+        @Override
+        public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
+            return Mono.empty(); // problem is that a recovery will start
+        }
+    }
+
     /**
      * Overrides the BeanFactory.
      */
     @TestConfiguration
     static class TestBeanFactory {
+        private final Rics rics = new Rics();
+        private final Policies policies = new Policies();
+        private final PolicyTypes policyTypes = new PolicyTypes();
 
         @Bean
         public ApplicationConfig getApplicationConfig() {
             return new MockApplicationConfig();
         }
 
+        @Bean
+        A1Client getA1Client() {
+            return new A1ClientMock(this.policies, this.policyTypes);
+        }
+
+        @Bean
+        public Policies getPolicies() {
+            return this.policies;
+        }
+
+        @Bean
+        public PolicyTypes getPolicyTypes() {
+            return this.policyTypes;
+        }
+
         @Bean
         public Rics getRics() {
-            Rics rics = new Rics();
-            rics.put(new Ric(ImmutableRicConfig.builder().name("kista_1").baseUrl("kista_url")
-                .managedElementIds(new Vector<>()).build()));
-            rics.put(new Ric(ImmutableRicConfig.builder().name("ric1").baseUrl("ric_url")
-                .managedElementIds(new Vector<>()).build()));
-            return rics;
+            return this.rics;
         }
     }
 
@@ -146,8 +205,6 @@ public class ApplicationTest {
         assertThat(rsp).isEqualTo("ric1");
     }
 
-    // managedElmentId -> nodeName
-
     @Test
     public void testPutPolicy() throws Exception {
         putService("service1");
@@ -155,6 +212,7 @@ public class ApplicationTest {
         String url = baseUrl() + "/policy?type=type1&instance=instance1&ric=ric1&service=service1";
         String json = "{}";
         addPolicyType("type1", "ric1");
+        this.rics.getRic("ric1").setState(Ric.RicState.ACTIVE);
 
         this.restTemplate.put(url, json);
 
@@ -234,7 +292,8 @@ public class ApplicationTest {
     public void testDeletePolicy() throws Exception {
         reset();
         String url = baseUrl() + "/policy?instance=id";
-        addPolicy("id", "typeName", "service1", "ric1");
+        Policy policy = addPolicy("id", "typeName", "service1", "ric1");
+        policy.ric().setState(Ric.RicState.ACTIVE);
         assertThat(policies.size()).isEqualTo(1);
 
         this.restTemplate.delete(url);
index 743fa66..dd57710 100644 (file)
@@ -27,12 +27,13 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Files;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Vector;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.repository.ImmutablePolicyType;
@@ -46,12 +47,10 @@ import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.web.server.LocalServerPort;
 import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import reactor.core.publisher.Flux;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
 import reactor.core.publisher.Mono;
 
-@RunWith(SpringRunner.class)
+@ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
 public class MockPolicyAgent {
 
@@ -80,7 +79,7 @@ public class MockPolicyAgent {
             getPolicies(nearRtRicUrl).put(policyId, policyString);
         }
 
-        public Iterable<String> getPolicyIdentities(String nearRtRicUrl) {
+        public Collection<String> getPolicyIdentities(String nearRtRicUrl) {
             return getPolicies(nearRtRicUrl).keySet();
         }
 
@@ -106,18 +105,18 @@ public class MockPolicyAgent {
         }
 
         @Override
-        public Flux<String> getPolicyTypeIdentities(String nearRtRicUrl) {
+        public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
             Vector<String> result = new Vector<>();
             for (PolicyType p : this.policyTypes.getAll()) {
                 result.add(p.name());
             }
-            return Flux.fromIterable(result);
+            return Mono.just(result);
         }
 
         @Override
-        public Flux<String> getPolicyIdentities(String nearRtRicUrl) {
-            Iterable<String> result = policies.getPolicyIdentities(nearRtRicUrl);
-            return Flux.fromIterable(result);
+        public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
+            Collection<String> result = policies.getPolicyIdentities(nearRtRicUrl);
+            return Mono.just(result);
         }
 
         @Override
index 7b11b9a..f9a93c8 100644 (file)
@@ -36,7 +36,6 @@ import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -74,10 +73,9 @@ public class A1ClientImplTest {
         Mono<String> policyTypeIds = Mono.just(Arrays.toString(new String[] {POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME}));
         when(asyncRestClientMock.get(POLICYTYPES_IDENTITIES_URL)).thenReturn(policyTypeIds);
 
-        Flux<String> policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL);
+        Mono<?> policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL);
         verify(asyncRestClientMock).get(POLICYTYPES_IDENTITIES_URL);
-        StepVerifier.create(policyTypeIdsFlux).expectNext(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME).expectComplete()
-            .verify();
+        StepVerifier.create(policyTypeIdsFlux).expectNextCount(1).expectComplete().verify();
     }
 
     @Test
@@ -85,9 +83,9 @@ public class A1ClientImplTest {
         Mono<String> policyIds = Mono.just(Arrays.toString(new String[] {POLICY_1_ID, POLICY_2_ID}));
         when(asyncRestClientMock.get(POLICIES_IDENTITIES_URL)).thenReturn(policyIds);
 
-        Flux<String> policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL);
+        Mono<?> policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL);
         verify(asyncRestClientMock).get(POLICIES_IDENTITIES_URL);
-        StepVerifier.create(policyIdsFlux).expectNext(POLICY_1_ID, POLICY_2_ID).expectComplete().verify();
+        StepVerifier.create(policyIdsFlux).expectNextCount(1).expectComplete().verify();
     }
 
     @Test
index 195a486..791acee 100644 (file)
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Vector;
 
 import org.junit.jupiter.api.Test;
@@ -42,11 +43,10 @@ import org.oransc.policyagent.repository.ImmutablePolicyType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Ric.RicState;
 import org.oransc.policyagent.repository.Rics;
-
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 @ExtendWith(MockitoExtension.class)
@@ -93,19 +93,23 @@ public class RepositorySupervisionTest {
             .build();
         Policies policies = new Policies();
         policies.put(policy1);
+        PolicyTypes types = new PolicyTypes();
 
-        RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock);
+        RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock, types);
 
-        when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(Flux.just("policyId1", "policyId2"));
+        Mono<Collection<String>> policyIds = Mono.just(Arrays.asList("policyId1", "policyId2"));
+        when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policyIds);
         when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.empty());
+        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyIds);
+        when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("schema"));
 
         supervisorUnderTest.checkAllRics();
 
-        await().untilAsserted(() -> RicState.ACTIVE.equals(rics.getRic("ric2").state()));
+        await().untilAsserted(() -> RicState.ACTIVE.equals(ric1.state()));
+        await().untilAsserted(() -> RicState.ACTIVE.equals(ric2.state()));
+        await().untilAsserted(() -> RicState.ACTIVE.equals(ric3.state()));
 
-        verify(a1ClientMock).getPolicyIdentities("baseUrl1");
         verify(a1ClientMock).deletePolicy("baseUrl1", "policyId2");
-        verify(a1ClientMock).getPolicyIdentities("baseUrl2");
         verify(a1ClientMock).deletePolicy("baseUrl2", "policyId2");
         verifyNoMoreInteractions(a1ClientMock);
     }
index 5cfabd9..0c254d5 100644 (file)
@@ -32,6 +32,8 @@ import static org.mockito.Mockito.when;
 import static org.oransc.policyagent.repository.Ric.RicState.ACTIVE;
 import static org.oransc.policyagent.repository.Ric.RicState.NOT_REACHABLE;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Vector;
 
 import org.junit.jupiter.api.Test;
@@ -44,11 +46,10 @@ import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.configuration.ImmutableRicConfig;
 import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Rics;
-
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 @ExtendWith(MockitoExtension.class)
@@ -80,18 +81,18 @@ public class StartupServiceTest {
         ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
         when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
 
-        Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
-        Flux<String> fluxType2 = Flux.just(POLICY_TYPE_2_NAME);
-        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1)
-            .thenReturn(fluxType1.concatWith(fluxType2));
-        Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
+        Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+        Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
+        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
+        Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
         when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policies);
         when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
         when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
 
         Rics rics = new Rics();
         PolicyTypes policyTypes = new PolicyTypes();
-        StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+        StartupService serviceUnderTest =
+            new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
 
         serviceUnderTest.startup();
 
@@ -117,7 +118,8 @@ public class StartupServiceTest {
             "Not correct no of types supported for ric " + FIRST_RIC_NAME);
         assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME),
             POLICY_TYPE_1_NAME + " not supported by ric " + FIRST_RIC_NAME);
-        assertEquals(1, firstRic.getManagedNodes().size(), "Not correct no of managed nodes for ric " + FIRST_RIC_NAME);
+        assertEquals(1, firstRic.getManagedElementIds().size(),
+            "Not correct no of managed nodes for ric " + FIRST_RIC_NAME);
         assertTrue(firstRic.isManaging(MANAGED_NODE_A), MANAGED_NODE_A + " not managed by ric " + FIRST_RIC_NAME);
 
         Ric secondRic = rics.get(SECOND_RIC_NAME);
@@ -130,7 +132,7 @@ public class StartupServiceTest {
             POLICY_TYPE_1_NAME + " not supported by ric " + SECOND_RIC_NAME);
         assertTrue(secondRic.isSupportingType(POLICY_TYPE_2_NAME),
             POLICY_TYPE_2_NAME + " not supported by ric " + SECOND_RIC_NAME);
-        assertEquals(2, secondRic.getManagedNodes().size(),
+        assertEquals(2, secondRic.getManagedElementIds().size(),
             "Not correct no of managed nodes for ric " + SECOND_RIC_NAME);
         assertTrue(secondRic.isManaging(MANAGED_NODE_B), MANAGED_NODE_B + " not managed by ric " + SECOND_RIC_NAME);
         assertTrue(secondRic.isManaging(MANAGED_NODE_C), MANAGED_NODE_C + " not managed by ric " + SECOND_RIC_NAME);
@@ -143,18 +145,19 @@ public class StartupServiceTest {
         ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
         when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
 
-        Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
-        doReturn(Flux.error(new Exception("Unable to contact ric.")), fluxType1).when(a1ClientMock)
-            .getPolicyTypeIdentities(anyString());
+        Mono<Collection<String>> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+        Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
+        doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString());
 
-        Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
+        Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
         doReturn(policies).when(a1ClientMock).getPolicyIdentities(anyString());
         when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
         when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
 
         Rics rics = new Rics();
         PolicyTypes policyTypes = new PolicyTypes();
-        StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+        StartupService serviceUnderTest =
+            new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
 
         serviceUnderTest.startup();
 
@@ -173,19 +176,19 @@ public class StartupServiceTest {
         ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
         when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
 
-        Flux<String> fluxType1 = Flux.just(POLICY_TYPE_1_NAME);
-        Flux<String> fluxType2 = Flux.just(POLICY_TYPE_2_NAME);
-        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1)
-            .thenReturn(fluxType1.concatWith(fluxType2));
+        Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+        Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
+        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
         when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
-        Flux<String> policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2});
-        doReturn(Flux.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock)
+        Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
+        doReturn(Mono.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock)
             .getPolicyIdentities(anyString());
         when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
 
         Rics rics = new Rics();
         PolicyTypes policyTypes = new PolicyTypes();
-        StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock);
+        StartupService serviceUnderTest =
+            new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies());
 
         serviceUnderTest.startup();
 
@@ -197,14 +200,19 @@ public class StartupServiceTest {
         assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
     }
 
-    private RicConfig getRicConfig(String name, String baseUrl, String... nodeNames) {
-        Vector<String> managedNodes = new Vector<String>(1);
-        for (String nodeName : nodeNames) {
-            managedNodes.add(nodeName);
+    @SafeVarargs
+    private <T> Vector<T> toVector(T... objs) {
+        Vector<T> result = new Vector<>();
+        for (T o : objs) {
+            result.add(o);
         }
+        return result;
+    }
+
+    private RicConfig getRicConfig(String name, String baseUrl, String... managedElementIds) {
         ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() //
             .name(name) //
-            .managedElementIds(managedNodes) //
+            .managedElementIds(toVector(managedElementIds)) //
             .baseUrl(baseUrl) //
             .build();
         return ricConfig;