From: Henrik Andersson Date: Tue, 14 Jan 2020 07:33:22 +0000 (+0000) Subject: Merge "Recovery handling" X-Git-Tag: 1.0.1~49 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=637540bc28fbf337e0c4c58c051a6b4f7ceb321d;hp=ffe0c150f08205d73ee362f58f492aeb2703f295;p=nonrtric.git Merge "Recovery handling" --- diff --git a/policy-agent/README.md b/policy-agent/README.md new file mode 100644 index 00000000..be049073 --- /dev/null +++ b/policy-agent/README.md @@ -0,0 +1,34 @@ +# O-RAN-SC NonRT RIC Dashboard Web Application + +The O-RAN NonRT RIC PolicyAgent provides a REST API for management of +policices. It provides support for +-Policy configuration. This includes + -One REST API towards all RICs in the network + -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc. + -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC +-Supervision of clients (R-APPs) to eliminate stray policies in case of failure +-Consistency monitoring of the SMO view of policies and the actual situation in the RICs +-Consistency monitoring of RIC capabilities (policy types) + +The agent can be run stand alone in a simulated test mode. Then it +simulates RICs. +The REST API is published on port 8081 and it is started by command: +mvn -Dtest=MockPolicyAgent test + +The backend server publishes live API documentation at the +URL `http://your-host-name-here:8080/swagger-ui.html` + +## License + +Copyright (C) 2019 Nordix Foundation. All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java index ddb71af4..bc6d7cda 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java @@ -20,14 +20,15 @@ package org.oransc.policyagent.clients; -import reactor.core.publisher.Flux; +import java.util.Collection; + import reactor.core.publisher.Mono; public interface A1Client { - public Flux getPolicyTypeIdentities(String nearRtRicUrl); + public Mono> getPolicyTypeIdentities(String nearRtRicUrl); - public Flux getPolicyIdentities(String nearRtRicUrl); + public Mono> getPolicyIdentities(String nearRtRicUrl); public Mono getPolicyType(String nearRtRicUrl, String policyTypeId); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientImpl.java index f621566e..b3773b8e 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientImpl.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientImpl.java @@ -22,6 +22,7 @@ package org.oransc.policyagent.clients; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.json.JSONArray; @@ -29,8 +30,6 @@ import org.json.JSONException; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class A1ClientImpl implements A1Client { @@ -45,19 +44,19 @@ public class A1ClientImpl implements A1Client { } @Override - public Flux getPolicyTypeIdentities(String nearRtRicUrl) { + public Mono> getPolicyTypeIdentities(String nearRtRicUrl) { logger.debug("getPolicyTypeIdentities nearRtRicUrl = {}", nearRtRicUrl); AsyncRestClient client = createClient(nearRtRicUrl); - Mono response = client.get("/policytypes/identities"); - return response.flatMapMany(this::createFlux); + return client.get("/policytypes/identities") // + .flatMap(this::parseJsonArrayOfString); } @Override - public Flux getPolicyIdentities(String nearRtRicUrl) { + public Mono> getPolicyIdentities(String nearRtRicUrl) { logger.debug("getPolicyIdentities nearRtRicUrl = {}", nearRtRicUrl); AsyncRestClient client = createClient(nearRtRicUrl); - Mono response = client.get("/policies/identities"); - return response.flatMapMany(this::createFlux); + return client.get("/policies/identities") // + .flatMap(this::parseJsonArrayOfString); } @Override @@ -84,7 +83,7 @@ public class A1ClientImpl implements A1Client { return client.delete("/policies/" + policyId); } - private Flux createFlux(String inputString) { + private Mono> parseJsonArrayOfString(String inputString) { try { List arrayList = new ArrayList<>(); JSONArray jsonArray = new JSONArray(inputString); @@ -92,9 +91,9 @@ public class A1ClientImpl implements A1Client { arrayList.add(jsonArray.getString(i)); } logger.debug("A1 client: received list = {}", arrayList); - return Flux.fromIterable(arrayList); + return Mono.just(arrayList); } catch (JSONException ex) { // invalid json - return Flux.error(ex); + return Mono.error(ex); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java index e41f55eb..00e5223f 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java @@ -109,7 +109,7 @@ public class ApplicationConfig { loadConfigurationFromFile(this.filepath); refreshConfigTask = createRefreshTask() // - .subscribe(e -> logger.info("Refreshed configuration data"), + .subscribe(notUsed -> logger.info("Refreshed configuration data"), throwable -> logger.error("Configuration refresh terminated due to exception", throwable), () -> logger.error("Configuration refresh terminated")); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java index e29a4e9c..6d849b1b 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java @@ -31,6 +31,7 @@ import io.swagger.annotations.ApiResponses; import java.util.Collection; import java.util.Vector; +import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.exceptions.ServiceException; import org.oransc.policyagent.repository.ImmutablePolicy; @@ -49,6 +50,7 @@ import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; @RestController @Api(value = "Policy Management API") @@ -57,15 +59,18 @@ public class PolicyController { private final Rics rics; private final PolicyTypes policyTypes; private final Policies policies; + private final A1Client a1Client; + private static Gson gson = new GsonBuilder() // .serializeNulls() // .create(); // @Autowired - PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics) { + PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics, A1Client a1Client) { this.policyTypes = types; this.policies = policies; this.rics = rics; + this.a1Client = a1Client; } @GetMapping("/policy_schemas") @@ -132,11 +137,48 @@ public class PolicyController { @DeleteMapping("/policy") @ApiOperation(value = "Deletes the policy") @ApiResponses(value = {@ApiResponse(code = 204, message = "Policy deleted")}) - public ResponseEntity deletePolicy( // - @RequestParam(name = "instance", required = true) String instance) { + public Mono> deletePolicy( // + @RequestParam(name = "instance", required = true) String id) { + Policy policy = policies.get(id); + if (policy != null && policy.ric().state().equals(Ric.RicState.ACTIVE)) { + return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) // + .doOnEach(notUsed -> policies.removeId(id)) // + .flatMap(notUsed -> { + return Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)); + }); + } else { + return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND)); + } + } - policies.removeId(instance); - return new ResponseEntity<>(HttpStatus.NO_CONTENT); + @PutMapping(path = "/policy") + @ApiOperation(value = "Create the policy") + @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")}) + public Mono> putPolicy( // + @RequestParam(name = "type", required = true) String typeName, // + @RequestParam(name = "instance", required = true) String instanceId, // + @RequestParam(name = "ric", required = true) String ricName, // + @RequestParam(name = "service", required = true) String service, // + @RequestBody String jsonBody) { + + Ric ric = rics.get(ricName); + PolicyType type = policyTypes.get(typeName); + if (ric != null && type != null && ric.state().equals(Ric.RicState.ACTIVE)) { + Policy policy = ImmutablePolicy.builder() // + .id(instanceId) // + .json(jsonBody) // + .type(type) // + .ric(ric) // + .ownerServiceName(service) // + .lastModified(getTimeStampUTC()) // + .build(); + return a1Client.putPolicy(policy.ric().getConfig().baseUrl(), policy.id(), policy.json()) // + .doOnNext(notUsed -> policies.put(policy)) // + .flatMap(notUsed -> { + return Mono.just(new ResponseEntity<>(HttpStatus.CREATED)); + }); + } + return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND)); } @GetMapping("/policies") @@ -226,32 +268,4 @@ public class PolicyController { return java.time.Instant.now().toString(); } - @PutMapping(path = "/policy") - @ApiOperation(value = "Create the policy") - @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")}) - public ResponseEntity putPolicy( // - @RequestParam(name = "type", required = true) String type, // - @RequestParam(name = "instance", required = true) String instanceId, // - @RequestParam(name = "ric", required = true) String ric, // - @RequestParam(name = "service", required = true) String service, // - @RequestBody String jsonBody) { - - try { - // services.getService(service).ping(); - Ric ricObj = rics.getRic(ric); - Policy policy = ImmutablePolicy.builder() // - .id(instanceId) // - .json(jsonBody) // - .type(policyTypes.getType(type)) // - .ric(ricObj) // - .ownerServiceName(service) // - .lastModified(getTimeStampUTC()) // - .build(); - policies.put(policy); - return new ResponseEntity(HttpStatus.CREATED); - } catch (ServiceException e) { - return new ResponseEntity(e.getMessage(), HttpStatus.NOT_FOUND); - } - } - } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicInfo.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicInfo.java index db9a4e5b..cbb205cc 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicInfo.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicInfo.java @@ -31,7 +31,7 @@ interface RicInfo { public String name(); - public Collection nodeNames(); + public Collection managedElementIds(); public Collection policyTypes(); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java index 0075fb71..797ec719 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java @@ -99,7 +99,7 @@ public class RicRepositoryController { if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) { result.add(ImmutableRicInfo.builder() // .name(ric.name()) // - .nodeNames(ric.getManagedNodes()) // + .managedElementIds(ric.getManagedElementIds()) // .policyTypes(ric.getSupportedPolicyTypeNames()) // .build()); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java index 2a4eb5af..58c91b35 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java @@ -75,6 +75,10 @@ public class Policies { return policiesId.containsKey(id); } + public synchronized Policy get(String id) { + return policiesId.get(id); + } + public synchronized Policy getPolicy(String id) throws ServiceException { Policy p = policiesId.get(id); if (p == null) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java index 0d66e7de..9dee6f91 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java @@ -40,6 +40,10 @@ public class PolicyTypes { return t; } + public synchronized PolicyType get(String name) { + return types.get(name); + } + public synchronized void put(PolicyType type) { types.put(type.name(), type); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java index 98c497f7..82d84f12 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java @@ -65,38 +65,38 @@ public class Ric { * * @return a vector containing the nodes managed by this Ric. */ - public Vector getManagedNodes() { + public Vector getManagedElementIds() { return ricConfig.managedElementIds(); } /** * Determines if the given node is managed by this Ric. * - * @param nodeName the node name to check. + * @param managedElementId the node name to check. * @return true if the given node is managed by this Ric. */ - public boolean isManaging(String nodeName) { - return ricConfig.managedElementIds().contains(nodeName); + public boolean isManaging(String managedElementId) { + return ricConfig.managedElementIds().contains(managedElementId); } /** * Adds the given node as managed by this Ric. * - * @param nodeName the node to add. + * @param managedElementId the node to add. */ - public void addManagedNode(String nodeName) { - if (!ricConfig.managedElementIds().contains(nodeName)) { - ricConfig.managedElementIds().add(nodeName); + public void addManagedElement(String managedElementId) { + if (!ricConfig.managedElementIds().contains(managedElementId)) { + ricConfig.managedElementIds().add(managedElementId); } } /** * Removes the given node as managed by this Ric. * - * @param nodeName the node to remove. + * @param managedElementId the node to remove. */ - public void removeManagedNode(String nodeName) { - ricConfig.managedElementIds().remove(nodeName); + public void removeManagedElement(String managedElementId) { + ricConfig.managedElementIds().remove(managedElementId); } /** @@ -122,23 +122,10 @@ public class Ric { } /** - * Adds policy types as supported by this Ric. - * - * @param types the policy types to support. - */ - public void addSupportedPolicyTypes(Collection types) { - for (PolicyType type : types) { - addSupportedPolicyType(type); - } - } - - /** - * Removes a policy type as supported by this Ric. - * - * @param type the policy type to remove as supported by this Ric. + * Removes all policy type as supported by this Ric. */ - public void removeSupportedPolicyType(PolicyType type) { - supportedPolicyTypes.remove(type.name()); + public void clearSupportedPolicyTypes() { + supportedPolicyTypes.clear(); } /** @@ -173,6 +160,6 @@ public class Ric { /** * The Ric cannot be contacted. */ - NOT_REACHABLE + NOT_REACHABLE, RECOVERING } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java index 153d193b..6b8138fc 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java @@ -32,7 +32,7 @@ import org.oransc.policyagent.exceptions.ServiceException; public class Rics { Map rics = new HashMap<>(); - public void put(Ric ric) { + public synchronized void put(Ric ric) { rics.put(ric.name(), ric); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java index 512d065b..81ef7ff9 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java @@ -35,11 +35,11 @@ public class Service { ping(); } - public String getName() { + public synchronized String getName() { return this.name; } - public Duration getKeepAliveInterval() { + public synchronized Duration getKeepAliveInterval() { return this.keepAliveInterval; } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java index 7984a62a..9ac9d705 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java @@ -20,10 +20,12 @@ package org.oransc.policyagent.tasks; +import java.util.Collection; + import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.repository.Policies; +import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; -import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,13 +47,15 @@ public class RepositorySupervision { private final Rics rics; private final Policies policies; + private final PolicyTypes policyTypes; private final A1Client a1Client; @Autowired - public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client) { + public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client, PolicyTypes policyTypes) { this.rics = rics; this.policies = policies; this.a1Client = a1Client; + this.policyTypes = policyTypes; } /** @@ -66,45 +70,59 @@ public class RepositorySupervision { private Flux createTask() { return Flux.fromIterable(rics.getRics()) // - .groupBy(ric -> ric.state()) // - .flatMap(fluxGroup -> handleGroup(fluxGroup.key(), fluxGroup)); + .flatMap(ric -> checkInstances(ric)) // + .flatMap(ric -> checkTypes(ric)); } - private Flux handleGroup(Ric.RicState key, Flux fluxGroup) { - logger.debug("Handling group {}", key); - switch (key) { - case ACTIVE: - return fluxGroup.flatMap(this::checkActive); + private Mono checkInstances(Ric ric) { - case NOT_REACHABLE: - return fluxGroup.flatMap(this::checkNotReachable); + return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // + .onErrorResume(t -> Mono.empty()) // + .flatMap(ricP -> validateInstances(ricP, ric)); + } - default: - // If not initiated, leave it to the StartupService. - return Flux.empty(); - } + private Flux junk() { + return Flux.empty(); } - private Mono checkActive(Ric ric) { - logger.debug("Handling active ric {}", ric); - a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .filter(policyId -> !policies.containsPolicy(policyId)) // - .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId)) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // - .subscribe(); + private Mono validateInstances(Collection ricPolicies, Ric ric) { + if (ricPolicies.size() != policies.getForRic(ric.name()).size()) { + return startRecovery(ric); + } + for (String policyId : ricPolicies) { + if (!policies.containsPolicy(policyId)) { + return startRecovery(ric); + } + } return Mono.just(ric); } - private Mono checkNotReachable(Ric ric) { - logger.debug("Handling not reachable ric {}", ric); - a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .filter(policyId -> !policies.containsPolicy(policyId)) // - .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId)) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // - .subscribe(null, null, () -> ric.setState(RicState.ACTIVE)); + private Mono checkTypes(Ric ric) { + return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // + .onErrorResume(t -> { + return Mono.empty(); + }) // + .flatMap(ricTypes -> validateTypes(ricTypes, ric)); + } + + private Mono validateTypes(Collection ricTypes, Ric ric) { + if (ricTypes.size() != ric.getSupportedPolicyTypes().size()) { + return startRecovery(ric); + } + for (String typeName : ricTypes) { + if (!ric.isSupportingType(typeName)) { + return startRecovery(ric); + } + } return Mono.just(ric); } + private Mono startRecovery(Ric ric) { + RicRecoveryTask recovery = new RicRecoveryTask(a1Client, policyTypes, policies); + recovery.run(ric); + return Mono.empty(); + } + private void onRicChecked(Ric ric) { logger.info("Ric: " + ric.name() + " checked"); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java new file mode 100644 index 00000000..3883585b --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java @@ -0,0 +1,130 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.tasks; + +import java.util.Collection; +import java.util.Vector; + +import org.oransc.policyagent.clients.A1Client; +import org.oransc.policyagent.exceptions.ServiceException; +import org.oransc.policyagent.repository.ImmutablePolicyType; +import org.oransc.policyagent.repository.Policies; +import org.oransc.policyagent.repository.Policy; +import org.oransc.policyagent.repository.PolicyType; +import org.oransc.policyagent.repository.PolicyTypes; +import org.oransc.policyagent.repository.Ric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Loads information about RealTime-RICs at startup. + */ +public class RicRecoveryTask { + + private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class); + + private final A1Client a1Client; + private final PolicyTypes policyTypes; + private final Policies policies; + + public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies) { + this.a1Client = a1Client; + this.policyTypes = policyTypes; + this.policies = policies; + } + + public void run(Collection rics) { + for (Ric ric : rics) { + run(ric); + } + } + + public void run(Ric ric) { + logger.debug("Handling ric: {}", ric.getConfig().name()); + + synchronized (ric) { + if (ric.state().equals(Ric.RicState.RECOVERING)) { + return; // Already running + } + ric.setState(Ric.RicState.RECOVERING); + } + Flux recoveredTypes = recoverPolicyTypes(ric); + Flux deletedPolicies = deletePolicies(ric); + + Flux.merge(recoveredTypes, deletedPolicies) // + .subscribe(x -> logger.debug("Recover: " + x), // + throwable -> onError(ric, throwable), // + () -> onComplete(ric)); + } + + private void onComplete(Ric ric) { + logger.debug("Recovery completed for:" + ric.name()); + ric.setState(Ric.RicState.ACTIVE); + + } + + private void onError(Ric ric, Throwable t) { + logger.debug("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage()); + ric.setState(Ric.RicState.NOT_REACHABLE); + } + + private Flux recoverPolicyTypes(Ric ric) { + ric.clearSupportedPolicyTypes(); + return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // + .flatMapMany(types -> Flux.fromIterable(types)) // + .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) + .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) // + .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); // + } + + private Mono getPolicyType(Ric ric, String policyTypeId) { + if (policyTypes.contains(policyTypeId)) { + try { + return Mono.just(policyTypes.getType(policyTypeId)); + } catch (ServiceException e) { + return Mono.error(e); + } + } + return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) // + .flatMap(schema -> createPolicyType(policyTypeId, schema)); + } + + private Mono createPolicyType(String policyTypeId, String schema) { + PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build(); + policyTypes.put(pt); + return Mono.just(pt); + } + + private Flux deletePolicies(Ric ric) { + Collection ricPolicies = new Vector<>(policies.getForRic(ric.name())); + for (Policy policy : ricPolicies) { + this.policies.remove(policy); + } + + return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // + .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) // + .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name())) + .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); // + } +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java index 283e8ea9..d2356ea9 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java @@ -22,21 +22,16 @@ package org.oransc.policyagent.tasks; import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.configuration.ApplicationConfig; -import org.oransc.policyagent.exceptions.ServiceException; -import org.oransc.policyagent.repository.ImmutablePolicyType; -import org.oransc.policyagent.repository.PolicyType; +import org.oransc.policyagent.configuration.RicConfig; +import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; -import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - /** * Loads information about RealTime-RICs at startup. */ @@ -57,78 +52,29 @@ public class StartupService { @Autowired private A1Client a1Client; - StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client) { + @Autowired + private Policies policies; + + StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client, + Policies policies) { this.applicationConfig = appConfig; this.rics = rics; this.policyTypes = policyTypes; this.a1Client = a1Client; + this.policies = policies; } /** * Reads the configured Rics and performs the service discovery. The result is put into the repository. */ public void startup() { + logger.debug("Starting up"); applicationConfig.initialize(); - Flux.fromIterable(applicationConfig.getRicConfigs()) // - .map(ricConfig -> new Ric(ricConfig)) // - .doOnNext(ric -> logger.debug("Handling ric: {}", ric.getConfig().name())) // - .flatMap(this::addPolicyTypesForRic) // - .flatMap(this::deletePoliciesForRic) // - .doOnNext(rics::put) // - .subscribe(); - } - - private Mono addPolicyTypesForRic(Ric ric) { - a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // - .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) - .flatMap((policyTypeId) -> addTypeToRepo(ric, policyTypeId)) // - .flatMap(type -> addTypeToRic(ric, type)) // - .subscribe(null, cause -> setRicToNotReachable(ric, cause), () -> setRicToActive(ric)); - return Mono.just(ric); - } - - private Mono addTypeToRepo(Ric ric, String policyTypeId) { - if (policyTypes.contains(policyTypeId)) { - try { - return Mono.just(policyTypes.getType(policyTypeId)); - } catch (ServiceException e) { - return Mono.error(e); - } + for (RicConfig ricConfig : applicationConfig.getRicConfigs()) { + rics.put(new Ric(ricConfig)); } - return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) // - .flatMap(schema -> createPolicyType(policyTypeId, schema)); - } - - private Mono createPolicyType(String policyTypeId, String schema) { - PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build(); - policyTypes.put(pt); - return Mono.just(pt); - } - - private Mono addTypeToRic(Ric ric, PolicyType policyType) { - ric.addSupportedPolicyType(policyType); - return Mono.just(policyType); - } - - private Mono deletePoliciesForRic(Ric ric) { - if (!Ric.RicState.NOT_REACHABLE.equals(ric.state())) { - a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .doOnNext( - policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name())) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // - .subscribe(null, cause -> setRicToNotReachable(ric, cause), null); - } - - return Mono.just(ric); - } - - private void setRicToNotReachable(Ric ric, Throwable t) { - ric.setState(Ric.RicState.NOT_REACHABLE); - logger.info("Unable to connect to ric {}. Cause: {}", ric.name(), t.getMessage()); - } - - private void setRicToActive(Ric ric) { - ric.setState(RicState.ACTIVE); + RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies); + recoveryTask.run(rics.getRics()); // recover all Rics } } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index 8a6523c4..365d418a 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -29,11 +29,13 @@ import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; import java.net.URL; +import java.util.Collection; import java.util.List; import java.util.Vector; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.configuration.ImmutableRicConfig; import org.oransc.policyagent.configuration.RicConfig; @@ -61,6 +63,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.web.client.RestTemplate; +import reactor.core.publisher.Mono; @ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) @@ -89,25 +92,81 @@ public class ApplicationTest { } } + static class A1ClientMock implements A1Client { + private final Policies policies; + private final PolicyTypes policyTypes; + + A1ClientMock(Policies policies, PolicyTypes policyTypes) { + this.policies = policies; + this.policyTypes = policyTypes; + } + + @Override + public Mono> getPolicyTypeIdentities(String nearRtRicUrl) { + Vector result = new Vector<>(); + for (PolicyType p : this.policyTypes.getAll()) { + result.add(p.name()); + } + return Mono.just(result); + } + + @Override + public Mono getPolicyType(String nearRtRicUrl, String policyTypeId) { + try { + return Mono.just(this.policies.get(policyTypeId).json()); + } catch (Exception e) { + return Mono.error(e); + } + } + + @Override + public Mono putPolicy(String nearRtRicUrl, String policyId, String policyString) { + return Mono.just("OK"); + } + + @Override + public Mono deletePolicy(String nearRtRicUrl, String policyId) { + return Mono.just("OK"); + } + + @Override + public Mono> getPolicyIdentities(String nearRtRicUrl) { + return Mono.empty(); // problem is that a recovery will start + } + } + /** * Overrides the BeanFactory. */ @TestConfiguration static class TestBeanFactory { + private final Rics rics = new Rics(); + private final Policies policies = new Policies(); + private final PolicyTypes policyTypes = new PolicyTypes(); @Bean public ApplicationConfig getApplicationConfig() { return new MockApplicationConfig(); } + @Bean + A1Client getA1Client() { + return new A1ClientMock(this.policies, this.policyTypes); + } + + @Bean + public Policies getPolicies() { + return this.policies; + } + + @Bean + public PolicyTypes getPolicyTypes() { + return this.policyTypes; + } + @Bean public Rics getRics() { - Rics rics = new Rics(); - rics.put(new Ric(ImmutableRicConfig.builder().name("kista_1").baseUrl("kista_url") - .managedElementIds(new Vector<>()).build())); - rics.put(new Ric(ImmutableRicConfig.builder().name("ric1").baseUrl("ric_url") - .managedElementIds(new Vector<>()).build())); - return rics; + return this.rics; } } @@ -146,8 +205,6 @@ public class ApplicationTest { assertThat(rsp).isEqualTo("ric1"); } - // managedElmentId -> nodeName - @Test public void testPutPolicy() throws Exception { putService("service1"); @@ -155,6 +212,7 @@ public class ApplicationTest { String url = baseUrl() + "/policy?type=type1&instance=instance1&ric=ric1&service=service1"; String json = "{}"; addPolicyType("type1", "ric1"); + this.rics.getRic("ric1").setState(Ric.RicState.ACTIVE); this.restTemplate.put(url, json); @@ -234,7 +292,8 @@ public class ApplicationTest { public void testDeletePolicy() throws Exception { reset(); String url = baseUrl() + "/policy?instance=id"; - addPolicy("id", "typeName", "service1", "ric1"); + Policy policy = addPolicy("id", "typeName", "service1", "ric1"); + policy.ric().setState(Ric.RicState.ACTIVE); assertThat(policies.size()).isEqualTo(1); this.restTemplate.delete(url); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java index 743fa665..dd57710c 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java @@ -27,12 +27,13 @@ import java.io.File; import java.io.IOException; import java.net.URL; import java.nio.file.Files; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Vector; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.repository.ImmutablePolicyType; @@ -46,12 +47,10 @@ import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.web.server.LocalServerPort; import org.springframework.context.annotation.Bean; -import org.springframework.test.context.junit4.SpringRunner; - -import reactor.core.publisher.Flux; +import org.springframework.test.context.junit.jupiter.SpringExtension; import reactor.core.publisher.Mono; -@RunWith(SpringRunner.class) +@ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) public class MockPolicyAgent { @@ -80,7 +79,7 @@ public class MockPolicyAgent { getPolicies(nearRtRicUrl).put(policyId, policyString); } - public Iterable getPolicyIdentities(String nearRtRicUrl) { + public Collection getPolicyIdentities(String nearRtRicUrl) { return getPolicies(nearRtRicUrl).keySet(); } @@ -106,18 +105,18 @@ public class MockPolicyAgent { } @Override - public Flux getPolicyTypeIdentities(String nearRtRicUrl) { + public Mono> getPolicyTypeIdentities(String nearRtRicUrl) { Vector result = new Vector<>(); for (PolicyType p : this.policyTypes.getAll()) { result.add(p.name()); } - return Flux.fromIterable(result); + return Mono.just(result); } @Override - public Flux getPolicyIdentities(String nearRtRicUrl) { - Iterable result = policies.getPolicyIdentities(nearRtRicUrl); - return Flux.fromIterable(result); + public Mono> getPolicyIdentities(String nearRtRicUrl) { + Collection result = policies.getPolicyIdentities(nearRtRicUrl); + return Mono.just(result); } @Override diff --git a/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientImplTest.java b/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientImplTest.java index 7b11b9ac..f9a93c8c 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientImplTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientImplTest.java @@ -36,7 +36,6 @@ import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.jupiter.MockitoExtension; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -74,10 +73,9 @@ public class A1ClientImplTest { Mono policyTypeIds = Mono.just(Arrays.toString(new String[] {POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME})); when(asyncRestClientMock.get(POLICYTYPES_IDENTITIES_URL)).thenReturn(policyTypeIds); - Flux policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL); + Mono policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL); verify(asyncRestClientMock).get(POLICYTYPES_IDENTITIES_URL); - StepVerifier.create(policyTypeIdsFlux).expectNext(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME).expectComplete() - .verify(); + StepVerifier.create(policyTypeIdsFlux).expectNextCount(1).expectComplete().verify(); } @Test @@ -85,9 +83,9 @@ public class A1ClientImplTest { Mono policyIds = Mono.just(Arrays.toString(new String[] {POLICY_1_ID, POLICY_2_ID})); when(asyncRestClientMock.get(POLICIES_IDENTITIES_URL)).thenReturn(policyIds); - Flux policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL); + Mono policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL); verify(asyncRestClientMock).get(POLICIES_IDENTITIES_URL); - StepVerifier.create(policyIdsFlux).expectNext(POLICY_1_ID, POLICY_2_ID).expectComplete().verify(); + StepVerifier.create(policyIdsFlux).expectNextCount(1).expectComplete().verify(); } @Test diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java index 195a4869..791acee2 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.util.Arrays; +import java.util.Collection; import java.util.Vector; import org.junit.jupiter.api.Test; @@ -42,11 +43,10 @@ import org.oransc.policyagent.repository.ImmutablePolicyType; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.PolicyType; +import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; - -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) @@ -93,19 +93,23 @@ public class RepositorySupervisionTest { .build(); Policies policies = new Policies(); policies.put(policy1); + PolicyTypes types = new PolicyTypes(); - RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock); + RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock, types); - when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(Flux.just("policyId1", "policyId2")); + Mono> policyIds = Mono.just(Arrays.asList("policyId1", "policyId2")); + when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policyIds); when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.empty()); + when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyIds); + when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("schema")); supervisorUnderTest.checkAllRics(); - await().untilAsserted(() -> RicState.ACTIVE.equals(rics.getRic("ric2").state())); + await().untilAsserted(() -> RicState.ACTIVE.equals(ric1.state())); + await().untilAsserted(() -> RicState.ACTIVE.equals(ric2.state())); + await().untilAsserted(() -> RicState.ACTIVE.equals(ric3.state())); - verify(a1ClientMock).getPolicyIdentities("baseUrl1"); verify(a1ClientMock).deletePolicy("baseUrl1", "policyId2"); - verify(a1ClientMock).getPolicyIdentities("baseUrl2"); verify(a1ClientMock).deletePolicy("baseUrl2", "policyId2"); verifyNoMoreInteractions(a1ClientMock); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java index 5cfabd9c..0c254d59 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java @@ -32,6 +32,8 @@ import static org.mockito.Mockito.when; import static org.oransc.policyagent.repository.Ric.RicState.ACTIVE; import static org.oransc.policyagent.repository.Ric.RicState.NOT_REACHABLE; +import java.util.Arrays; +import java.util.Collection; import java.util.Vector; import org.junit.jupiter.api.Test; @@ -44,11 +46,10 @@ import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.configuration.ImmutableRicConfig; import org.oransc.policyagent.configuration.RicConfig; +import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; import org.oransc.policyagent.repository.Rics; - -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) @@ -80,18 +81,18 @@ public class StartupServiceTest { ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C)); when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs); - Flux fluxType1 = Flux.just(POLICY_TYPE_1_NAME); - Flux fluxType2 = Flux.just(POLICY_TYPE_2_NAME); - when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1) - .thenReturn(fluxType1.concatWith(fluxType2)); - Flux policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2}); + Mono> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); + Mono> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME)); + when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2); + Mono> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2)); when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policies); when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema")); when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK")); Rics rics = new Rics(); PolicyTypes policyTypes = new PolicyTypes(); - StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock); + StartupService serviceUnderTest = + new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies()); serviceUnderTest.startup(); @@ -117,7 +118,8 @@ public class StartupServiceTest { "Not correct no of types supported for ric " + FIRST_RIC_NAME); assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME), POLICY_TYPE_1_NAME + " not supported by ric " + FIRST_RIC_NAME); - assertEquals(1, firstRic.getManagedNodes().size(), "Not correct no of managed nodes for ric " + FIRST_RIC_NAME); + assertEquals(1, firstRic.getManagedElementIds().size(), + "Not correct no of managed nodes for ric " + FIRST_RIC_NAME); assertTrue(firstRic.isManaging(MANAGED_NODE_A), MANAGED_NODE_A + " not managed by ric " + FIRST_RIC_NAME); Ric secondRic = rics.get(SECOND_RIC_NAME); @@ -130,7 +132,7 @@ public class StartupServiceTest { POLICY_TYPE_1_NAME + " not supported by ric " + SECOND_RIC_NAME); assertTrue(secondRic.isSupportingType(POLICY_TYPE_2_NAME), POLICY_TYPE_2_NAME + " not supported by ric " + SECOND_RIC_NAME); - assertEquals(2, secondRic.getManagedNodes().size(), + assertEquals(2, secondRic.getManagedElementIds().size(), "Not correct no of managed nodes for ric " + SECOND_RIC_NAME); assertTrue(secondRic.isManaging(MANAGED_NODE_B), MANAGED_NODE_B + " not managed by ric " + SECOND_RIC_NAME); assertTrue(secondRic.isManaging(MANAGED_NODE_C), MANAGED_NODE_C + " not managed by ric " + SECOND_RIC_NAME); @@ -143,18 +145,19 @@ public class StartupServiceTest { ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C)); when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs); - Flux fluxType1 = Flux.just(POLICY_TYPE_1_NAME); - doReturn(Flux.error(new Exception("Unable to contact ric.")), fluxType1).when(a1ClientMock) - .getPolicyTypeIdentities(anyString()); + Mono> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); + Mono error = Mono.error(new Exception("Unable to contact ric.")); + doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString()); - Flux policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2}); + Mono> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2)); doReturn(policies).when(a1ClientMock).getPolicyIdentities(anyString()); when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema")); when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK")); Rics rics = new Rics(); PolicyTypes policyTypes = new PolicyTypes(); - StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock); + StartupService serviceUnderTest = + new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies()); serviceUnderTest.startup(); @@ -173,19 +176,19 @@ public class StartupServiceTest { ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C)); when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs); - Flux fluxType1 = Flux.just(POLICY_TYPE_1_NAME); - Flux fluxType2 = Flux.just(POLICY_TYPE_2_NAME); - when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1) - .thenReturn(fluxType1.concatWith(fluxType2)); + Mono> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); + Mono> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME)); + when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2); when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema")); - Flux policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2}); - doReturn(Flux.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock) + Mono> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2)); + doReturn(Mono.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock) .getPolicyIdentities(anyString()); when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK")); Rics rics = new Rics(); PolicyTypes policyTypes = new PolicyTypes(); - StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock); + StartupService serviceUnderTest = + new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies()); serviceUnderTest.startup(); @@ -197,14 +200,19 @@ public class StartupServiceTest { assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME); } - private RicConfig getRicConfig(String name, String baseUrl, String... nodeNames) { - Vector managedNodes = new Vector(1); - for (String nodeName : nodeNames) { - managedNodes.add(nodeName); + @SafeVarargs + private Vector toVector(T... objs) { + Vector result = new Vector<>(); + for (T o : objs) { + result.add(o); } + return result; + } + + private RicConfig getRicConfig(String name, String baseUrl, String... managedElementIds) { ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() // .name(name) // - .managedElementIds(managedNodes) // + .managedElementIds(toVector(managedElementIds)) // .baseUrl(baseUrl) // .build(); return ricConfig;