From b47a7130c10bef2bf812366ca971e4eaa938b152 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 10 Jan 2020 07:49:31 +0100 Subject: [PATCH] Recovery handling Supervision of NearRT RIC policy types and instances. When there is a difference between the view in the NonRT RIC and the NearRT RIC, all policies will be deleted and the types will be refreshed. Introduced that policies will be created and deleted in the NonRT RIC when created/deleted from the agent NBI. Added Mock mode, run by command: mvn -Dtest=MockPolicyAgent test Issue-ID: NONRTRIC-84 Change-Id: Ica621c990f352cd69efa0dca51451d5f3c755b68 Signed-off-by: PatrikBuhr --- policy-agent/README.md | 34 ++++++ .../org/oransc/policyagent/clients/A1Client.java | 7 +- .../oransc/policyagent/clients/A1ClientImpl.java | 21 ++-- .../configuration/ApplicationConfig.java | 2 +- .../policyagent/controllers/PolicyController.java | 80 +++++++------ .../oransc/policyagent/controllers/RicInfo.java | 2 +- .../controllers/RicRepositoryController.java | 2 +- .../oransc/policyagent/repository/Policies.java | 4 + .../oransc/policyagent/repository/PolicyTypes.java | 4 + .../org/oransc/policyagent/repository/Ric.java | 43 +++---- .../org/oransc/policyagent/repository/Rics.java | 2 +- .../org/oransc/policyagent/repository/Service.java | 4 +- .../policyagent/tasks/RepositorySupervision.java | 76 +++++++----- .../oransc/policyagent/tasks/RicRecoveryTask.java | 130 +++++++++++++++++++++ .../oransc/policyagent/tasks/StartupService.java | 80 +++---------- .../org/oransc/policyagent/ApplicationTest.java | 77 ++++++++++-- .../org/oransc/policyagent/MockPolicyAgent.java | 23 ++-- .../policyagent/clients/A1ClientImplTest.java | 10 +- .../tasks/RepositorySupervisionTest.java | 18 +-- .../policyagent/tasks/StartupServiceTest.java | 62 +++++----- 20 files changed, 443 insertions(+), 238 deletions(-) create mode 100644 policy-agent/README.md create mode 100644 policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java diff --git a/policy-agent/README.md b/policy-agent/README.md new file mode 100644 index 00000000..be049073 --- /dev/null +++ b/policy-agent/README.md @@ -0,0 +1,34 @@ +# O-RAN-SC NonRT RIC Dashboard Web Application + +The O-RAN NonRT RIC PolicyAgent provides a REST API for management of +policices. It provides support for +-Policy configuration. This includes + -One REST API towards all RICs in the network + -Query functions that can find all policies in a RIC, all policies owned by a service (R-APP), all policies of a type etc. + -Maps O1 resources (ManagedElement) as defined in O1 to the controlling RIC +-Supervision of clients (R-APPs) to eliminate stray policies in case of failure +-Consistency monitoring of the SMO view of policies and the actual situation in the RICs +-Consistency monitoring of RIC capabilities (policy types) + +The agent can be run stand alone in a simulated test mode. Then it +simulates RICs. +The REST API is published on port 8081 and it is started by command: +mvn -Dtest=MockPolicyAgent test + +The backend server publishes live API documentation at the +URL `http://your-host-name-here:8080/swagger-ui.html` + +## License + +Copyright (C) 2019 Nordix Foundation. All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java index ddb71af4..bc6d7cda 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java @@ -20,14 +20,15 @@ package org.oransc.policyagent.clients; -import reactor.core.publisher.Flux; +import java.util.Collection; + import reactor.core.publisher.Mono; public interface A1Client { - public Flux getPolicyTypeIdentities(String nearRtRicUrl); + public Mono> getPolicyTypeIdentities(String nearRtRicUrl); - public Flux getPolicyIdentities(String nearRtRicUrl); + public Mono> getPolicyIdentities(String nearRtRicUrl); public Mono getPolicyType(String nearRtRicUrl, String policyTypeId); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientImpl.java index f621566e..b3773b8e 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientImpl.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientImpl.java @@ -22,6 +22,7 @@ package org.oransc.policyagent.clients; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.json.JSONArray; @@ -29,8 +30,6 @@ import org.json.JSONException; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class A1ClientImpl implements A1Client { @@ -45,19 +44,19 @@ public class A1ClientImpl implements A1Client { } @Override - public Flux getPolicyTypeIdentities(String nearRtRicUrl) { + public Mono> getPolicyTypeIdentities(String nearRtRicUrl) { logger.debug("getPolicyTypeIdentities nearRtRicUrl = {}", nearRtRicUrl); AsyncRestClient client = createClient(nearRtRicUrl); - Mono response = client.get("/policytypes/identities"); - return response.flatMapMany(this::createFlux); + return client.get("/policytypes/identities") // + .flatMap(this::parseJsonArrayOfString); } @Override - public Flux getPolicyIdentities(String nearRtRicUrl) { + public Mono> getPolicyIdentities(String nearRtRicUrl) { logger.debug("getPolicyIdentities nearRtRicUrl = {}", nearRtRicUrl); AsyncRestClient client = createClient(nearRtRicUrl); - Mono response = client.get("/policies/identities"); - return response.flatMapMany(this::createFlux); + return client.get("/policies/identities") // + .flatMap(this::parseJsonArrayOfString); } @Override @@ -84,7 +83,7 @@ public class A1ClientImpl implements A1Client { return client.delete("/policies/" + policyId); } - private Flux createFlux(String inputString) { + private Mono> parseJsonArrayOfString(String inputString) { try { List arrayList = new ArrayList<>(); JSONArray jsonArray = new JSONArray(inputString); @@ -92,9 +91,9 @@ public class A1ClientImpl implements A1Client { arrayList.add(jsonArray.getString(i)); } logger.debug("A1 client: received list = {}", arrayList); - return Flux.fromIterable(arrayList); + return Mono.just(arrayList); } catch (JSONException ex) { // invalid json - return Flux.error(ex); + return Mono.error(ex); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java index e41f55eb..00e5223f 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java @@ -109,7 +109,7 @@ public class ApplicationConfig { loadConfigurationFromFile(this.filepath); refreshConfigTask = createRefreshTask() // - .subscribe(e -> logger.info("Refreshed configuration data"), + .subscribe(notUsed -> logger.info("Refreshed configuration data"), throwable -> logger.error("Configuration refresh terminated due to exception", throwable), () -> logger.error("Configuration refresh terminated")); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java index e29a4e9c..6d849b1b 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java @@ -31,6 +31,7 @@ import io.swagger.annotations.ApiResponses; import java.util.Collection; import java.util.Vector; +import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.exceptions.ServiceException; import org.oransc.policyagent.repository.ImmutablePolicy; @@ -49,6 +50,7 @@ import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; @RestController @Api(value = "Policy Management API") @@ -57,15 +59,18 @@ public class PolicyController { private final Rics rics; private final PolicyTypes policyTypes; private final Policies policies; + private final A1Client a1Client; + private static Gson gson = new GsonBuilder() // .serializeNulls() // .create(); // @Autowired - PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics) { + PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics, A1Client a1Client) { this.policyTypes = types; this.policies = policies; this.rics = rics; + this.a1Client = a1Client; } @GetMapping("/policy_schemas") @@ -132,11 +137,48 @@ public class PolicyController { @DeleteMapping("/policy") @ApiOperation(value = "Deletes the policy") @ApiResponses(value = {@ApiResponse(code = 204, message = "Policy deleted")}) - public ResponseEntity deletePolicy( // - @RequestParam(name = "instance", required = true) String instance) { + public Mono> deletePolicy( // + @RequestParam(name = "instance", required = true) String id) { + Policy policy = policies.get(id); + if (policy != null && policy.ric().state().equals(Ric.RicState.ACTIVE)) { + return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) // + .doOnEach(notUsed -> policies.removeId(id)) // + .flatMap(notUsed -> { + return Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)); + }); + } else { + return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND)); + } + } - policies.removeId(instance); - return new ResponseEntity<>(HttpStatus.NO_CONTENT); + @PutMapping(path = "/policy") + @ApiOperation(value = "Create the policy") + @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")}) + public Mono> putPolicy( // + @RequestParam(name = "type", required = true) String typeName, // + @RequestParam(name = "instance", required = true) String instanceId, // + @RequestParam(name = "ric", required = true) String ricName, // + @RequestParam(name = "service", required = true) String service, // + @RequestBody String jsonBody) { + + Ric ric = rics.get(ricName); + PolicyType type = policyTypes.get(typeName); + if (ric != null && type != null && ric.state().equals(Ric.RicState.ACTIVE)) { + Policy policy = ImmutablePolicy.builder() // + .id(instanceId) // + .json(jsonBody) // + .type(type) // + .ric(ric) // + .ownerServiceName(service) // + .lastModified(getTimeStampUTC()) // + .build(); + return a1Client.putPolicy(policy.ric().getConfig().baseUrl(), policy.id(), policy.json()) // + .doOnNext(notUsed -> policies.put(policy)) // + .flatMap(notUsed -> { + return Mono.just(new ResponseEntity<>(HttpStatus.CREATED)); + }); + } + return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND)); } @GetMapping("/policies") @@ -226,32 +268,4 @@ public class PolicyController { return java.time.Instant.now().toString(); } - @PutMapping(path = "/policy") - @ApiOperation(value = "Create the policy") - @ApiResponses(value = {@ApiResponse(code = 201, message = "Policy created")}) - public ResponseEntity putPolicy( // - @RequestParam(name = "type", required = true) String type, // - @RequestParam(name = "instance", required = true) String instanceId, // - @RequestParam(name = "ric", required = true) String ric, // - @RequestParam(name = "service", required = true) String service, // - @RequestBody String jsonBody) { - - try { - // services.getService(service).ping(); - Ric ricObj = rics.getRic(ric); - Policy policy = ImmutablePolicy.builder() // - .id(instanceId) // - .json(jsonBody) // - .type(policyTypes.getType(type)) // - .ric(ricObj) // - .ownerServiceName(service) // - .lastModified(getTimeStampUTC()) // - .build(); - policies.put(policy); - return new ResponseEntity(HttpStatus.CREATED); - } catch (ServiceException e) { - return new ResponseEntity(e.getMessage(), HttpStatus.NOT_FOUND); - } - } - } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicInfo.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicInfo.java index db9a4e5b..cbb205cc 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicInfo.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicInfo.java @@ -31,7 +31,7 @@ interface RicInfo { public String name(); - public Collection nodeNames(); + public Collection managedElementIds(); public Collection policyTypes(); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java index 0075fb71..797ec719 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java @@ -99,7 +99,7 @@ public class RicRepositoryController { if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) { result.add(ImmutableRicInfo.builder() // .name(ric.name()) // - .nodeNames(ric.getManagedNodes()) // + .managedElementIds(ric.getManagedElementIds()) // .policyTypes(ric.getSupportedPolicyTypeNames()) // .build()); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java index 2a4eb5af..58c91b35 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java @@ -75,6 +75,10 @@ public class Policies { return policiesId.containsKey(id); } + public synchronized Policy get(String id) { + return policiesId.get(id); + } + public synchronized Policy getPolicy(String id) throws ServiceException { Policy p = policiesId.get(id); if (p == null) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java index 0d66e7de..9dee6f91 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java @@ -40,6 +40,10 @@ public class PolicyTypes { return t; } + public synchronized PolicyType get(String name) { + return types.get(name); + } + public synchronized void put(PolicyType type) { types.put(type.name(), type); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java index 98c497f7..82d84f12 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java @@ -65,38 +65,38 @@ public class Ric { * * @return a vector containing the nodes managed by this Ric. */ - public Vector getManagedNodes() { + public Vector getManagedElementIds() { return ricConfig.managedElementIds(); } /** * Determines if the given node is managed by this Ric. * - * @param nodeName the node name to check. + * @param managedElementId the node name to check. * @return true if the given node is managed by this Ric. */ - public boolean isManaging(String nodeName) { - return ricConfig.managedElementIds().contains(nodeName); + public boolean isManaging(String managedElementId) { + return ricConfig.managedElementIds().contains(managedElementId); } /** * Adds the given node as managed by this Ric. * - * @param nodeName the node to add. + * @param managedElementId the node to add. */ - public void addManagedNode(String nodeName) { - if (!ricConfig.managedElementIds().contains(nodeName)) { - ricConfig.managedElementIds().add(nodeName); + public void addManagedElement(String managedElementId) { + if (!ricConfig.managedElementIds().contains(managedElementId)) { + ricConfig.managedElementIds().add(managedElementId); } } /** * Removes the given node as managed by this Ric. * - * @param nodeName the node to remove. + * @param managedElementId the node to remove. */ - public void removeManagedNode(String nodeName) { - ricConfig.managedElementIds().remove(nodeName); + public void removeManagedElement(String managedElementId) { + ricConfig.managedElementIds().remove(managedElementId); } /** @@ -122,23 +122,10 @@ public class Ric { } /** - * Adds policy types as supported by this Ric. - * - * @param types the policy types to support. - */ - public void addSupportedPolicyTypes(Collection types) { - for (PolicyType type : types) { - addSupportedPolicyType(type); - } - } - - /** - * Removes a policy type as supported by this Ric. - * - * @param type the policy type to remove as supported by this Ric. + * Removes all policy type as supported by this Ric. */ - public void removeSupportedPolicyType(PolicyType type) { - supportedPolicyTypes.remove(type.name()); + public void clearSupportedPolicyTypes() { + supportedPolicyTypes.clear(); } /** @@ -173,6 +160,6 @@ public class Ric { /** * The Ric cannot be contacted. */ - NOT_REACHABLE + NOT_REACHABLE, RECOVERING } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java index 153d193b..6b8138fc 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java @@ -32,7 +32,7 @@ import org.oransc.policyagent.exceptions.ServiceException; public class Rics { Map rics = new HashMap<>(); - public void put(Ric ric) { + public synchronized void put(Ric ric) { rics.put(ric.name(), ric); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java index 512d065b..81ef7ff9 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java @@ -35,11 +35,11 @@ public class Service { ping(); } - public String getName() { + public synchronized String getName() { return this.name; } - public Duration getKeepAliveInterval() { + public synchronized Duration getKeepAliveInterval() { return this.keepAliveInterval; } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java index 7984a62a..9ac9d705 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java @@ -20,10 +20,12 @@ package org.oransc.policyagent.tasks; +import java.util.Collection; + import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.repository.Policies; +import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; -import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,13 +47,15 @@ public class RepositorySupervision { private final Rics rics; private final Policies policies; + private final PolicyTypes policyTypes; private final A1Client a1Client; @Autowired - public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client) { + public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client, PolicyTypes policyTypes) { this.rics = rics; this.policies = policies; this.a1Client = a1Client; + this.policyTypes = policyTypes; } /** @@ -66,45 +70,59 @@ public class RepositorySupervision { private Flux createTask() { return Flux.fromIterable(rics.getRics()) // - .groupBy(ric -> ric.state()) // - .flatMap(fluxGroup -> handleGroup(fluxGroup.key(), fluxGroup)); + .flatMap(ric -> checkInstances(ric)) // + .flatMap(ric -> checkTypes(ric)); } - private Flux handleGroup(Ric.RicState key, Flux fluxGroup) { - logger.debug("Handling group {}", key); - switch (key) { - case ACTIVE: - return fluxGroup.flatMap(this::checkActive); + private Mono checkInstances(Ric ric) { - case NOT_REACHABLE: - return fluxGroup.flatMap(this::checkNotReachable); + return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // + .onErrorResume(t -> Mono.empty()) // + .flatMap(ricP -> validateInstances(ricP, ric)); + } - default: - // If not initiated, leave it to the StartupService. - return Flux.empty(); - } + private Flux junk() { + return Flux.empty(); } - private Mono checkActive(Ric ric) { - logger.debug("Handling active ric {}", ric); - a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .filter(policyId -> !policies.containsPolicy(policyId)) // - .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId)) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // - .subscribe(); + private Mono validateInstances(Collection ricPolicies, Ric ric) { + if (ricPolicies.size() != policies.getForRic(ric.name()).size()) { + return startRecovery(ric); + } + for (String policyId : ricPolicies) { + if (!policies.containsPolicy(policyId)) { + return startRecovery(ric); + } + } return Mono.just(ric); } - private Mono checkNotReachable(Ric ric) { - logger.debug("Handling not reachable ric {}", ric); - a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .filter(policyId -> !policies.containsPolicy(policyId)) // - .doOnNext(policyId -> logger.debug("Deleting policy {}", policyId)) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // - .subscribe(null, null, () -> ric.setState(RicState.ACTIVE)); + private Mono checkTypes(Ric ric) { + return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // + .onErrorResume(t -> { + return Mono.empty(); + }) // + .flatMap(ricTypes -> validateTypes(ricTypes, ric)); + } + + private Mono validateTypes(Collection ricTypes, Ric ric) { + if (ricTypes.size() != ric.getSupportedPolicyTypes().size()) { + return startRecovery(ric); + } + for (String typeName : ricTypes) { + if (!ric.isSupportingType(typeName)) { + return startRecovery(ric); + } + } return Mono.just(ric); } + private Mono startRecovery(Ric ric) { + RicRecoveryTask recovery = new RicRecoveryTask(a1Client, policyTypes, policies); + recovery.run(ric); + return Mono.empty(); + } + private void onRicChecked(Ric ric) { logger.info("Ric: " + ric.name() + " checked"); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java new file mode 100644 index 00000000..3883585b --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java @@ -0,0 +1,130 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.tasks; + +import java.util.Collection; +import java.util.Vector; + +import org.oransc.policyagent.clients.A1Client; +import org.oransc.policyagent.exceptions.ServiceException; +import org.oransc.policyagent.repository.ImmutablePolicyType; +import org.oransc.policyagent.repository.Policies; +import org.oransc.policyagent.repository.Policy; +import org.oransc.policyagent.repository.PolicyType; +import org.oransc.policyagent.repository.PolicyTypes; +import org.oransc.policyagent.repository.Ric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Loads information about RealTime-RICs at startup. + */ +public class RicRecoveryTask { + + private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class); + + private final A1Client a1Client; + private final PolicyTypes policyTypes; + private final Policies policies; + + public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies) { + this.a1Client = a1Client; + this.policyTypes = policyTypes; + this.policies = policies; + } + + public void run(Collection rics) { + for (Ric ric : rics) { + run(ric); + } + } + + public void run(Ric ric) { + logger.debug("Handling ric: {}", ric.getConfig().name()); + + synchronized (ric) { + if (ric.state().equals(Ric.RicState.RECOVERING)) { + return; // Already running + } + ric.setState(Ric.RicState.RECOVERING); + } + Flux recoveredTypes = recoverPolicyTypes(ric); + Flux deletedPolicies = deletePolicies(ric); + + Flux.merge(recoveredTypes, deletedPolicies) // + .subscribe(x -> logger.debug("Recover: " + x), // + throwable -> onError(ric, throwable), // + () -> onComplete(ric)); + } + + private void onComplete(Ric ric) { + logger.debug("Recovery completed for:" + ric.name()); + ric.setState(Ric.RicState.ACTIVE); + + } + + private void onError(Ric ric, Throwable t) { + logger.debug("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage()); + ric.setState(Ric.RicState.NOT_REACHABLE); + } + + private Flux recoverPolicyTypes(Ric ric) { + ric.clearSupportedPolicyTypes(); + return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // + .flatMapMany(types -> Flux.fromIterable(types)) // + .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) + .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) // + .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); // + } + + private Mono getPolicyType(Ric ric, String policyTypeId) { + if (policyTypes.contains(policyTypeId)) { + try { + return Mono.just(policyTypes.getType(policyTypeId)); + } catch (ServiceException e) { + return Mono.error(e); + } + } + return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) // + .flatMap(schema -> createPolicyType(policyTypeId, schema)); + } + + private Mono createPolicyType(String policyTypeId, String schema) { + PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build(); + policyTypes.put(pt); + return Mono.just(pt); + } + + private Flux deletePolicies(Ric ric) { + Collection ricPolicies = new Vector<>(policies.getForRic(ric.name())); + for (Policy policy : ricPolicies) { + this.policies.remove(policy); + } + + return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // + .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) // + .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name())) + .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); // + } +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java index 283e8ea9..d2356ea9 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java @@ -22,21 +22,16 @@ package org.oransc.policyagent.tasks; import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.configuration.ApplicationConfig; -import org.oransc.policyagent.exceptions.ServiceException; -import org.oransc.policyagent.repository.ImmutablePolicyType; -import org.oransc.policyagent.repository.PolicyType; +import org.oransc.policyagent.configuration.RicConfig; +import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; -import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - /** * Loads information about RealTime-RICs at startup. */ @@ -57,78 +52,29 @@ public class StartupService { @Autowired private A1Client a1Client; - StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client) { + @Autowired + private Policies policies; + + StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client, + Policies policies) { this.applicationConfig = appConfig; this.rics = rics; this.policyTypes = policyTypes; this.a1Client = a1Client; + this.policies = policies; } /** * Reads the configured Rics and performs the service discovery. The result is put into the repository. */ public void startup() { + logger.debug("Starting up"); applicationConfig.initialize(); - Flux.fromIterable(applicationConfig.getRicConfigs()) // - .map(ricConfig -> new Ric(ricConfig)) // - .doOnNext(ric -> logger.debug("Handling ric: {}", ric.getConfig().name())) // - .flatMap(this::addPolicyTypesForRic) // - .flatMap(this::deletePoliciesForRic) // - .doOnNext(rics::put) // - .subscribe(); - } - - private Mono addPolicyTypesForRic(Ric ric) { - a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // - .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) - .flatMap((policyTypeId) -> addTypeToRepo(ric, policyTypeId)) // - .flatMap(type -> addTypeToRic(ric, type)) // - .subscribe(null, cause -> setRicToNotReachable(ric, cause), () -> setRicToActive(ric)); - return Mono.just(ric); - } - - private Mono addTypeToRepo(Ric ric, String policyTypeId) { - if (policyTypes.contains(policyTypeId)) { - try { - return Mono.just(policyTypes.getType(policyTypeId)); - } catch (ServiceException e) { - return Mono.error(e); - } + for (RicConfig ricConfig : applicationConfig.getRicConfigs()) { + rics.put(new Ric(ricConfig)); } - return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) // - .flatMap(schema -> createPolicyType(policyTypeId, schema)); - } - - private Mono createPolicyType(String policyTypeId, String schema) { - PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build(); - policyTypes.put(pt); - return Mono.just(pt); - } - - private Mono addTypeToRic(Ric ric, PolicyType policyType) { - ric.addSupportedPolicyType(policyType); - return Mono.just(policyType); - } - - private Mono deletePoliciesForRic(Ric ric) { - if (!Ric.RicState.NOT_REACHABLE.equals(ric.state())) { - a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .doOnNext( - policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name())) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // - .subscribe(null, cause -> setRicToNotReachable(ric, cause), null); - } - - return Mono.just(ric); - } - - private void setRicToNotReachable(Ric ric, Throwable t) { - ric.setState(Ric.RicState.NOT_REACHABLE); - logger.info("Unable to connect to ric {}. Cause: {}", ric.name(), t.getMessage()); - } - - private void setRicToActive(Ric ric) { - ric.setState(RicState.ACTIVE); + RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies); + recoveryTask.run(rics.getRics()); // recover all Rics } } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index 8a6523c4..365d418a 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -29,11 +29,13 @@ import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; import java.net.URL; +import java.util.Collection; import java.util.List; import java.util.Vector; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.configuration.ImmutableRicConfig; import org.oransc.policyagent.configuration.RicConfig; @@ -61,6 +63,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.web.client.RestTemplate; +import reactor.core.publisher.Mono; @ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) @@ -89,25 +92,81 @@ public class ApplicationTest { } } + static class A1ClientMock implements A1Client { + private final Policies policies; + private final PolicyTypes policyTypes; + + A1ClientMock(Policies policies, PolicyTypes policyTypes) { + this.policies = policies; + this.policyTypes = policyTypes; + } + + @Override + public Mono> getPolicyTypeIdentities(String nearRtRicUrl) { + Vector result = new Vector<>(); + for (PolicyType p : this.policyTypes.getAll()) { + result.add(p.name()); + } + return Mono.just(result); + } + + @Override + public Mono getPolicyType(String nearRtRicUrl, String policyTypeId) { + try { + return Mono.just(this.policies.get(policyTypeId).json()); + } catch (Exception e) { + return Mono.error(e); + } + } + + @Override + public Mono putPolicy(String nearRtRicUrl, String policyId, String policyString) { + return Mono.just("OK"); + } + + @Override + public Mono deletePolicy(String nearRtRicUrl, String policyId) { + return Mono.just("OK"); + } + + @Override + public Mono> getPolicyIdentities(String nearRtRicUrl) { + return Mono.empty(); // problem is that a recovery will start + } + } + /** * Overrides the BeanFactory. */ @TestConfiguration static class TestBeanFactory { + private final Rics rics = new Rics(); + private final Policies policies = new Policies(); + private final PolicyTypes policyTypes = new PolicyTypes(); @Bean public ApplicationConfig getApplicationConfig() { return new MockApplicationConfig(); } + @Bean + A1Client getA1Client() { + return new A1ClientMock(this.policies, this.policyTypes); + } + + @Bean + public Policies getPolicies() { + return this.policies; + } + + @Bean + public PolicyTypes getPolicyTypes() { + return this.policyTypes; + } + @Bean public Rics getRics() { - Rics rics = new Rics(); - rics.put(new Ric(ImmutableRicConfig.builder().name("kista_1").baseUrl("kista_url") - .managedElementIds(new Vector<>()).build())); - rics.put(new Ric(ImmutableRicConfig.builder().name("ric1").baseUrl("ric_url") - .managedElementIds(new Vector<>()).build())); - return rics; + return this.rics; } } @@ -146,8 +205,6 @@ public class ApplicationTest { assertThat(rsp).isEqualTo("ric1"); } - // managedElmentId -> nodeName - @Test public void testPutPolicy() throws Exception { putService("service1"); @@ -155,6 +212,7 @@ public class ApplicationTest { String url = baseUrl() + "/policy?type=type1&instance=instance1&ric=ric1&service=service1"; String json = "{}"; addPolicyType("type1", "ric1"); + this.rics.getRic("ric1").setState(Ric.RicState.ACTIVE); this.restTemplate.put(url, json); @@ -234,7 +292,8 @@ public class ApplicationTest { public void testDeletePolicy() throws Exception { reset(); String url = baseUrl() + "/policy?instance=id"; - addPolicy("id", "typeName", "service1", "ric1"); + Policy policy = addPolicy("id", "typeName", "service1", "ric1"); + policy.ric().setState(Ric.RicState.ACTIVE); assertThat(policies.size()).isEqualTo(1); this.restTemplate.delete(url); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java index 743fa665..dd57710c 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java @@ -27,12 +27,13 @@ import java.io.File; import java.io.IOException; import java.net.URL; import java.nio.file.Files; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Vector; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.repository.ImmutablePolicyType; @@ -46,12 +47,10 @@ import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.web.server.LocalServerPort; import org.springframework.context.annotation.Bean; -import org.springframework.test.context.junit4.SpringRunner; - -import reactor.core.publisher.Flux; +import org.springframework.test.context.junit.jupiter.SpringExtension; import reactor.core.publisher.Mono; -@RunWith(SpringRunner.class) +@ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) public class MockPolicyAgent { @@ -80,7 +79,7 @@ public class MockPolicyAgent { getPolicies(nearRtRicUrl).put(policyId, policyString); } - public Iterable getPolicyIdentities(String nearRtRicUrl) { + public Collection getPolicyIdentities(String nearRtRicUrl) { return getPolicies(nearRtRicUrl).keySet(); } @@ -106,18 +105,18 @@ public class MockPolicyAgent { } @Override - public Flux getPolicyTypeIdentities(String nearRtRicUrl) { + public Mono> getPolicyTypeIdentities(String nearRtRicUrl) { Vector result = new Vector<>(); for (PolicyType p : this.policyTypes.getAll()) { result.add(p.name()); } - return Flux.fromIterable(result); + return Mono.just(result); } @Override - public Flux getPolicyIdentities(String nearRtRicUrl) { - Iterable result = policies.getPolicyIdentities(nearRtRicUrl); - return Flux.fromIterable(result); + public Mono> getPolicyIdentities(String nearRtRicUrl) { + Collection result = policies.getPolicyIdentities(nearRtRicUrl); + return Mono.just(result); } @Override diff --git a/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientImplTest.java b/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientImplTest.java index 7b11b9ac..f9a93c8c 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientImplTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientImplTest.java @@ -36,7 +36,6 @@ import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.jupiter.MockitoExtension; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -74,10 +73,9 @@ public class A1ClientImplTest { Mono policyTypeIds = Mono.just(Arrays.toString(new String[] {POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME})); when(asyncRestClientMock.get(POLICYTYPES_IDENTITIES_URL)).thenReturn(policyTypeIds); - Flux policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL); + Mono policyTypeIdsFlux = a1Client.getPolicyTypeIdentities(RIC_URL); verify(asyncRestClientMock).get(POLICYTYPES_IDENTITIES_URL); - StepVerifier.create(policyTypeIdsFlux).expectNext(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME).expectComplete() - .verify(); + StepVerifier.create(policyTypeIdsFlux).expectNextCount(1).expectComplete().verify(); } @Test @@ -85,9 +83,9 @@ public class A1ClientImplTest { Mono policyIds = Mono.just(Arrays.toString(new String[] {POLICY_1_ID, POLICY_2_ID})); when(asyncRestClientMock.get(POLICIES_IDENTITIES_URL)).thenReturn(policyIds); - Flux policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL); + Mono policyIdsFlux = a1Client.getPolicyIdentities(RIC_URL); verify(asyncRestClientMock).get(POLICIES_IDENTITIES_URL); - StepVerifier.create(policyIdsFlux).expectNext(POLICY_1_ID, POLICY_2_ID).expectComplete().verify(); + StepVerifier.create(policyIdsFlux).expectNextCount(1).expectComplete().verify(); } @Test diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java index 195a4869..791acee2 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.util.Arrays; +import java.util.Collection; import java.util.Vector; import org.junit.jupiter.api.Test; @@ -42,11 +43,10 @@ import org.oransc.policyagent.repository.ImmutablePolicyType; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.PolicyType; +import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; - -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) @@ -93,19 +93,23 @@ public class RepositorySupervisionTest { .build(); Policies policies = new Policies(); policies.put(policy1); + PolicyTypes types = new PolicyTypes(); - RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock); + RepositorySupervision supervisorUnderTest = new RepositorySupervision(rics, policies, a1ClientMock, types); - when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(Flux.just("policyId1", "policyId2")); + Mono> policyIds = Mono.just(Arrays.asList("policyId1", "policyId2")); + when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policyIds); when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.empty()); + when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyIds); + when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("schema")); supervisorUnderTest.checkAllRics(); - await().untilAsserted(() -> RicState.ACTIVE.equals(rics.getRic("ric2").state())); + await().untilAsserted(() -> RicState.ACTIVE.equals(ric1.state())); + await().untilAsserted(() -> RicState.ACTIVE.equals(ric2.state())); + await().untilAsserted(() -> RicState.ACTIVE.equals(ric3.state())); - verify(a1ClientMock).getPolicyIdentities("baseUrl1"); verify(a1ClientMock).deletePolicy("baseUrl1", "policyId2"); - verify(a1ClientMock).getPolicyIdentities("baseUrl2"); verify(a1ClientMock).deletePolicy("baseUrl2", "policyId2"); verifyNoMoreInteractions(a1ClientMock); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java index 5cfabd9c..0c254d59 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java @@ -32,6 +32,8 @@ import static org.mockito.Mockito.when; import static org.oransc.policyagent.repository.Ric.RicState.ACTIVE; import static org.oransc.policyagent.repository.Ric.RicState.NOT_REACHABLE; +import java.util.Arrays; +import java.util.Collection; import java.util.Vector; import org.junit.jupiter.api.Test; @@ -44,11 +46,10 @@ import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.configuration.ImmutableRicConfig; import org.oransc.policyagent.configuration.RicConfig; +import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; import org.oransc.policyagent.repository.Rics; - -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) @@ -80,18 +81,18 @@ public class StartupServiceTest { ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C)); when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs); - Flux fluxType1 = Flux.just(POLICY_TYPE_1_NAME); - Flux fluxType2 = Flux.just(POLICY_TYPE_2_NAME); - when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1) - .thenReturn(fluxType1.concatWith(fluxType2)); - Flux policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2}); + Mono> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); + Mono> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME)); + when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2); + Mono> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2)); when(a1ClientMock.getPolicyIdentities(anyString())).thenReturn(policies); when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema")); when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK")); Rics rics = new Rics(); PolicyTypes policyTypes = new PolicyTypes(); - StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock); + StartupService serviceUnderTest = + new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies()); serviceUnderTest.startup(); @@ -117,7 +118,8 @@ public class StartupServiceTest { "Not correct no of types supported for ric " + FIRST_RIC_NAME); assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME), POLICY_TYPE_1_NAME + " not supported by ric " + FIRST_RIC_NAME); - assertEquals(1, firstRic.getManagedNodes().size(), "Not correct no of managed nodes for ric " + FIRST_RIC_NAME); + assertEquals(1, firstRic.getManagedElementIds().size(), + "Not correct no of managed nodes for ric " + FIRST_RIC_NAME); assertTrue(firstRic.isManaging(MANAGED_NODE_A), MANAGED_NODE_A + " not managed by ric " + FIRST_RIC_NAME); Ric secondRic = rics.get(SECOND_RIC_NAME); @@ -130,7 +132,7 @@ public class StartupServiceTest { POLICY_TYPE_1_NAME + " not supported by ric " + SECOND_RIC_NAME); assertTrue(secondRic.isSupportingType(POLICY_TYPE_2_NAME), POLICY_TYPE_2_NAME + " not supported by ric " + SECOND_RIC_NAME); - assertEquals(2, secondRic.getManagedNodes().size(), + assertEquals(2, secondRic.getManagedElementIds().size(), "Not correct no of managed nodes for ric " + SECOND_RIC_NAME); assertTrue(secondRic.isManaging(MANAGED_NODE_B), MANAGED_NODE_B + " not managed by ric " + SECOND_RIC_NAME); assertTrue(secondRic.isManaging(MANAGED_NODE_C), MANAGED_NODE_C + " not managed by ric " + SECOND_RIC_NAME); @@ -143,18 +145,19 @@ public class StartupServiceTest { ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C)); when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs); - Flux fluxType1 = Flux.just(POLICY_TYPE_1_NAME); - doReturn(Flux.error(new Exception("Unable to contact ric.")), fluxType1).when(a1ClientMock) - .getPolicyTypeIdentities(anyString()); + Mono> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); + Mono error = Mono.error(new Exception("Unable to contact ric.")); + doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString()); - Flux policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2}); + Mono> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2)); doReturn(policies).when(a1ClientMock).getPolicyIdentities(anyString()); when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema")); when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK")); Rics rics = new Rics(); PolicyTypes policyTypes = new PolicyTypes(); - StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock); + StartupService serviceUnderTest = + new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies()); serviceUnderTest.startup(); @@ -173,19 +176,19 @@ public class StartupServiceTest { ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C)); when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs); - Flux fluxType1 = Flux.just(POLICY_TYPE_1_NAME); - Flux fluxType2 = Flux.just(POLICY_TYPE_2_NAME); - when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(fluxType1) - .thenReturn(fluxType1.concatWith(fluxType2)); + Mono> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); + Mono> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME)); + when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2); when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema")); - Flux policies = Flux.just(new String[] {POLICY_ID_1, POLICY_ID_2}); - doReturn(Flux.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock) + Mono> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2)); + doReturn(Mono.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock) .getPolicyIdentities(anyString()); when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK")); Rics rics = new Rics(); PolicyTypes policyTypes = new PolicyTypes(); - StartupService serviceUnderTest = new StartupService(appConfigMock, rics, policyTypes, a1ClientMock); + StartupService serviceUnderTest = + new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies()); serviceUnderTest.startup(); @@ -197,14 +200,19 @@ public class StartupServiceTest { assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME); } - private RicConfig getRicConfig(String name, String baseUrl, String... nodeNames) { - Vector managedNodes = new Vector(1); - for (String nodeName : nodeNames) { - managedNodes.add(nodeName); + @SafeVarargs + private Vector toVector(T... objs) { + Vector result = new Vector<>(); + for (T o : objs) { + result.add(o); } + return result; + } + + private RicConfig getRicConfig(String name, String baseUrl, String... managedElementIds) { ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() // .name(name) // - .managedElementIds(managedNodes) // + .managedElementIds(toVector(managedElementIds)) // .baseUrl(baseUrl) // .build(); return ricConfig; -- 2.16.6