Changed in config will add and recover Rics 43/2243/3
authorPatrikBuhr <patrik.buhr@est.tech>
Thu, 16 Jan 2020 06:27:58 +0000 (07:27 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 16 Jan 2020 16:07:02 +0000 (17:07 +0100)
When the application config is changed, Ric:s will be added/removed in the repository and
recovered.

Fixed a lot of thread synchronization issues.

Issue-ID: NONRTRIC-84
Change-Id: I42ff66bc59c585d2d9f65e9e00a0ac88998a44bf
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
20 files changed:
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigLoader.java [deleted file]
policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java
policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java
policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java

index 00e5223..7e52284 100644 (file)
@@ -33,6 +33,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.ServiceLoader;
@@ -68,25 +71,30 @@ public class ApplicationConfig {
     Properties systemEnvironment;
 
     private Disposable refreshConfigTask = null;
+    private Collection<Observer> observers = new Vector<>();
+
+    private Map<String, RicConfig> ricConfigs = new HashMap<>();
 
     @NotEmpty
     private String filepath;
 
-    private Vector<RicConfig> ricConfigs;
-
     @Autowired
     public ApplicationConfig() {
     }
 
+    protected String getLocalConfigurationFilePath() {
+        return this.filepath;
+    }
+
     public synchronized void setFilepath(String filepath) {
         this.filepath = filepath;
     }
 
-    public Vector<RicConfig> getRicConfigs() {
-        return this.ricConfigs;
+    public synchronized Collection<RicConfig> getRicConfigs() {
+        return this.ricConfigs.values();
     }
 
-    public Optional<RicConfig> lookupRicConfigForManagedElement(String managedElementId) {
+    public synchronized Optional<RicConfig> lookupRicConfigForManagedElement(String managedElementId) {
         for (RicConfig ricConfig : getRicConfigs()) {
             if (ricConfig.managedElementIds().contains(managedElementId)) {
                 return Optional.of(ricConfig);
@@ -106,7 +114,7 @@ public class ApplicationConfig {
 
     public void initialize() {
         stop();
-        loadConfigurationFromFile(this.filepath);
+        loadConfigurationFromFile();
 
         refreshConfigTask = createRefreshTask() //
             .subscribe(notUsed -> logger.info("Refreshed configuration data"),
@@ -114,6 +122,18 @@ public class ApplicationConfig {
                 () -> logger.error("Configuration refresh terminated"));
     }
 
+    public static enum RicConfigUpdate {
+        ADDED, CHANGED, REMOVED
+    }
+
+    public interface Observer {
+        void onRicConfigUpdate(RicConfig ric, RicConfigUpdate event);
+    }
+
+    public void addObserver(Observer o) {
+        this.observers.add(o);
+    }
+
     Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
         return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment);
     }
@@ -154,8 +174,48 @@ public class ApplicationConfig {
         return this;
     }
 
-    private synchronized void setConfiguration(@NotNull Vector<RicConfig> ricConfigs) {
-        this.ricConfigs = ricConfigs;
+    private class Notification {
+        final RicConfig ric;
+        final RicConfigUpdate event;
+
+        Notification(RicConfig ric, RicConfigUpdate event) {
+            this.ric = ric;
+            this.event = event;
+        }
+    }
+
+    private void setConfiguration(@NotNull Collection<RicConfig> ricConfigs) {
+        Collection<Notification> notifications = new Vector<>();
+        synchronized (this) {
+            Map<String, RicConfig> newRicConfigs = new HashMap<>();
+            for (RicConfig newConfig : ricConfigs) {
+                RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
+                if (oldConfig == null) {
+                    newRicConfigs.put(newConfig.name(), newConfig);
+                    notifications.add(new Notification(newConfig, RicConfigUpdate.ADDED));
+                    this.ricConfigs.remove(newConfig.name());
+                } else if (!newConfig.equals(newConfig)) {
+                    notifications.add(new Notification(newConfig, RicConfigUpdate.CHANGED));
+                    newRicConfigs.put(newConfig.name(), newConfig);
+                    this.ricConfigs.remove(newConfig.name());
+                } else {
+                    newRicConfigs.put(oldConfig.name(), oldConfig);
+                }
+            }
+            for (RicConfig deletedConfig : this.ricConfigs.values()) {
+                notifications.add(new Notification(deletedConfig, RicConfigUpdate.REMOVED));
+            }
+            this.ricConfigs = newRicConfigs;
+        }
+        notifyObservers(notifications);
+    }
+
+    private void notifyObservers(Collection<Notification> notifications) {
+        for (Observer observer : this.observers) {
+            for (Notification notif : notifications) {
+                observer.onRicConfigUpdate(notif.ric, notif.event);
+            }
+        }
     }
 
     public void stop() {
@@ -168,7 +228,12 @@ public class ApplicationConfig {
     /**
      * Reads the configuration from file.
      */
-    protected void loadConfigurationFromFile(String filepath) {
+    public void loadConfigurationFromFile() {
+        String filepath = getLocalConfigurationFilePath();
+        if (filepath == null) {
+            logger.debug("No localconfiguration file used");
+            return;
+        }
         GsonBuilder gsonBuilder = new GsonBuilder();
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
 
@@ -180,7 +245,7 @@ public class ApplicationConfig {
             }
             ApplicationConfigParser appParser = new ApplicationConfigParser();
             appParser.parse(rootObject);
-            this.ricConfigs = appParser.getRicConfigs();
+            setConfiguration(appParser.getRicConfigs());
             logger.info("Local configuration file loaded: {}", filepath);
         } catch (JsonSyntaxException | ServiceException | IOException e) {
             logger.trace("Local configuration file not loaded: {}", filepath, e);
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigLoader.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigLoader.java
deleted file mode 100644 (file)
index e1eb17a..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2019 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oransc.policyagent.configuration;
-
-import io.swagger.annotations.ApiOperation;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-
-import javax.annotation.PostConstruct;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.scheduling.TaskScheduler;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
-
-@Component
-@EnableScheduling
-class ApplicationConfigLoader {
-
-    private static final Logger logger = LoggerFactory.getLogger(ApplicationConfigLoader.class);
-    private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
-    private static final Duration CONFIGURATION_REFRESH_INTERVAL = Duration.ofSeconds(15);
-
-    private final TaskScheduler taskScheduler;
-    private final ApplicationConfig configuration;
-
-    @Autowired
-    public ApplicationConfigLoader(TaskScheduler taskScheduler, ApplicationConfig configuration) {
-        this.taskScheduler = taskScheduler;
-        this.configuration = configuration;
-    }
-
-    /**
-     * Function which have to stop tasks execution.
-     *
-     * @return response entity about status of cancellation operation
-     */
-    @ApiOperation(value = "Get response on stopping task execution")
-    public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
-        scheduledFutureList.forEach(x -> x.cancel(false));
-        scheduledFutureList.clear();
-        logger.info("Stopped");
-        return Mono.just(new ResponseEntity<>("Service has already been stopped!", HttpStatus.CREATED));
-    }
-
-    @PostConstruct
-    @ApiOperation(value = "Start task if possible")
-    public synchronized boolean start() {
-        logger.info("Start scheduling Datafile workflow");
-        configuration.initialize();
-
-        if (scheduledFutureList.isEmpty()) {
-            scheduledFutureList
-                .add(taskScheduler.scheduleWithFixedDelay(this::refreshConfiguration, CONFIGURATION_REFRESH_INTERVAL));
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    private void refreshConfiguration() {
-        // TBD
-
-    }
-
-}
index e33fb7e..246fdd4 100644 (file)
@@ -77,15 +77,17 @@ public class PolicyController {
     @ApiOperation(value = "Returns policy type schema definitions")
     @ApiResponses(value = {@ApiResponse(code = 200, message = "Policy Types found")})
     public ResponseEntity<String> getPolicySchemas(@RequestParam(name = "ric", required = false) String ricName) {
-        if (ricName == null) {
-            Collection<PolicyType> types = this.policyTypes.getAll();
-            return new ResponseEntity<String>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
-        } else {
-            try {
-                Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
+        synchronized (this.policyTypes) {
+            if (ricName == null) {
+                Collection<PolicyType> types = this.policyTypes.getAll();
                 return new ResponseEntity<String>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
-            } catch (ServiceException e) {
-                return new ResponseEntity<String>(e.toString(), HttpStatus.NOT_FOUND);
+            } else {
+                try {
+                    Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
+                    return new ResponseEntity<String>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
+                } catch (ServiceException e) {
+                    return new ResponseEntity<String>(e.toString(), HttpStatus.NOT_FOUND);
+                }
             }
         }
     }
@@ -106,15 +108,17 @@ public class PolicyController {
     @ApiOperation(value = "Returns policy types")
     @ApiResponses(value = {@ApiResponse(code = 200, message = "Policy Types found")})
     public ResponseEntity<String> getPolicyTypes(@RequestParam(name = "ric", required = false) String ricName) {
-        if (ricName == null) {
-            Collection<PolicyType> types = this.policyTypes.getAll();
-            return new ResponseEntity<String>(toPolicyTypeIdsJson(types), HttpStatus.OK);
-        } else {
-            try {
-                Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
+        synchronized (this.policyTypes) {
+            if (ricName == null) {
+                Collection<PolicyType> types = this.policyTypes.getAll();
                 return new ResponseEntity<String>(toPolicyTypeIdsJson(types), HttpStatus.OK);
-            } catch (ServiceException e) {
-                return new ResponseEntity<String>(e.toString(), HttpStatus.NOT_FOUND);
+            } else {
+                try {
+                    Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
+                    return new ResponseEntity<String>(toPolicyTypeIdsJson(types), HttpStatus.OK);
+                } catch (ServiceException e) {
+                    return new ResponseEntity<String>(e.toString(), HttpStatus.NOT_FOUND);
+                }
             }
         }
     }
@@ -140,7 +144,7 @@ public class PolicyController {
     public Mono<ResponseEntity<Void>> deletePolicy( //
         @RequestParam(name = "instance", required = true) String id) {
         Policy policy = policies.get(id);
-        if (policy != null && policy.ric().state().equals(Ric.RicState.ACTIVE)) {
+        if (policy != null && policy.ric().state().equals(Ric.RicState.IDLE)) {
             return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) //
                 .doOnEach(notUsed -> policies.removeId(id)) //
                 .flatMap(notUsed -> {
@@ -163,7 +167,7 @@ public class PolicyController {
 
         Ric ric = rics.get(ricName);
         PolicyType type = policyTypes.get(typeName);
-        if (ric != null && type != null && ric.state().equals(Ric.RicState.ACTIVE)) {
+        if (ric != null && type != null && ric.state().equals(Ric.RicState.IDLE)) {
             Policy policy = ImmutablePolicy.builder() //
                 .id(instanceId) //
                 .json(jsonBody) //
@@ -189,22 +193,24 @@ public class PolicyController {
         @RequestParam(name = "ric", required = false) String ric, //
         @RequestParam(name = "service", required = false) String service) //
     {
-        Collection<Policy> result = null;
+        synchronized (policies) {
+            Collection<Policy> result = null;
 
-        if (type != null) {
-            result = policies.getForType(type);
-            result = filter(result, null, ric, service);
-        } else if (service != null) {
-            result = policies.getForService(service);
-            result = filter(result, type, ric, null);
-        } else if (ric != null) {
-            result = policies.getForRic(ric);
-            result = filter(result, type, null, service);
-        } else {
-            result = policies.getAll();
-        }
+            if (type != null) {
+                result = policies.getForType(type);
+                result = filter(result, null, ric, service);
+            } else if (service != null) {
+                result = policies.getForService(service);
+                result = filter(result, type, ric, null);
+            } else if (ric != null) {
+                result = policies.getForRic(ric);
+                result = filter(result, type, null, service);
+            } else {
+                result = policies.getAll();
+            }
 
-        return new ResponseEntity<String>(policiesToJson(result), HttpStatus.OK);
+            return new ResponseEntity<String>(policiesToJson(result), HttpStatus.OK);
+        }
     }
 
     private boolean include(String filter, String value) {
index 797ec71..6b413b2 100644 (file)
@@ -94,14 +94,17 @@ public class RicRepositoryController {
         })
     public ResponseEntity<String> getRics(
         @RequestParam(name = "policyType", required = false) String supportingPolicyType) {
+
         Vector<RicInfo> result = new Vector<>();
-        for (Ric ric : rics.getRics()) {
-            if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
-                result.add(ImmutableRicInfo.builder() //
-                    .name(ric.name()) //
-                    .managedElementIds(ric.getManagedElementIds()) //
-                    .policyTypes(ric.getSupportedPolicyTypeNames()) //
-                    .build());
+        synchronized (rics) {
+            for (Ric ric : rics.getRics()) {
+                if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
+                    result.add(ImmutableRicInfo.builder() //
+                        .name(ric.name()) //
+                        .managedElementIds(ric.getManagedElementIds()) //
+                        .policyTypes(ric.getSupportedPolicyTypeNames()) //
+                        .build());
+                }
             }
         }
 
index ad06231..bda5e09 100644 (file)
@@ -91,10 +91,11 @@ public class ServiceController {
 
     @GetMapping("/services")
     public ResponseEntity<?> getServices() {
-        Collection<Service> allServices = this.services.getAll();
-        Collection<ServiceStatus> result = new Vector<>(allServices.size());
-        for (Service s : allServices) {
-            result.add(toServiceStatus(s));
+        Collection<ServiceStatus> result = new Vector<>();
+        synchronized (this.services) {
+            for (Service s : this.services.getAll()) {
+                result.add(toServiceStatus(s));
+            }
         }
         return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK);
     }
index 58c91b3..a279db5 100644 (file)
@@ -21,6 +21,7 @@
 package org.oransc.policyagent.repository;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -68,7 +69,7 @@ public class Policies {
         if (map == null) {
             return new Vector<Policy>();
         }
-        return map.values();
+        return Collections.unmodifiableCollection(map.values());
     }
 
     public synchronized boolean containsPolicy(String id) {
@@ -88,7 +89,7 @@ public class Policies {
     }
 
     public synchronized Collection<Policy> getAll() {
-        return policiesId.values();
+        return Collections.unmodifiableCollection(policiesId.values());
     }
 
     public synchronized Collection<Policy> getForService(String service) {
index 9dee6f9..7723983 100644 (file)
@@ -21,6 +21,7 @@
 package org.oransc.policyagent.repository;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -48,19 +49,19 @@ public class PolicyTypes {
         types.put(type.name(), type);
     }
 
-    public boolean contains(String policyType) {
+    public synchronized boolean contains(String policyType) {
         return types.containsKey(policyType);
     }
 
     public synchronized Collection<PolicyType> getAll() {
-        return types.values();
+        return Collections.unmodifiableCollection(types.values());
     }
 
-    public int size() {
+    public synchronized int size() {
         return types.size();
     }
 
-    public void clear() {
+    public synchronized void clear() {
         this.types.clear();
     }
 }
index 82d84f1..235ee1a 100644 (file)
@@ -32,7 +32,7 @@ import org.oransc.policyagent.configuration.RicConfig;
  */
 public class Ric {
     private final RicConfig ricConfig;
-    private RicState state = RicState.NOT_INITIATED;
+    private RicState state = RicState.UNDEFINED;
     private Map<String, PolicyType> supportedPolicyTypes = new HashMap<>();
 
     /**
@@ -150,16 +150,16 @@ public class Ric {
      */
     public static enum RicState {
         /**
-         * The Ric has not been initiated yet.
+         * The agent view of the agent may be inconsistent
          */
-        NOT_INITIATED,
+        UNDEFINED,
         /**
-         * The Ric is working fine.
+         * The normal state. Policies can be configured.
          */
-        ACTIVE,
+        IDLE,
         /**
-         * The Ric cannot be contacted.
+         * The Ric states are recovered
          */
-        NOT_REACHABLE, RECOVERING
+        RECOVERING
     }
 }
index 6b8138f..bdf9930 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.oransc.policyagent.repository;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -36,11 +35,11 @@ public class Rics {
         rics.put(ric.name(), ric);
     }
 
-    public Collection<Ric> getRics() {
+    public synchronized Iterable<Ric> getRics() {
         return rics.values();
     }
 
-    public Ric getRic(String name) throws ServiceException {
+    public synchronized Ric getRic(String name) throws ServiceException {
         Ric ric = rics.get(name);
         if (ric == null) {
             throw new ServiceException("Could not find ric: " + name);
@@ -48,15 +47,19 @@ public class Rics {
         return ric;
     }
 
-    public Ric get(String name) {
+    public synchronized Ric get(String name) {
         return rics.get(name);
     }
 
-    public int size() {
+    public synchronized void remove(String name) {
+        rics.remove(name);
+    }
+
+    public synchronized int size() {
         return rics.size();
     }
 
-    public void clear() {
+    public synchronized void clear() {
         this.rics.clear();
     }
 }
index 35cae71..2e1d8da 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.oransc.policyagent.repository;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -50,13 +49,10 @@ public class Services {
 
     public synchronized void put(Service service) {
         logger.debug("Put service: " + service.getName());
-        // TODO a threading problem is that this may happend at the same time as someone is iterating (getAll())
-        // This is a generic problem
         services.put(service.getName(), service);
     }
 
-    // TODO the returned value should be unmodifiable if possible
-    public synchronized Collection<Service> getAll() {
+    public synchronized Iterable<Service> getAll() {
         return services.values();
     }
 
index bfd40e5..1a90382 100644 (file)
@@ -26,6 +26,7 @@ import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Ric.RicState;
 import org.oransc.policyagent.repository.Rics;
 import org.oransc.policyagent.repository.Services;
 import org.slf4j.Logger;
@@ -69,25 +70,38 @@ public class RepositorySupervision {
     public void checkAllRics() {
         logger.debug("Checking Rics starting");
         createTask().subscribe(this::onRicChecked, this::onError, this::onComplete);
-
     }
 
     private Flux<Ric> createTask() {
-        return Flux.fromIterable(rics.getRics()) //
-            .flatMap(ric -> checkInstances(ric)) //
-            .flatMap(ric -> checkTypes(ric));
+        synchronized (this.rics) {
+            return Flux.fromIterable(rics.getRics()) //
+                .flatMap(ric -> checkRicState(ric)) //
+                .flatMap(ric -> checkRicPolicies(ric)) //
+                .flatMap(ric -> checkRicPolicyTypes(ric));
+        }
     }
 
-    private Mono<Ric> checkInstances(Ric ric) {
+    private Mono<Ric> checkRicState(Ric ric) {
+        if (ric.state() == RicState.UNDEFINED) {
+            return startRecovery(ric);
+        } else if (ric.state() == RicState.RECOVERING) {
+            return Mono.empty();
+        } else {
+            return Mono.just(ric);
+        }
+    }
 
+    private Mono<Ric> checkRicPolicies(Ric ric) {
         return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
             .onErrorResume(t -> Mono.empty()) //
             .flatMap(ricP -> validateInstances(ricP, ric));
     }
 
     private Mono<Ric> validateInstances(Collection<String> ricPolicies, Ric ric) {
-        if (ricPolicies.size() != policies.getForRic(ric.name()).size()) {
-            return startRecovery(ric);
+        synchronized (this.policies) {
+            if (ricPolicies.size() != policies.getForRic(ric.name()).size()) {
+                return startRecovery(ric);
+            }
         }
         for (String policyId : ricPolicies) {
             if (!policies.containsPolicy(policyId)) {
@@ -97,7 +111,7 @@ public class RepositorySupervision {
         return Mono.just(ric);
     }
 
-    private Mono<Ric> checkTypes(Ric ric) {
+    private Mono<Ric> checkRicPolicyTypes(Ric ric) {
         return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
             .onErrorResume(t -> {
                 return Mono.empty();
index 73c94e2..fb41e26 100644 (file)
@@ -32,6 +32,7 @@ import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.PolicyType;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Rics;
 import org.oransc.policyagent.repository.Service;
 import org.oransc.policyagent.repository.Services;
 import org.slf4j.Logger;
@@ -59,9 +60,11 @@ public class RicRecoveryTask {
         this.services = services;
     }
 
-    public void run(Collection<Ric> rics) {
-        for (Ric ric : rics) {
-            run(ric);
+    public void run(Rics rics) {
+        synchronized (rics) {
+            for (Ric ric : rics.getRics()) {
+                run(ric);
+            }
         }
     }
 
@@ -85,26 +88,28 @@ public class RicRecoveryTask {
 
     private void onComplete(Ric ric) {
         logger.debug("Recovery completed for:" + ric.name());
-        ric.setState(Ric.RicState.ACTIVE);
+        ric.setState(Ric.RicState.IDLE);
         notifyAllServices("Recovery completed for:" + ric.name());
     }
 
     private void notifyAllServices(String body) {
-        for (Service service : services.getAll()) {
-            String url = service.getCallbackUrl();
-            if (service.getCallbackUrl().length() > 0) {
-                createClient(url) //
-                    .put("", body) //
-                    .subscribe(rsp -> logger.debug("Service called"),
-                        throwable -> logger.warn("Service called failed", throwable),
-                        () -> logger.debug("Service called complete"));
+        synchronized (services) {
+            for (Service service : services.getAll()) {
+                String url = service.getCallbackUrl();
+                if (service.getCallbackUrl().length() > 0) {
+                    createClient(url) //
+                        .put("", body) //
+                        .subscribe(rsp -> logger.debug("Service called"),
+                            throwable -> logger.warn("Service called failed", throwable),
+                            () -> logger.debug("Service called complete"));
+                }
             }
         }
     }
 
     private void onError(Ric ric, Throwable t) {
         logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
-        ric.setState(Ric.RicState.NOT_REACHABLE);
+        ric.setState(Ric.RicState.UNDEFINED);
     }
 
     private AsyncRestClient createClient(final String url) {
@@ -115,7 +120,7 @@ public class RicRecoveryTask {
         ric.clearSupportedPolicyTypes();
         return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
             .flatMapMany(types -> Flux.fromIterable(types)) //
-            .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
+            .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
             .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) //
             .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
     }
@@ -139,9 +144,11 @@ public class RicRecoveryTask {
     }
 
     private Flux<String> deletePolicies(Ric ric) {
-        Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
-        for (Policy policy : ricPolicies) {
-            this.policies.remove(policy);
+        synchronized (policies) {
+            Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
+            for (Policy policy : ricPolicies) {
+                this.policies.remove(policy);
+            }
         }
 
         return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
index 335aa94..1e7f2dc 100644 (file)
@@ -69,16 +69,20 @@ public class ServiceSupervision {
     }
 
     private Flux<Policy> createTask() {
-        return Flux.fromIterable(services.getAll()) //
-            .filter(service -> service.isExpired()) //
-            .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
-            .flatMap(service -> getAllPolicies(service)) //
-            .doOnNext(policy -> this.policies.remove(policy)) //
-            .flatMap(policy -> deletePolicyInRic(policy));
+        synchronized (services) {
+            return Flux.fromIterable(services.getAll()) //
+                .filter(service -> service.isExpired()) //
+                .doOnNext(service -> logger.info("Service is expired:" + service.getName())) //
+                .flatMap(service -> getAllPolicies(service)) //
+                .doOnNext(policy -> this.policies.remove(policy)) //
+                .flatMap(policy -> deletePolicyInRic(policy));
+        }
     }
 
     private Flux<Policy> getAllPolicies(Service service) {
-        return Flux.fromIterable(policies.getForService(service.getName()));
+        synchronized (policies) {
+            return Flux.fromIterable(policies.getForService(service.getName()));
+        }
     }
 
     private Mono<Policy> deletePolicyInRic(Policy policy) {
index 251f343..90358b2 100644 (file)
@@ -31,13 +31,16 @@ import org.oransc.policyagent.repository.Services;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.Ordered;
+import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Service;
 
 /**
  * Loads information about RealTime-RICs at startup.
  */
 @Service("startupService")
-public class StartupService {
+@Order(Ordered.HIGHEST_PRECEDENCE)
+public class StartupService implements ApplicationConfig.Observer {
 
     private static final Logger logger = LoggerFactory.getLogger(StartupService.class);
 
@@ -70,17 +73,30 @@ public class StartupService {
         this.services = services;
     }
 
+    @Override
+    public void onRicConfigUpdate(RicConfig ricConfig, ApplicationConfig.RicConfigUpdate event) {
+        synchronized (this.rics) {
+            if (event.equals(ApplicationConfig.RicConfigUpdate.ADDED)
+                || event.equals(ApplicationConfig.RicConfigUpdate.CHANGED)) {
+                Ric ric = new Ric(ricConfig);
+                rics.put(ric);
+                RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies, services);
+                recoveryTask.run(ric);
+            } else if (event.equals(ApplicationConfig.RicConfigUpdate.REMOVED)) {
+                rics.remove(ricConfig.name());
+            } else {
+                logger.debug("Unhandled event :" + event);
+            }
+        }
+    }
+
     /**
      * Reads the configured Rics and performs the service discovery. The result is put into the repository.
      */
     public void startup() {
         logger.debug("Starting up");
+        applicationConfig.addObserver(this);
         applicationConfig.initialize();
-        for (RicConfig ricConfig : applicationConfig.getRicConfigs()) {
-            rics.put(new Ric(ricConfig));
-        }
-        RicRecoveryTask recoveryTask = new RicRecoveryTask(a1Client, policyTypes, policies, services);
-        recoveryTask.run(rics.getRics()); // recover all Rics
     }
 
 }
index e9dcaef..d78155d 100644 (file)
@@ -85,9 +85,9 @@ public class ApplicationTest {
 
     public static class MockApplicationConfig extends ApplicationConfig {
         @Override
-        public void initialize() {
+        protected String getLocalConfigurationFilePath() {
             URL url = MockApplicationConfig.class.getClassLoader().getResource("test_application_configuration.json");
-            loadConfigurationFromFile(url.getFile());
+            return url.getFile();
         }
     }
 
@@ -168,7 +168,7 @@ public class ApplicationTest {
         String url = baseUrl() + "/policy?type=type1&instance=instance1&ric=ric1&service=service1";
         String json = "{}";
         addPolicyType("type1", "ric1");
-        this.rics.getRic("ric1").setState(Ric.RicState.ACTIVE);
+        this.rics.getRic("ric1").setState(Ric.RicState.IDLE);
 
         this.restTemplate.put(url, json);
 
@@ -249,7 +249,7 @@ public class ApplicationTest {
         reset();
         String url = baseUrl() + "/policy?instance=id";
         Policy policy = addPolicy("id", "typeName", "service1", "ric1");
-        policy.ric().setState(Ric.RicState.ACTIVE);
+        policy.ric().setState(Ric.RicState.IDLE);
         assertThat(policies.size()).isEqualTo(1);
 
         this.restTemplate.delete(url);
index 420c8f7..ecb4661 100644 (file)
@@ -52,10 +52,11 @@ public class MockPolicyAgent {
     static class MockApplicationConfig extends ApplicationConfig {
 
         @Override
-        public void initialize() {
+        protected String getLocalConfigurationFilePath() {
             URL url = MockApplicationConfig.class.getClassLoader().getResource("test_application_configuration.json");
-            loadConfigurationFromFile(url.getFile());
+            return url.getFile();
         }
+
     }
 
     /**
index 9cfb616..6a4f8b3 100644 (file)
@@ -96,13 +96,14 @@ public class ApplicationConfigTest {
         appConfigUnderTest.systemEnvironment = new Properties();
         // When
         doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
+        doReturn("fileName").when(appConfigUnderTest).getLocalConfigurationFilePath();
         appConfigUnderTest.initialize();
 
         // Then
-        verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(any());
+        verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
 
-        Vector<RicConfig> ricConfigs = appConfigUnderTest.getRicConfigs();
-        RicConfig ricConfig = ricConfigs.firstElement();
+        Iterable<RicConfig> ricConfigs = appConfigUnderTest.getRicConfigs();
+        RicConfig ricConfig = ricConfigs.iterator().next();
         assertThat(ricConfigs).isNotNull();
         assertThat(ricConfig).isEqualTo(CORRECT_RIC_CONIFG);
     }
@@ -115,11 +116,12 @@ public class ApplicationConfigTest {
 
         // When
         doReturn(getIncorrectJson()).when(appConfigUnderTest).createInputStream(any());
-        appConfigUnderTest.loadConfigurationFromFile(any());
+        doReturn("fileName").when(appConfigUnderTest).getLocalConfigurationFilePath();
+        appConfigUnderTest.loadConfigurationFromFile();
 
         // Then
-        verify(appConfigUnderTest, times(1)).loadConfigurationFromFile(any());
-        Assertions.assertNull(appConfigUnderTest.getRicConfigs());
+        verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
+        Assertions.assertEquals(0, appConfigUnderTest.getRicConfigs().size());
     }
 
     @Test
index 418680e..b1f397a 100644 (file)
@@ -63,13 +63,13 @@ public class RepositorySupervisionTest {
             .baseUrl("baseUrl1") //
             .managedElementIds(new Vector<String>(Arrays.asList("kista_1", "kista_2"))) //
             .build());
-        ric1.setState(Ric.RicState.ACTIVE);
+        ric1.setState(Ric.RicState.IDLE);
         Ric ric2 = new Ric(ImmutableRicConfig.builder() //
             .name("ric2") //
             .baseUrl("baseUrl2") //
             .managedElementIds(new Vector<String>(Arrays.asList("kista_3", "kista_4"))) //
             .build());
-        ric2.setState(Ric.RicState.NOT_REACHABLE);
+        ric2.setState(Ric.RicState.UNDEFINED);
         Ric ric3 = new Ric(ImmutableRicConfig.builder() //
             .name("ric3") //
             .baseUrl("baseUrl3") //
@@ -108,9 +108,9 @@ public class RepositorySupervisionTest {
 
         supervisorUnderTest.checkAllRics();
 
-        await().untilAsserted(() -> RicState.ACTIVE.equals(ric1.state()));
-        await().untilAsserted(() -> RicState.ACTIVE.equals(ric2.state()));
-        await().untilAsserted(() -> RicState.ACTIVE.equals(ric3.state()));
+        await().untilAsserted(() -> RicState.IDLE.equals(ric1.state()));
+        await().untilAsserted(() -> RicState.IDLE.equals(ric2.state()));
+        await().untilAsserted(() -> RicState.IDLE.equals(ric3.state()));
 
         verify(a1ClientMock).deletePolicy("baseUrl1", "policyId2");
         verify(a1ClientMock).deletePolicy("baseUrl2", "policyId2");
index 2ee175d..729fc7b 100644 (file)
@@ -29,8 +29,7 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.oransc.policyagent.repository.Ric.RicState.ACTIVE;
-import static org.oransc.policyagent.repository.Ric.RicState.NOT_REACHABLE;
+import static org.oransc.policyagent.repository.Ric.RicState.IDLE;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -49,6 +48,7 @@ import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Ric.RicState;
 import org.oransc.policyagent.repository.Rics;
 import org.oransc.policyagent.repository.Services;
 import reactor.core.publisher.Mono;
@@ -77,11 +77,6 @@ public class StartupServiceTest {
 
     @Test
     public void startup_allOk() {
-        Vector<RicConfig> ricConfigs = new Vector<>(2);
-        ricConfigs.add(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A));
-        ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
-        when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
-
         Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
         Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
         when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
@@ -97,6 +92,12 @@ public class StartupServiceTest {
 
         serviceUnderTest.startup();
 
+        serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A),
+            ApplicationConfig.RicConfigUpdate.ADDED);
+        serviceUnderTest.onRicConfigUpdate(
+            getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
+            ApplicationConfig.RicConfigUpdate.ADDED);
+
         await().untilAsserted(() -> assertThat(policyTypes.size()).isEqualTo(2));
 
         verify(a1ClientMock).getPolicyTypeIdentities(FIRST_RIC_URL);
@@ -114,7 +115,7 @@ public class StartupServiceTest {
         Ric firstRic = rics.get(FIRST_RIC_NAME);
         assertNotNull(firstRic, "Ric " + FIRST_RIC_NAME + " not added to repository");
         assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics");
-        assertEquals(ACTIVE, firstRic.state(), "Not correct state for ric " + FIRST_RIC_NAME);
+        assertEquals(IDLE, firstRic.state(), "Not correct state for ric " + FIRST_RIC_NAME);
         assertEquals(1, firstRic.getSupportedPolicyTypes().size(),
             "Not correct no of types supported for ric " + FIRST_RIC_NAME);
         assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME),
@@ -126,7 +127,7 @@ public class StartupServiceTest {
         Ric secondRic = rics.get(SECOND_RIC_NAME);
         assertNotNull(secondRic, "Ric " + SECOND_RIC_NAME + " not added to repository");
         assertEquals(SECOND_RIC_NAME, secondRic.name(), SECOND_RIC_NAME + " not added to Rics");
-        assertEquals(ACTIVE, secondRic.state(), "Not correct state for " + SECOND_RIC_NAME);
+        assertEquals(IDLE, secondRic.state(), "Not correct state for " + SECOND_RIC_NAME);
         assertEquals(2, secondRic.getSupportedPolicyTypes().size(),
             "Not correct no of types supported for ric " + SECOND_RIC_NAME);
         assertTrue(secondRic.isSupportingType(POLICY_TYPE_1_NAME),
@@ -141,11 +142,6 @@ public class StartupServiceTest {
 
     @Test
     public void startup_unableToConnectToGetTypes() {
-        Vector<RicConfig> ricConfigs = new Vector<>(2);
-        ricConfigs.add(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A));
-        ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
-        when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
-
         Mono<Collection<String>> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
         Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
         doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString());
@@ -161,21 +157,22 @@ public class StartupServiceTest {
             new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies(), new Services());
 
         serviceUnderTest.startup();
+        serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A),
+            ApplicationConfig.RicConfigUpdate.ADDED);
+        serviceUnderTest.onRicConfigUpdate(
+            getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
+            ApplicationConfig.RicConfigUpdate.ADDED);
 
         verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1);
         verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2);
 
-        assertEquals(NOT_REACHABLE, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
+        assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
 
-        assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
+        assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
     }
 
     @Test
     public void startup_unableToConnectToGetPolicies() {
-        Vector<RicConfig> ricConfigs = new Vector<>(2);
-        ricConfigs.add(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A));
-        ricConfigs.add(getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C));
-        when(appConfigMock.getRicConfigs()).thenReturn(ricConfigs);
 
         Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
         Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
@@ -192,13 +189,18 @@ public class StartupServiceTest {
             new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies(), new Services());
 
         serviceUnderTest.startup();
+        serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A),
+            ApplicationConfig.RicConfigUpdate.ADDED);
+        serviceUnderTest.onRicConfigUpdate(
+            getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
+            ApplicationConfig.RicConfigUpdate.ADDED);
 
         verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1);
         verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2);
 
-        assertEquals(NOT_REACHABLE, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
+        assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
 
-        assertEquals(ACTIVE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
+        assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
     }
 
     @SafeVarargs
index 1a93b4d..aca29f5 100644 (file)
@@ -42,23 +42,27 @@ public class MockA1Client implements A1Client {
 
     @Override
     public Mono<Collection<String>> getPolicyTypeIdentities(String nearRtRicUrl) {
-        Vector<String> result = new Vector<>();
-        for (PolicyType p : this.policyTypes.getAll()) {
-            result.add(p.name());
+        synchronized (this.policyTypes) {
+            Vector<String> result = new Vector<>();
+            for (PolicyType p : this.policyTypes.getAll()) {
+                result.add(p.name());
+            }
+            return Mono.just(result);
         }
-        return Mono.just(result);
     }
 
     @Override
     public Mono<Collection<String>> getPolicyIdentities(String nearRtRicUrl) {
-        Vector<String> result = new Vector<>();
-        for (Policy policy : getPolicies(nearRtRicUrl).getAll()) {
-            if (policy.ric().getConfig().baseUrl().equals(nearRtRicUrl)) {
-                result.add(policy.id());
+        synchronized (this.policies) {
+            Vector<String> result = new Vector<>();
+            for (Policy policy : getPolicies(nearRtRicUrl).getAll()) {
+                if (policy.ric().getConfig().baseUrl().equals(nearRtRicUrl)) {
+                    result.add(policy.id());
+                }
             }
-        }
 
-        return Mono.just(result);
+            return Mono.just(result);
+        }
     }
 
     @Override