From f694dec2ff16069e6bb5c9de845278f44e8c9591 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Thu, 16 Jan 2020 07:27:58 +0100 Subject: [PATCH] Changed in config will add and recover Rics When the application config is changed, Ric:s will be added/removed in the repository and recovered. Fixed a lot of thread synchronization issues. Issue-ID: NONRTRIC-84 Change-Id: I42ff66bc59c585d2d9f65e9e00a0ac88998a44bf Signed-off-by: PatrikBuhr --- .../configuration/ApplicationConfig.java | 85 +++++++++++++++++--- .../configuration/ApplicationConfigLoader.java | 92 ---------------------- .../policyagent/controllers/PolicyController.java | 70 ++++++++-------- .../controllers/RicRepositoryController.java | 17 ++-- .../policyagent/controllers/ServiceController.java | 9 ++- .../oransc/policyagent/repository/Policies.java | 5 +- .../oransc/policyagent/repository/PolicyTypes.java | 9 ++- .../org/oransc/policyagent/repository/Ric.java | 14 ++-- .../org/oransc/policyagent/repository/Rics.java | 15 ++-- .../oransc/policyagent/repository/Services.java | 6 +- .../policyagent/tasks/RepositorySupervision.java | 30 +++++-- .../oransc/policyagent/tasks/RicRecoveryTask.java | 41 ++++++---- .../policyagent/tasks/ServiceSupervision.java | 18 +++-- .../oransc/policyagent/tasks/StartupService.java | 28 +++++-- .../org/oransc/policyagent/ApplicationTest.java | 8 +- .../org/oransc/policyagent/MockPolicyAgent.java | 5 +- .../configuration/ApplicationConfigTest.java | 14 ++-- .../tasks/RepositorySupervisionTest.java | 10 +-- .../policyagent/tasks/StartupServiceTest.java | 46 +++++------ .../org/oransc/policyagent/utils/MockA1Client.java | 24 +++--- 20 files changed, 290 insertions(+), 256 deletions(-) delete mode 100644 policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigLoader.java diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java index 00e5223f..7e522842 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java @@ -33,6 +33,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.ServiceLoader; @@ -68,25 +71,30 @@ public class ApplicationConfig { Properties systemEnvironment; private Disposable refreshConfigTask = null; + private Collection observers = new Vector<>(); + + private Map ricConfigs = new HashMap<>(); @NotEmpty private String filepath; - private Vector ricConfigs; - @Autowired public ApplicationConfig() { } + protected String getLocalConfigurationFilePath() { + return this.filepath; + } + public synchronized void setFilepath(String filepath) { this.filepath = filepath; } - public Vector getRicConfigs() { - return this.ricConfigs; + public synchronized Collection getRicConfigs() { + return this.ricConfigs.values(); } - public Optional lookupRicConfigForManagedElement(String managedElementId) { + public synchronized Optional lookupRicConfigForManagedElement(String managedElementId) { for (RicConfig ricConfig : getRicConfigs()) { if (ricConfig.managedElementIds().contains(managedElementId)) { return Optional.of(ricConfig); @@ -106,7 +114,7 @@ public class ApplicationConfig { public void initialize() { stop(); - loadConfigurationFromFile(this.filepath); + loadConfigurationFromFile(); refreshConfigTask = createRefreshTask() // .subscribe(notUsed -> logger.info("Refreshed configuration data"), @@ -114,6 +122,18 @@ public class ApplicationConfig { () -> logger.error("Configuration refresh terminated")); } + public static enum RicConfigUpdate { + ADDED, CHANGED, REMOVED + } + + public interface Observer { + void onRicConfigUpdate(RicConfig ric, RicConfigUpdate event); + } + + public void addObserver(Observer o) { + this.observers.add(o); + } + Mono getEnvironment(Properties systemEnvironment) { return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment); } @@ -154,8 +174,48 @@ public class ApplicationConfig { return this; } - private synchronized void setConfiguration(@NotNull Vector ricConfigs) { - this.ricConfigs = ricConfigs; + private class Notification { + final RicConfig ric; + final RicConfigUpdate event; + + Notification(RicConfig ric, RicConfigUpdate event) { + this.ric = ric; + this.event = event; + } + } + + private void setConfiguration(@NotNull Collection ricConfigs) { + Collection notifications = new Vector<>(); + synchronized (this) { + Map newRicConfigs = new HashMap<>(); + for (RicConfig newConfig : ricConfigs) { + RicConfig oldConfig = this.ricConfigs.get(newConfig.name()); + if (oldConfig == null) { + newRicConfigs.put(newConfig.name(), newConfig); + notifications.add(new Notification(newConfig, RicConfigUpdate.ADDED)); + this.ricConfigs.remove(newConfig.name()); + } else if (!newConfig.equals(newConfig)) { + notifications.add(new Notification(newConfig, RicConfigUpdate.CHANGED)); + newRicConfigs.put(newConfig.name(), newConfig); + this.ricConfigs.remove(newConfig.name()); + } else { + newRicConfigs.put(oldConfig.name(), oldConfig); + } + } + for (RicConfig deletedConfig : this.ricConfigs.values()) { + notifications.add(new Notification(deletedConfig, RicConfigUpdate.REMOVED)); + } + this.ricConfigs = newRicConfigs; + } + notifyObservers(notifications); + } + + private void notifyObservers(Collection notifications) { + for (Observer observer : this.observers) { + for (Notification notif : notifications) { + observer.onRicConfigUpdate(notif.ric, notif.event); + } + } } public void stop() { @@ -168,7 +228,12 @@ public class ApplicationConfig { /** * Reads the configuration from file. */ - protected void loadConfigurationFromFile(String filepath) { + public void loadConfigurationFromFile() { + String filepath = getLocalConfigurationFilePath(); + if (filepath == null) { + logger.debug("No localconfiguration file used"); + return; + } GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); @@ -180,7 +245,7 @@ public class ApplicationConfig { } ApplicationConfigParser appParser = new ApplicationConfigParser(); appParser.parse(rootObject); - this.ricConfigs = appParser.getRicConfigs(); + setConfiguration(appParser.getRicConfigs()); logger.info("Local configuration file loaded: {}", filepath); } catch (JsonSyntaxException | ServiceException | IOException e) { logger.trace("Local configuration file not loaded: {}", filepath, e); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigLoader.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigLoader.java deleted file mode 100644 index e1eb17a5..00000000 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigLoader.java +++ /dev/null @@ -1,92 +0,0 @@ -/*- - * ========================LICENSE_START================================= - * O-RAN-SC - * %% - * Copyright (C) 2019 Nordix Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================LICENSE_END=================================== - */ - -package org.oransc.policyagent.configuration; - -import io.swagger.annotations.ApiOperation; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ScheduledFuture; - -import javax.annotation.PostConstruct; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; - -@Component -@EnableScheduling -class ApplicationConfigLoader { - - private static final Logger logger = LoggerFactory.getLogger(ApplicationConfigLoader.class); - private static List> scheduledFutureList = new ArrayList<>(); - private static final Duration CONFIGURATION_REFRESH_INTERVAL = Duration.ofSeconds(15); - - private final TaskScheduler taskScheduler; - private final ApplicationConfig configuration; - - @Autowired - public ApplicationConfigLoader(TaskScheduler taskScheduler, ApplicationConfig configuration) { - this.taskScheduler = taskScheduler; - this.configuration = configuration; - } - - /** - * Function which have to stop tasks execution. - * - * @return response entity about status of cancellation operation - */ - @ApiOperation(value = "Get response on stopping task execution") - public synchronized Mono> getResponseFromCancellationOfTasks() { - scheduledFutureList.forEach(x -> x.cancel(false)); - scheduledFutureList.clear(); - logger.info("Stopped"); - return Mono.just(new ResponseEntity<>("Service has already been stopped!", HttpStatus.CREATED)); - } - - @PostConstruct - @ApiOperation(value = "Start task if possible") - public synchronized boolean start() { - logger.info("Start scheduling Datafile workflow"); - configuration.initialize(); - - if (scheduledFutureList.isEmpty()) { - scheduledFutureList - .add(taskScheduler.scheduleWithFixedDelay(this::refreshConfiguration, CONFIGURATION_REFRESH_INTERVAL)); - return true; - } else { - return false; - } - } - - private void refreshConfiguration() { - // TBD - - } - -} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java index e33fb7e4..246fdd45 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java @@ -77,15 +77,17 @@ public class PolicyController { @ApiOperation(value = "Returns policy type schema definitions") @ApiResponses(value = {@ApiResponse(code = 200, message = "Policy Types found")}) public ResponseEntity getPolicySchemas(@RequestParam(name = "ric", required = false) String ricName) { - if (ricName == null) { - Collection types = this.policyTypes.getAll(); - return new ResponseEntity(toPolicyTypeSchemasJson(types), HttpStatus.OK); - } else { - try { - Collection types = rics.getRic(ricName).getSupportedPolicyTypes(); + synchronized (this.policyTypes) { + if (ricName == null) { + Collection types = this.policyTypes.getAll(); return new ResponseEntity(toPolicyTypeSchemasJson(types), HttpStatus.OK); - } catch (ServiceException e) { - return new ResponseEntity(e.toString(), HttpStatus.NOT_FOUND); + } else { + try { + Collection types = rics.getRic(ricName).getSupportedPolicyTypes(); + return new ResponseEntity(toPolicyTypeSchemasJson(types), HttpStatus.OK); + } catch (ServiceException e) { + return new ResponseEntity(e.toString(), HttpStatus.NOT_FOUND); + } } } } @@ -106,15 +108,17 @@ public class PolicyController { @ApiOperation(value = "Returns policy types") @ApiResponses(value = {@ApiResponse(code = 200, message = "Policy Types found")}) public ResponseEntity getPolicyTypes(@RequestParam(name = "ric", required = false) String ricName) { - if (ricName == null) { - Collection types = this.policyTypes.getAll(); - return new ResponseEntity(toPolicyTypeIdsJson(types), HttpStatus.OK); - } else { - try { - Collection types = rics.getRic(ricName).getSupportedPolicyTypes(); + synchronized (this.policyTypes) { + if (ricName == null) { + Collection types = this.policyTypes.getAll(); return new ResponseEntity(toPolicyTypeIdsJson(types), HttpStatus.OK); - } catch (ServiceException e) { - return new ResponseEntity(e.toString(), HttpStatus.NOT_FOUND); + } else { + try { + Collection types = rics.getRic(ricName).getSupportedPolicyTypes(); + return new ResponseEntity(toPolicyTypeIdsJson(types), HttpStatus.OK); + } catch (ServiceException e) { + return new ResponseEntity(e.toString(), HttpStatus.NOT_FOUND); + } } } } @@ -140,7 +144,7 @@ public class PolicyController { public Mono> deletePolicy( // @RequestParam(name = "instance", required = true) String id) { Policy policy = policies.get(id); - if (policy != null && policy.ric().state().equals(Ric.RicState.ACTIVE)) { + if (policy != null && policy.ric().state().equals(Ric.RicState.IDLE)) { return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) // .doOnEach(notUsed -> policies.removeId(id)) // .flatMap(notUsed -> { @@ -163,7 +167,7 @@ public class PolicyController { Ric ric = rics.get(ricName); PolicyType type = policyTypes.get(typeName); - if (ric != null && type != null && ric.state().equals(Ric.RicState.ACTIVE)) { + if (ric != null && type != null && ric.state().equals(Ric.RicState.IDLE)) { Policy policy = ImmutablePolicy.builder() // .id(instanceId) // .json(jsonBody) // @@ -189,22 +193,24 @@ public class PolicyController { @RequestParam(name = "ric", required = false) String ric, // @RequestParam(name = "service", required = false) String service) // { - Collection result = null; + synchronized (policies) { + Collection result = null; - if (type != null) { - result = policies.getForType(type); - result = filter(result, null, ric, service); - } else if (service != null) { - result = policies.getForService(service); - result = filter(result, type, ric, null); - } else if (ric != null) { - result = policies.getForRic(ric); - result = filter(result, type, null, service); - } else { - result = policies.getAll(); - } + if (type != null) { + result = policies.getForType(type); + result = filter(result, null, ric, service); + } else if (service != null) { + result = policies.getForService(service); + result = filter(result, type, ric, null); + } else if (ric != null) { + result = policies.getForRic(ric); + result = filter(result, type, null, service); + } else { + result = policies.getAll(); + } - return new ResponseEntity(policiesToJson(result), HttpStatus.OK); + return new ResponseEntity(policiesToJson(result), HttpStatus.OK); + } } private boolean include(String filter, String value) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java index 797ec719..6b413b2f 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java @@ -94,14 +94,17 @@ public class RicRepositoryController { }) public ResponseEntity getRics( @RequestParam(name = "policyType", required = false) String supportingPolicyType) { + Vector result = new Vector<>(); - for (Ric ric : rics.getRics()) { - if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) { - result.add(ImmutableRicInfo.builder() // - .name(ric.name()) // - .managedElementIds(ric.getManagedElementIds()) // - .policyTypes(ric.getSupportedPolicyTypeNames()) // - .build()); + synchronized (rics) { + for (Ric ric : rics.getRics()) { + if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) { + result.add(ImmutableRicInfo.builder() // + .name(ric.name()) // + .managedElementIds(ric.getManagedElementIds()) // + .policyTypes(ric.getSupportedPolicyTypeNames()) // + .build()); + } } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java index ad062311..bda5e095 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java @@ -91,10 +91,11 @@ public class ServiceController { @GetMapping("/services") public ResponseEntity getServices() { - Collection allServices = this.services.getAll(); - Collection result = new Vector<>(allServices.size()); - for (Service s : allServices) { - result.add(toServiceStatus(s)); + Collection result = new Vector<>(); + synchronized (this.services) { + for (Service s : this.services.getAll()) { + result.add(toServiceStatus(s)); + } } return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java index 58c91b35..a279db54 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java @@ -21,6 +21,7 @@ package org.oransc.policyagent.repository; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -68,7 +69,7 @@ public class Policies { if (map == null) { return new Vector(); } - return map.values(); + return Collections.unmodifiableCollection(map.values()); } public synchronized boolean containsPolicy(String id) { @@ -88,7 +89,7 @@ public class Policies { } public synchronized Collection getAll() { - return policiesId.values(); + return Collections.unmodifiableCollection(policiesId.values()); } public synchronized Collection getForService(String service) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java index 9dee6f91..77239831 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java @@ -21,6 +21,7 @@ package org.oransc.policyagent.repository; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -48,19 +49,19 @@ public class PolicyTypes { types.put(type.name(), type); } - public boolean contains(String policyType) { + public synchronized boolean contains(String policyType) { return types.containsKey(policyType); } public synchronized Collection getAll() { - return types.values(); + return Collections.unmodifiableCollection(types.values()); } - public int size() { + public synchronized int size() { return types.size(); } - public void clear() { + public synchronized void clear() { this.types.clear(); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java index 82d84f12..235ee1ab 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java @@ -32,7 +32,7 @@ import org.oransc.policyagent.configuration.RicConfig; */ public class Ric { private final RicConfig ricConfig; - private RicState state = RicState.NOT_INITIATED; + private RicState state = RicState.UNDEFINED; private Map supportedPolicyTypes = new HashMap<>(); /** @@ -150,16 +150,16 @@ public class Ric { */ public static enum RicState { /** - * The Ric has not been initiated yet. + * The agent view of the agent may be inconsistent */ - NOT_INITIATED, + UNDEFINED, /** - * The Ric is working fine. + * The normal state. Policies can be configured. */ - ACTIVE, + IDLE, /** - * The Ric cannot be contacted. + * The Ric states are recovered */ - NOT_REACHABLE, RECOVERING + RECOVERING } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java index 6b8138fc..bdf99302 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java @@ -20,7 +20,6 @@ package org.oransc.policyagent.repository; -import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -36,11 +35,11 @@ public class Rics { rics.put(ric.name(), ric); } - public Collection getRics() { + public synchronized Iterable getRics() { return rics.values(); } - public Ric getRic(String name) throws ServiceException { + public synchronized Ric getRic(String name) throws ServiceException { Ric ric = rics.get(name); if (ric == null) { throw new ServiceException("Could not find ric: " + name); @@ -48,15 +47,19 @@ public class Rics { return ric; } - public Ric get(String name) { + public synchronized Ric get(String name) { return rics.get(name); } - public int size() { + public synchronized void remove(String name) { + rics.remove(name); + } + + public synchronized int size() { return rics.size(); } - public void clear() { + public synchronized void clear() { this.rics.clear(); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java index 35cae717..2e1d8da7 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java @@ -20,7 +20,6 @@ package org.oransc.policyagent.repository; -import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -50,13 +49,10 @@ public class Services { public synchronized void put(Service service) { logger.debug("Put service: " + service.getName()); - // TODO a threading problem is that this may happend at the same time as someone is iterating (getAll()) - // This is a generic problem services.put(service.getName(), service); } - // TODO the returned value should be unmodifiable if possible - public synchronized Collection getAll() { + public synchronized Iterable getAll() { return services.values(); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java index bfd40e52..1a903827 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java @@ -26,6 +26,7 @@ import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; +import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; @@ -69,25 +70,38 @@ public class RepositorySupervision { public void checkAllRics() { logger.debug("Checking Rics starting"); createTask().subscribe(this::onRicChecked, this::onError, this::onComplete); - } private Flux createTask() { - return Flux.fromIterable(rics.getRics()) // - .flatMap(ric -> checkInstances(ric)) // - .flatMap(ric -> checkTypes(ric)); + synchronized (this.rics) { + return Flux.fromIterable(rics.getRics()) // + .flatMap(ric -> checkRicState(ric)) // + .flatMap(ric -> checkRicPolicies(ric)) // + .flatMap(ric -> checkRicPolicyTypes(ric)); + } } - private Mono checkInstances(Ric ric) { + private Mono checkRicState(Ric ric) { + if (ric.state() == RicState.UNDEFINED) { + return startRecovery(ric); + } else if (ric.state() == RicState.RECOVERING) { + return Mono.empty(); + } else { + return Mono.just(ric); + } + } + private Mono checkRicPolicies(Ric ric) { return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // .onErrorResume(t -> Mono.empty()) // .flatMap(ricP -> validateInstances(ricP, ric)); } private Mono validateInstances(Collection ricPolicies, Ric ric) { - if (ricPolicies.size() != policies.getForRic(ric.name()).size()) { - return startRecovery(ric); + synchronized (this.policies) { + if (ricPolicies.size() != policies.getForRic(ric.name()).size()) { + return startRecovery(ric); + } } for (String policyId : ricPolicies) { if (!policies.containsPolicy(policyId)) { @@ -97,7 +111,7 @@ public class RepositorySupervision { return Mono.just(ric); } - private Mono checkTypes(Ric ric) { + private Mono checkRicPolicyTypes(Ric ric) { return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // .onErrorResume(t -> { return Mono.empty(); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java index 73c94e25..fb41e260 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java @@ -32,6 +32,7 @@ import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.PolicyType; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; +import org.oransc.policyagent.repository.Rics; import org.oransc.policyagent.repository.Service; import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; @@ -59,9 +60,11 @@ public class RicRecoveryTask { this.services = services; } - public void run(Collection rics) { - for (Ric ric : rics) { - run(ric); + public void run(Rics rics) { + synchronized (rics) { + for (Ric ric : rics.getRics()) { + run(ric); + } } } @@ -85,26 +88,28 @@ public class RicRecoveryTask { private void onComplete(Ric ric) { logger.debug("Recovery completed for:" + ric.name()); - ric.setState(Ric.RicState.ACTIVE); + ric.setState(Ric.RicState.IDLE); notifyAllServices("Recovery completed for:" + ric.name()); } private void notifyAllServices(String body) { - for (Service service : services.getAll()) { - String url = service.getCallbackUrl(); - if (service.getCallbackUrl().length() > 0) { - createClient(url) // - .put("", body) // - .subscribe(rsp -> logger.debug("Service called"), - throwable -> logger.warn("Service called failed", throwable), - () -> logger.debug("Service called complete")); + synchronized (services) { + for (Service service : services.getAll()) { + String url = service.getCallbackUrl(); + if (service.getCallbackUrl().length() > 0) { + createClient(url) // + .put("", body) // + .subscribe(rsp -> logger.debug("Service called"), + throwable -> logger.warn("Service called failed", throwable), + () -> logger.debug("Service called complete")); + } } } } private void onError(Ric ric, Throwable t) { logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage()); - ric.setState(Ric.RicState.NOT_REACHABLE); + ric.setState(Ric.RicState.UNDEFINED); } private AsyncRestClient createClient(final String url) { @@ -115,7 +120,7 @@ public class RicRecoveryTask { ric.clearSupportedPolicyTypes(); return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // .flatMapMany(types -> Flux.fromIterable(types)) // - .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) + .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) // .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) // .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); // } @@ -139,9 +144,11 @@ public class RicRecoveryTask { } private Flux deletePolicies(Ric ric) { - Collection ricPolicies = new Vector<>(policies.getForRic(ric.name())); - for (Policy policy : ricPolicies) { - this.policies.remove(policy); + synchronized (policies) { + Collection ricPolicies = new Vector<>(policies.getForRic(ric.name())); + for (Policy policy : ricPolicies) { + this.policies.remove(policy); + } } return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java index 335aa945..1e7f2dc7 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java @@ -69,16 +69,20 @@ public class ServiceSupervision { } private Flux createTask() { - return Flux.fromIterable(services.getAll()) // - .filter(service -> service.isExpired()) // - .doOnNext(service -> logger.info("Service is expired:" + service.getName())) // - .flatMap(service -> getAllPolicies(service)) // - .doOnNext(policy -> this.policies.remove(policy)) // - .flatMap(policy -> deletePolicyInRic(policy)); + synchronized (services) { + return Flux.fromIterable(services.getAll()) // + .filter(service -> service.isExpired()) // + .doOnNext(service -> logger.info("Service is expired:" + service.getName())) // + .flatMap(service -> getAllPolicies(service)) // + .doOnNext(policy -> this.policies.remove(policy)) // + .flatMap(policy -> deletePolicyInRic(policy)); + } } private Flux getAllPolicies(Service service) { - return Flux.fromIterable(policies.getForService(service.getName())); + synchronized (policies) { + return Flux.fromIterable(policies.getForService(service.getName())); + } } private Mono deletePolicyInRic(Policy policy) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java index 251f3437..90358b28 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java @@ -31,13 +31,16 @@ import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; /** * Loads information about RealTime-RICs at startup. */ @Service("startupService") -public class StartupService { +@Order(Ordered.HIGHEST_PRECEDENCE) +public class StartupService implements ApplicationConfig.Observer { private static final Logger logger = LoggerFactory.getLogger(StartupService.class); @@ -70,17 +73,30 @@ public class StartupService { this.services = services; } + @Override + public void onRicConfigUpdate(RicConfig ricConfig, ApplicationConfig.RicConfigUpdate event) { + synchronized (this.rics) { + if (event.equals(ApplicationConfig.RicConfigUpdate.ADDED) + || event.equals(ApplicationConfig.RicConfigUpdate.CHANGED)) { + Ric ric = new Ric(ricConfig); + rics.put(ric); + RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies, services); + recoveryTask.run(ric); + } else if (event.equals(ApplicationConfig.RicConfigUpdate.REMOVED)) { + rics.remove(ricConfig.name()); + } else { + logger.debug("Unhandled event :" + event); + } + } + } + /** * Reads the configured Rics and performs the service discovery. The result is put into the repository. */ public void startup() { logger.debug("Starting up"); + applicationConfig.addObserver(this); applicationConfig.initialize(); - for (RicConfig ricConfig : applicationConfig.getRicConfigs()) { - rics.put(new Ric(ricConfig)); - } - RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies, services); - recoveryTask.run(rics.getRics()); // recover all Rics } } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index e9dcaefc..d78155d4 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -85,9 +85,9 @@ public class ApplicationTest { public static class MockApplicationConfig extends ApplicationConfig { @Override - public void initialize() { + protected String getLocalConfigurationFilePath() { URL url = MockApplicationConfig.class.getClassLoader().getResource("test_application_configuration.json"); - loadConfigurationFromFile(url.getFile()); + return url.getFile(); } } @@ -168,7 +168,7 @@ public class ApplicationTest { String url = baseUrl() + "/policy?type=type1&instance=instance1&ric=ric1&service=service1"; String json = "{}"; addPolicyType("type1", "ric1"); - this.rics.getRic("ric1").setState(Ric.RicState.ACTIVE); + this.rics.getRic("ric1").setState(Ric.RicState.IDLE); this.restTemplate.put(url, json); @@ -249,7 +249,7 @@ public class ApplicationTest { reset(); String url = baseUrl() + "/policy?instance=id"; Policy policy = addPolicy("id", "typeName", "service1", "ric1"); - policy.ric().setState(Ric.RicState.ACTIVE); + policy.ric().setState(Ric.RicState.IDLE); assertThat(policies.size()).isEqualTo(1); this.restTemplate.delete(url); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java index 420c8f7e..ecb4661e 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java @@ -52,10 +52,11 @@ public class MockPolicyAgent { static class MockApplicationConfig extends ApplicationConfig { @Override - public void initialize() { + protected String getLocalConfigurationFilePath() { URL url = MockApplicationConfig.class.getClassLoader().getResource("test_application_configuration.json"); - loadConfigurationFromFile(url.getFile()); + return url.getFile(); } + } /** diff --git a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java index 9cfb6160..6a4f8b36 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java @@ -96,13 +96,14 @@ public class ApplicationConfigTest { appConfigUnderTest.systemEnvironment = new Properties(); // When doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any()); + doReturn("fileName").when(appConfigUnderTest).getLocalConfigurationFilePath(); appConfigUnderTest.initialize(); // Then - verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(any()); + verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); - Vector ricConfigs = appConfigUnderTest.getRicConfigs(); - RicConfig ricConfig = ricConfigs.firstElement(); + Iterable ricConfigs = appConfigUnderTest.getRicConfigs(); + RicConfig ricConfig = ricConfigs.iterator().next(); assertThat(ricConfigs).isNotNull(); assertThat(ricConfig).isEqualTo(CORRECT_RIC_CONIFG); } @@ -115,11 +116,12 @@ public class ApplicationConfigTest { // When doReturn(getIncorrectJson()).when(appConfigUnderTest).createInputStream(any()); - appConfigUnderTest.loadConfigurationFromFile(any()); + doReturn("fileName").when(appConfigUnderTest).getLocalConfigurationFilePath(); + appConfigUnderTest.loadConfigurationFromFile(); // Then - verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(any()); - Assertions.assertNull(appConfigUnderTest.getRicConfigs()); + verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(); + Assertions.assertEquals(0, appConfigUnderTest.getRicConfigs().size()); } @Test diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java index 418680e3..b1f397a4 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java @@ -63,13 +63,13 @@ public class RepositorySupervisionTest { .baseUrl("baseUrl1") // .managedElementIds(new Vector(Arrays.asList("kista_1", "kista_2"))) // .build()); - ric1.setState(Ric.RicState.ACTIVE); + ric1.setState(Ric.RicState.IDLE); Ric ric2 = new Ric(ImmutableRicConfig.builder() // .name("ric2") // .baseUrl("baseUrl2") // .managedElementIds(new Vector(Arrays.asList("kista_3", "kista_4"))) // .build()); - ric2.setState(Ric.RicState.NOT_REACHABLE); + ric2.setState(Ric.RicState.UNDEFINED); Ric ric3 = new Ric(ImmutableRicConfig.builder() // .name("ric3") // .baseUrl("baseUrl3") // @@ -108,9 +108,9 @@ public class RepositorySupervisionTest { supervisorUnderTest.checkAllRics(); - await().untilAsserted(() -> RicState.ACTIVE.equals(ric1.state())); - await().untilAsserted(() -> RicState.ACTIVE.equals(ric2.state())); - await().untilAsserted(() -> RicState.ACTIVE.equals(ric3.state())); + await().untilAsserted(() -> RicState.IDLE.equals(ric1.state())); + await().untilAsserted(() -> RicState.IDLE.equals(ric2.state())); + await().untilAsserted(() -> RicState.IDLE.equals(ric3.state())); verify(a1ClientMock).deletePolicy("baseUrl1", "policyId2"); verify(a1ClientMock).deletePolicy("baseUrl2", "policyId2"); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java index 2ee175da..729fc7b3 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java @@ -29,8 +29,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.oransc.policyagent.repository.Ric.RicState.ACTIVE; -import static org.oransc.policyagent.repository.Ric.RicState.NOT_REACHABLE; +import static org.oransc.policyagent.repository.Ric.RicState.IDLE; import java.util.Arrays; import java.util.Collection; @@ -49,6 +48,7 @@ import org.oransc.policyagent.configuration.RicConfig; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; +import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; import org.oransc.policyagent.repository.Services; import reactor.core.publisher.Mono; @@ -77,11 +77,6 @@ public class StartupServiceTest { @Test public void startup_allOk() { - Vector ricConfigs = new Vector<>(2); - ricConfigs.add(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A)); - ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C)); - when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs); - Mono> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); Mono> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME)); when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2); @@ -97,6 +92,12 @@ public class StartupServiceTest { serviceUnderTest.startup(); + serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A), + ApplicationConfig.RicConfigUpdate.ADDED); + serviceUnderTest.onRicConfigUpdate( + getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C), + ApplicationConfig.RicConfigUpdate.ADDED); + await().untilAsserted(() -> assertThat(policyTypes.size()).isEqualTo(2)); verify(a1ClientMock).getPolicyTypeIdentities(FIRST_RIC_URL); @@ -114,7 +115,7 @@ public class StartupServiceTest { Ric firstRic = rics.get(FIRST_RIC_NAME); assertNotNull(firstRic, "Ric " + FIRST_RIC_NAME + " not added to repository"); assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics"); - assertEquals(ACTIVE, firstRic.state(), "Not correct state for ric " + FIRST_RIC_NAME); + assertEquals(IDLE, firstRic.state(), "Not correct state for ric " + FIRST_RIC_NAME); assertEquals(1, firstRic.getSupportedPolicyTypes().size(), "Not correct no of types supported for ric " + FIRST_RIC_NAME); assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME), @@ -126,7 +127,7 @@ public class StartupServiceTest { Ric secondRic = rics.get(SECOND_RIC_NAME); assertNotNull(secondRic, "Ric " + SECOND_RIC_NAME + " not added to repository"); assertEquals(SECOND_RIC_NAME, secondRic.name(), SECOND_RIC_NAME + " not added to Rics"); - assertEquals(ACTIVE, secondRic.state(), "Not correct state for " + SECOND_RIC_NAME); + assertEquals(IDLE, secondRic.state(), "Not correct state for " + SECOND_RIC_NAME); assertEquals(2, secondRic.getSupportedPolicyTypes().size(), "Not correct no of types supported for ric " + SECOND_RIC_NAME); assertTrue(secondRic.isSupportingType(POLICY_TYPE_1_NAME), @@ -141,11 +142,6 @@ public class StartupServiceTest { @Test public void startup_unableToConnectToGetTypes() { - Vector ricConfigs = new Vector<>(2); - ricConfigs.add(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A)); - ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C)); - when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs); - Mono> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); Mono error = Mono.error(new Exception("Unable to contact ric.")); doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString()); @@ -161,21 +157,22 @@ public class StartupServiceTest { new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies(), new Services()); serviceUnderTest.startup(); + serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A), + ApplicationConfig.RicConfigUpdate.ADDED); + serviceUnderTest.onRicConfigUpdate( + getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C), + ApplicationConfig.RicConfigUpdate.ADDED); verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1); verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2); - assertEquals(NOT_REACHABLE, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME); + assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME); - assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME); + assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME); } @Test public void startup_unableToConnectToGetPolicies() { - Vector ricConfigs = new Vector<>(2); - ricConfigs.add(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A)); - ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C)); - when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs); Mono> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME)); Mono> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME)); @@ -192,13 +189,18 @@ public class StartupServiceTest { new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies(), new Services()); serviceUnderTest.startup(); + serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A), + ApplicationConfig.RicConfigUpdate.ADDED); + serviceUnderTest.onRicConfigUpdate( + getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C), + ApplicationConfig.RicConfigUpdate.ADDED); verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1); verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2); - assertEquals(NOT_REACHABLE, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME); + assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME); - assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME); + assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME); } @SafeVarargs diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java index 1a93b4d1..aca29f58 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java @@ -42,23 +42,27 @@ public class MockA1Client implements A1Client { @Override public Mono> getPolicyTypeIdentities(String nearRtRicUrl) { - Vector result = new Vector<>(); - for (PolicyType p : this.policyTypes.getAll()) { - result.add(p.name()); + synchronized (this.policyTypes) { + Vector result = new Vector<>(); + for (PolicyType p : this.policyTypes.getAll()) { + result.add(p.name()); + } + return Mono.just(result); } - return Mono.just(result); } @Override public Mono> getPolicyIdentities(String nearRtRicUrl) { - Vector result = new Vector<>(); - for (Policy policy : getPolicies(nearRtRicUrl).getAll()) { - if (policy.ric().getConfig().baseUrl().equals(nearRtRicUrl)) { - result.add(policy.id()); + synchronized (this.policies) { + Vector result = new Vector<>(); + for (Policy policy : getPolicies(nearRtRicUrl).getAll()) { + if (policy.ric().getConfig().baseUrl().equals(nearRtRicUrl)) { + result.add(policy.id()); + } } - } - return Mono.just(result); + return Mono.just(result); + } } @Override -- 2.16.6