Fixed concurrency problems 25/2625/4
authorPatrikBuhr <patrik.buhr@est.tech>
Mon, 2 Mar 2020 07:30:16 +0000 (08:30 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Tue, 3 Mar 2020 13:26:47 +0000 (14:26 +0100)
Added fixes of concurrecy problems, when consistency check is done, policies cannot be created/deleted
Added a unit test that tests concurrent execution
Improved trace
PUT policy, return HTTP status CREATE for new policy (OK for updated)

Improved the recovery handling and the consistency monitoring

Change-Id: I2504fce35dc6e3371c7d4cf389528ce75eda442d
Issue-ID: NONRTRIC-107
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
28 files changed:
policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java
policy-agent/src/main/java/org/oransc/policyagent/clients/JsonHelper.java
policy-agent/src/main/java/org/oransc/policyagent/clients/OscA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOnapA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/clients/StdA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/RicConfig.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java [new file with mode: 0644]
policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
policy-agent/src/test/java/org/oransc/policyagent/clients/StdA1ClientTest.java
policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java

index ff9c8ff..ace486b 100644 (file)
@@ -21,6 +21,7 @@
 package org.oransc.policyagent.clients;
 
 import java.util.List;
+
 import org.oransc.policyagent.repository.Policy;
 
 import reactor.core.publisher.Flux;
index 0a7c3a1..0d27bd6 100644 (file)
@@ -32,6 +32,7 @@ import reactor.core.publisher.Mono;
 public class AsyncRestClient {
     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     private final WebClient client;
+    private final String baseUrl;
 
     public class AsyncRestClientException extends Exception {
 
@@ -44,9 +45,11 @@ public class AsyncRestClient {
 
     public AsyncRestClient(String baseUrl) {
         this.client = WebClient.create(baseUrl);
+        this.baseUrl = baseUrl;
     }
 
     public Mono<String> post(String uri, String body) {
+        logger.debug("POST uri = '{}{}''", baseUrl, uri);
         return client.post() //
             .uri(uri) //
             .contentType(MediaType.APPLICATION_JSON) //
@@ -58,6 +61,7 @@ public class AsyncRestClient {
     }
 
     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
+        logger.debug("POST (auth) uri = '{}{}''", baseUrl, uri);
         return client.post() //
             .uri(uri) //
             .headers(headers -> headers.setBasicAuth(username, password)) //
@@ -70,7 +74,7 @@ public class AsyncRestClient {
     }
 
     public Mono<String> put(String uri, String body) {
-        logger.debug("PUT uri = '{}''", uri);
+        logger.debug("PUT uri = '{}{}''", baseUrl, uri);
         return client.put() //
             .uri(uri) //
             .contentType(MediaType.APPLICATION_JSON) //
@@ -82,7 +86,7 @@ public class AsyncRestClient {
     }
 
     public Mono<String> get(String uri) {
-        logger.debug("GET uri = '{}''", uri);
+        logger.debug("GET uri = '{}{}''", baseUrl, uri);
         return client.get() //
             .uri(uri) //
             .retrieve() //
@@ -92,7 +96,7 @@ public class AsyncRestClient {
     }
 
     public Mono<String> delete(String uri) {
-        logger.debug("DELETE uri = '{}''", uri);
+        logger.debug("DELETE uri = '{}{}''", baseUrl, uri);
         return client.delete() //
             .uri(uri) //
             .retrieve() //
index 2e7963b..68ffb10 100644 (file)
@@ -23,11 +23,14 @@ package org.oransc.policyagent.clients;
 import com.google.gson.FieldNamingPolicy;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+
 import java.util.ArrayList;
 import java.util.List;
+
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
index 3216a48..e7a1587 100644 (file)
@@ -22,12 +22,14 @@ package org.oransc.policyagent.clients;
 
 import java.lang.invoke.MethodHandles;
 import java.util.List;
+
 import org.json.JSONObject;
 import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.repository.Policy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.web.util.UriComponentsBuilder;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
index 9b3bb71..b9d7f35 100644 (file)
@@ -23,10 +23,12 @@ package org.oransc.policyagent.clients;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
+
 import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.repository.Policy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -96,8 +98,8 @@ public class SdncOnapA1Client implements A1Client {
         String inputJsonString = JsonHelper.createInputJsonString(inputParams);
         logger.debug("POST putPolicy inputJsonString = {}", inputJsonString);
 
-        return restClient.postWithAuthHeader(URL_PREFIX + "createPolicyInstance", inputJsonString,
-            a1ControllerUsername, a1ControllerPassword);
+        return restClient.postWithAuthHeader(URL_PREFIX + "createPolicyInstance", inputJsonString, a1ControllerUsername,
+            a1ControllerPassword);
     }
 
     @Override
index 9bd0f6c..ac52d6a 100644 (file)
@@ -22,10 +22,12 @@ package org.oransc.policyagent.clients;
 
 import java.lang.invoke.MethodHandles;
 import java.util.List;
+
 import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.repository.Policy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
index 1715d9d..73adece 100644 (file)
 package org.oransc.policyagent.clients;
 
 import java.util.List;
+
 import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.repository.Policy;
 import org.springframework.web.util.UriComponentsBuilder;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
index 1dd850b..deacb44 100644 (file)
@@ -25,9 +25,12 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
+
 import lombok.Getter;
+
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
index 578ff12..c69c39d 100644 (file)
@@ -23,6 +23,7 @@ package org.oransc.policyagent.configuration;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -31,8 +32,11 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+
 import javax.validation.constraints.NotNull;
+
 import lombok.Getter;
+
 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.springframework.http.MediaType;
index ea27524..d6d65fc 100644 (file)
@@ -22,6 +22,7 @@ package org.oransc.policyagent.controllers;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -30,6 +31,7 @@ import io.swagger.annotations.ApiResponses;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+
 import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
@@ -150,14 +152,20 @@ public class PolicyController {
     @DeleteMapping("/policy")
     @ApiOperation(value = "Delete a policy", response = Object.class)
     @ApiResponses(value = {@ApiResponse(code = 204, message = "Policy deleted", response = Object.class)})
-    public Mono<ResponseEntity<Void>> deletePolicy( //
+    public Mono<ResponseEntity<Object>> deletePolicy( //
         @RequestParam(name = "instance", required = true) String id) {
         Policy policy = policies.get(id);
         if (policy != null && policy.ric().getState() == Ric.RicState.IDLE) {
-            policies.remove(policy);
+            Ric ric = policy.ric();
             return a1ClientFactory.createA1Client(policy.ric()) //
+                .doOnNext(notUsed -> ric.getLock().lockBlocking()) //
+                .doOnNext(notUsed -> policies.remove(policy)) //
                 .flatMap(client -> client.deletePolicy(policy)) //
+                .doOnNext(notUsed -> ric.getLock().unlock()) //
+                .doOnError(notUsed -> ric.getLock().unlock()) //
                 .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)));
+        } else if (policy != null) {
+            return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED));
         } else {
             return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
         }
@@ -166,7 +174,7 @@ public class PolicyController {
     @PutMapping(path = "/policy")
     @ApiOperation(value = "Put a policy", response = String.class)
     @ApiResponses(value = {@ApiResponse(code = 200, message = "Policy created or updated")})
-    public Mono<ResponseEntity<String>> putPolicy( //
+    public Mono<ResponseEntity<Object>> putPolicy( //
         @RequestParam(name = "type", required = true) String typeName, //
         @RequestParam(name = "instance", required = true) String instanceId, //
         @RequestParam(name = "ric", required = true) String ricName, //
@@ -187,13 +195,22 @@ public class PolicyController {
                 .lastModified(getTimeStampUtc()) //
                 .build();
 
-            return validateModifiedPolicy(policy) //
-                .flatMap(x -> a1ClientFactory.createA1Client(ric)) //
+            final boolean isCreate = this.policies.get(policy.id()) == null;
+
+            return Mono.just(policy) //
+                .doOnNext(notUsed -> ric.getLock().lockBlocking()) //
+                .flatMap(p -> validateModifiedPolicy(policy)) //
+                .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) //
                 .flatMap(client -> client.putPolicy(policy)) //
                 .doOnNext(notUsed -> policies.put(policy)) //
-                .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.OK)));
+                .doOnNext(notUsed -> ric.getLock().unlock()) //
+                .doOnError(t -> ric.getLock().unlock()) //
+                .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
+                .onErrorResume(t -> Mono.just(new ResponseEntity<>(t.getMessage(), HttpStatus.METHOD_NOT_ALLOWED)));
         }
-        return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
+
+        return ric == null && type == null ? Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND))
+            : Mono.just(new ResponseEntity<>(HttpStatus.CONFLICT)); // Recovering
     }
 
     private Mono<Object> validateModifiedPolicy(Policy policy) {
@@ -201,7 +218,9 @@ public class PolicyController {
         Policy current = this.policies.get(policy.id());
         if (current != null) {
             if (!current.ric().name().equals(policy.ric().name())) {
-                return Mono.error(new Exception("Policy cannot change RIC or service"));
+                return Mono.error(new Exception("Policy cannot change RIC, policyId: " + current.id() + //
+                    ", RIC name: " + current.ric().name() + //
+                    ", new name: " + policy.ric().name()));
             }
         }
         return Mono.just("OK");
index b95c196..f6fc850 100644 (file)
@@ -27,9 +27,11 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Rics;
index 3f775a5..464511f 100644 (file)
@@ -31,6 +31,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
@@ -68,7 +69,7 @@ public class ServiceController {
     @ApiResponses(
         value = {@ApiResponse(code = 200, message = "OK", response = ServiceStatus.class, responseContainer = "List")})
     public ResponseEntity<String> getServices(//
-        @RequestParam(name = "serviceName", required = false) String name) {
+        @RequestParam(name = "name", required = false) String name) {
 
         Collection<ServiceStatus> servicesStatus = new ArrayList<>();
         synchronized (this.services) {
@@ -104,7 +105,7 @@ public class ServiceController {
     @ApiResponses(value = {@ApiResponse(code = 200, message = "OK")})
     @DeleteMapping("/services")
     public ResponseEntity<String> deleteService(//
-        @RequestParam(name = "serviceName", required = true) String serviceName) {
+        @RequestParam(name = "name", required = true) String serviceName) {
         try {
             Service service = removeService(serviceName);
             // Remove the policies from the repo and let the consistency monitoring
@@ -122,7 +123,7 @@ public class ServiceController {
             @ApiResponse(code = 404, message = "The service is not found, needs re-registration")})
     @PostMapping("/services/keepalive")
     public ResponseEntity<String> keepAliveService(//
-        @RequestParam(name = "serviceName", required = true) String serviceName) {
+        @RequestParam(name = "name", required = true) String serviceName) {
         try {
             services.getService(serviceName).ping();
             return new ResponseEntity<>("OK", HttpStatus.OK);
index 6b4f0f4..15d9952 100644 (file)
@@ -22,7 +22,9 @@ package org.oransc.policyagent.dmaap;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+
 import java.io.IOException;
+
 import org.onap.dmaap.mr.client.MRBatchingPublisher;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java
new file mode 100644 (file)
index 0000000..68ea5a7
--- /dev/null
@@ -0,0 +1,140 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.repository;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
+
+/**
+ * A resource lock. The caller thread will be blocked until the lock is granted.
+ * Exclusive means that the caller takes exclusive ownership of the resurce. Non
+ * exclusive lock means that several users can lock the resource (for shared
+ * usage).
+ */
+public class Lock {
+    private static final Logger logger = LoggerFactory.getLogger(Lock.class);
+
+    private boolean isExclusive = false;
+    private int cnt = 0;
+
+    public static enum LockType {
+        EXCLUSIVE, SHARED
+    }
+
+    public synchronized void lockBlocking(LockType locktype) {
+        while (!tryLock(locktype)) {
+            this.waitForUnlock();
+        }
+    }
+
+    public synchronized void lockBlocking() {
+        lockBlocking(LockType.SHARED);
+    }
+
+    public synchronized Mono<Lock> lock(LockType lockType) {
+        if (tryLock(lockType)) {
+            return Mono.just(this);
+        } else {
+            return Mono.create(monoSink -> addToQueue(monoSink, lockType));
+        }
+    }
+
+    public synchronized void unlock() {
+        if (disable()) {
+            return;
+        }
+        if (cnt <= 0) {
+            cnt = -1; // Might as well stop, to make it easier to find the problem
+            throw new RuntimeException("Number of unlocks must match the number of locks");
+        }
+        this.cnt--;
+        if (cnt == 0) {
+            isExclusive = false;
+        }
+        this.processQueuedEntries();
+        this.notifyAll();
+    }
+
+    private void processQueuedEntries() {
+        for (Iterator<QueueEntry> i = queue.iterator(); i.hasNext();) {
+            QueueEntry e = i.next();
+            if (tryLock(e.lockType)) {
+                i.remove();
+                e.callback.success(this);
+            }
+        }
+    }
+
+    static class QueueEntry {
+        final MonoSink<Lock> callback;
+        final LockType lockType;
+
+        QueueEntry(MonoSink<Lock> callback, LockType lockType) {
+            this.callback = callback;
+            this.lockType = lockType;
+        }
+    }
+
+    private final List<QueueEntry> queue = new LinkedList<>();
+
+    private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
+        queue.add(new QueueEntry(callback, lockType));
+    }
+
+    private void waitForUnlock() {
+        try {
+            this.wait();
+        } catch (InterruptedException e) {
+            logger.warn("waitForUnlock interrupted", e);
+        }
+    }
+
+    private boolean disable() {
+        return true;
+    }
+
+    private boolean tryLock(LockType lockType) {
+        if (disable()) {
+            return true;
+        }
+        if (this.isExclusive) {
+            return false;
+        }
+        if (lockType == LockType.EXCLUSIVE && cnt > 0) {
+            return false;
+        }
+        cnt++;
+        this.isExclusive = lockType == LockType.EXCLUSIVE;
+        return true;
+    }
+
+    public synchronized int getLockCounter() {
+        return this.cnt;
+    }
+
+}
index c910dd5..54f876a 100644 (file)
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+
 import org.oransc.policyagent.exceptions.ServiceException;
 
 public class Policies {
index 6eece5e..4291d6e 100644 (file)
@@ -26,8 +26,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
+
 import lombok.Getter;
 import lombok.Setter;
+
 import org.oransc.policyagent.clients.A1Client.A1ProtocolType;
 import org.oransc.policyagent.configuration.RicConfig;
 
@@ -35,6 +37,7 @@ import org.oransc.policyagent.configuration.RicConfig;
  * Represents the dynamic information about a NearRealtime-RIC.
  */
 public class Ric {
+
     private final RicConfig ricConfig;
     private final List<String> managedElementIds;
 
@@ -44,6 +47,8 @@ public class Ric {
     @Setter
     private A1ProtocolType protocolVersion = A1ProtocolType.UNKNOWN;
 
+    @Getter
+    private final Lock lock = new Lock();
 
     /**
      * Creates the Ric. Initial state is {@link RicState.NOT_INITIATED}.
@@ -52,7 +57,8 @@ public class Ric {
      */
     public Ric(RicConfig ricConfig) {
         this.ricConfig = ricConfig;
-        this.managedElementIds = new ArrayList<>(ricConfig.managedElementIds());
+        this.managedElementIds = new ArrayList<>(ricConfig.managedElementIds()); // TODO, this is config why is it
+                                                                                 // copied here?
     }
 
     public String name() {
index ce318dd..f75db21 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Collection;
 
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.repository.Lock.LockType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
@@ -41,7 +42,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Regularly checks the existing rics towards the local repository to keep it consistent.
+ * Regularly checks the existing rics towards the local repository to keep it
+ * consistent.
  */
 @Component
 @EnableScheduling
@@ -78,8 +80,10 @@ public class RepositorySupervision {
             return Flux.fromIterable(rics.getRics()) //
                 .flatMap(this::createRicData) //
                 .flatMap(this::checkRicState) //
+                .doOnNext(ricData -> ricData.ric.getLock().lockBlocking(LockType.EXCLUSIVE)) //
                 .flatMap(this::checkRicPolicies) //
-                .flatMap(this::checkRicPolicyTypes);
+                .doOnNext(ricData -> ricData.ric.getLock().unlock()) //
+                .flatMap(this::checkRicPolicyTypes); //
         }
     }
 
@@ -111,22 +115,28 @@ public class RepositorySupervision {
 
     private Mono<RicData> checkRicPolicies(RicData ric) {
         return ric.a1Client.getPolicyIdentities() //
-            .onErrorResume(t -> Mono.empty()) //
+            .onErrorResume(t -> {
+                ric.ric.getLock().unlock();
+                return Mono.empty();
+            }) //
             .flatMap(ricP -> validateInstances(ricP, ric));
     }
 
     private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
         synchronized (this.policies) {
             if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
+                ric.ric.getLock().unlock();
                 return startSynchronization(ric);
             }
-        }
-        for (String policyId : ricPolicies) {
-            if (!policies.containsPolicy(policyId)) {
-                return startSynchronization(ric);
+
+            for (String policyId : ricPolicies) {
+                if (!policies.containsPolicy(policyId)) {
+                    ric.ric.getLock().unlock();
+                    return startSynchronization(ric);
+                }
             }
+            return Mono.just(ric);
         }
-        return Mono.just(ric);
     }
 
     private Mono<RicData> checkRicPolicyTypes(RicData ric) {
@@ -165,4 +175,4 @@ public class RepositorySupervision {
     RicSynchronizationTask createSynchronizationTask() {
         return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
     }
-}
\ No newline at end of file
+}
index bcfda48..d759991 100644 (file)
@@ -22,13 +22,13 @@ package org.oransc.policyagent.tasks;
 
 import static org.oransc.policyagent.repository.Ric.RicState;
 
-import java.util.Collection;
 import java.util.Vector;
 
 import org.oransc.policyagent.clients.A1Client;
 import org.oransc.policyagent.clients.A1ClientFactory;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.repository.ImmutablePolicyType;
+import org.oransc.policyagent.repository.Lock.LockType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.PolicyType;
@@ -43,12 +43,16 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Synchronizes the content of a RIC with the content in the repository.
- * This means:
- * - load all policy types
- * - send all policy instances to the RIC
- * --- if that fails remove all policy instances
- * - Notify subscribing services
+ * Synchronizes the content of a RIC with the content in the repository. This
+ * means:
+ * <p>
+ * load all policy types
+ * <p>
+ * send all policy instances to the RIC
+ * <p>
+ * if that fails remove all policy instances
+ * <p>
+ * Notify subscribing services
  */
 public class RicSynchronizationTask {
 
@@ -78,6 +82,8 @@ public class RicSynchronizationTask {
             }
             ric.setState(RicState.SYNCHRONIZING);
         }
+        ric.getLock().lockBlocking(LockType.EXCLUSIVE); // Make sure no NBI updates are running
+        ric.getLock().unlock();
         this.a1ClientFactory.createA1Client(ric)//
             .flatMapMany(client -> startSynchronization(ric, client)) //
             .subscribe(x -> logger.debug("Synchronize: {}", x), //
@@ -87,13 +93,9 @@ public class RicSynchronizationTask {
 
     private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
         Flux<PolicyType> recoverTypes = synchronizePolicyTypes(ric, a1Client);
-        Collection<Policy> policiesForRic = policies.getForRic(ric.name());
-        Flux<?> policiesDeletedInRic = Flux.empty();
-        Flux<?> policiesRecreatedInRic = Flux.empty();
-        if (!policiesForRic.isEmpty()) {
-            policiesDeletedInRic = a1Client.deleteAllPolicies();
-            policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
-        }
+        Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
+        Flux<?> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
+
         return Flux.concat(recoverTypes, policiesDeletedInRic, policiesRecreatedInRic);
     }
 
@@ -123,17 +125,17 @@ public class RicSynchronizationTask {
     @SuppressWarnings("squid:S2629")
     private void onSynchronizationError(Ric ric, Throwable t) {
         logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
+        // If recovery fails, try to remove all instances
         deleteAllPoliciesInRepository(ric);
 
-        Flux<PolicyType> typesRecoveredForRic = this.a1ClientFactory.createA1Client(ric) //
+        Flux<PolicyType> recoverTypes = this.a1ClientFactory.createA1Client(ric) //
             .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client));
+        Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
+            .flatMapMany(a1Client -> a1Client.deleteAllPolicies()) //
+            .doOnComplete(() -> deleteAllPoliciesInRepository(ric));
 
-        // If recovery fails, try to remove all instances
-        Flux<?> policiesDeletedInRic = this.a1ClientFactory.createA1Client(ric) //
-            .flatMapMany(A1Client::deleteAllPolicies);
-
-        Flux.merge(typesRecoveredForRic, policiesDeletedInRic) //
-            .subscribe(x -> logger.debug("Brute recover: {}", x), //
+        Flux.concat(recoverTypes, deletePoliciesInRic) //
+            .subscribe(x -> logger.debug("Brute recover: " + x), //
                 throwable -> onRecoveryError(ric, throwable), //
                 () -> onSynchronizationComplete(ric));
     }
@@ -149,8 +151,8 @@ public class RicSynchronizationTask {
     }
 
     private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
-        ric.clearSupportedPolicyTypes();
         return a1Client.getPolicyTypeIdentities() //
+            .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
             .flatMapMany(Flux::fromIterable) //
             .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
             .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) //
index 9d45f05..4ea6ff9 100644 (file)
@@ -28,10 +28,17 @@ import com.google.gson.GsonBuilder;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+
 import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.oransc.policyagent.configuration.ApplicationConfig;
@@ -43,6 +50,7 @@ import org.oransc.policyagent.controllers.ServiceStatus;
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.ImmutablePolicy;
 import org.oransc.policyagent.repository.ImmutablePolicyType;
+import org.oransc.policyagent.repository.Lock.LockType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.PolicyType;
@@ -63,6 +71,7 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.HttpStatus.Series;
 import org.springframework.http.MediaType;
@@ -97,8 +106,8 @@ public class ApplicationTest {
     Services services;
 
     private static Gson gson = new GsonBuilder() //
-        .serializeNulls() //
-        .create(); //
+            .serializeNulls() //
+            .create(); //
 
     public static class MockApplicationConfig extends ApplicationConfig {
         @Override
@@ -124,20 +133,10 @@ public class ApplicationTest {
             return new MockA1ClientFactory(this.policyTypes);
         }
 
-        @Bean
-        public Policies getPolicies() {
-            return new Policies();
-        }
-
         @Bean
         public PolicyTypes getPolicyTypes() {
             return this.policyTypes;
         }
-
-        @Bean
-        public Rics getRics() {
-            return new Rics();
-        }
     }
 
     @LocalServerPort
@@ -150,7 +149,7 @@ public class ApplicationTest {
         @Override
         public boolean hasError(ClientHttpResponse httpResponse) throws IOException {
             return (httpResponse.getStatusCode().series() == Series.CLIENT_ERROR
-                || httpResponse.getStatusCode().series() == Series.SERVER_ERROR);
+                    || httpResponse.getStatusCode().series() == Series.SERVER_ERROR);
         }
 
         @Override
@@ -159,9 +158,30 @@ public class ApplicationTest {
         }
     }
 
+    private void setRestErrorhandler() {
+        restTemplate.setErrorHandler(new RestTemplateResponseErrorHandler());
+    }
+
+    @BeforeEach
+    public void reset() {
+        rics.clear();
+        policies.clear();
+        policyTypes.clear();
+        services.clear();
+    }
+
+    @AfterEach
+    public void verifyNoRicLocks() {
+        for (Ric ric : this.rics.getRics()) {
+            ric.getLock().lockBlocking(LockType.EXCLUSIVE);
+            ric.getLock().unlock();
+            assertThat(ric.getLock().getLockCounter()).isEqualTo(0);
+            assertThat(ric.getState()).isEqualTo(Ric.RicState.IDLE);
+        }
+    }
+
     @Test
     public void testGetRics() throws Exception {
-        reset();
         addRic("kista_1");
         String url = baseUrl() + "/rics";
         String rsp = this.restTemplate.getForObject(url, String.class);
@@ -175,7 +195,7 @@ public class ApplicationTest {
 
     @Test
     public void testRecovery() throws Exception {
-        reset();
+        addRic("ric").setState(Ric.RicState.UNDEFINED);
         String ricName = "ric";
         Policy policy2 = addPolicy("policyId2", "typeName", "service", ricName);
 
@@ -196,7 +216,6 @@ public class ApplicationTest {
 
     @Test
     public void testGetRicForManagedElement_thenReturnCorrectRic() throws Exception {
-        reset();
         addRic("notCorrectRic1");
         addRic("notCorrectRic2");
         addRic("notCorrectRic3");
@@ -216,24 +235,15 @@ public class ApplicationTest {
     }
 
     @Test
-    public void testGetRicForManagedElementThatDoesNotExist_thenReturnEmpty() throws Exception {
-        reset();
-        addRic("notCorrectRic1");
-        addRic("notCorrectRic2");
-        addRic("notCorrectRic3");
-        addRic("notCorrectRic4");
-        addRic("notCorrectRic5");
-        addRic("notCorrectRic6");
-
+    public void testGetRicForManagedElementThatDoesNotExist() throws Exception {
+        this.setRestErrorhandler();
         String url = baseUrl() + "/ric?managedElementId=kista_1";
-        String rsp = this.restTemplate.getForObject(url, String.class);
-
-        assertThat(rsp).isNull();
+        ResponseEntity<String> entity = this.restTemplate.getForEntity(url, String.class);
+        assertThat(entity.getStatusCode().equals(HttpStatus.NOT_FOUND));
     }
 
     @Test
     public void testPutPolicy() throws Exception {
-        reset();
         String serviceName = "service1";
         String ricName = "ric1";
         String policyTypeName = "type1";
@@ -243,7 +253,7 @@ public class ApplicationTest {
         addPolicyType(policyTypeName, ricName);
 
         String url = baseUrl() + "/policy?type=" + policyTypeName + "&instance=" + policyInstanceId + "&ric=" + ricName
-            + "&service=" + serviceName;
+                + "&service=" + serviceName;
         final String json = jsonString();
         this.rics.getRic(ricName).setState(Ric.RicState.IDLE);
 
@@ -265,13 +275,15 @@ public class ApplicationTest {
     public void testRefuseToUpdatePolicy() throws Exception {
         // Test that only the json can be changed for a already created policy
         // In this case service is attempted to be changed
-        reset();
-        this.addRic("ric1").setState(Ric.RicState.IDLE);
-        this.addRic("ricXXX").setState(Ric.RicState.IDLE);
+        this.addRic("ric1");
+        this.addRic("ricXXX");
 
         this.addPolicy("instance1", "type1", "service1", "ric1");
+        this.setRestErrorhandler();
         String urlWrongRic = baseUrl() + "/policy?type=type1&instance=instance1&ric=ricXXX&service=service1";
-        this.restTemplate.put(urlWrongRic, createJsonHttpEntity(jsonString()));
+        ResponseEntity<String> entity = this.putForEntity(urlWrongRic, jsonString());
+        assertThat(entity.getStatusCode().equals(HttpStatus.METHOD_NOT_ALLOWED));
+
         Policy policy = policies.getPolicy("instance1");
         assertThat(policy.ric().name()).isEqualTo("ric1"); // Not changed
     }
@@ -293,10 +305,8 @@ public class ApplicationTest {
 
     @Test
     public void testDeletePolicy() throws Exception {
-        reset();
         String url = baseUrl() + "/policy?instance=id";
-        Policy policy = addPolicy("id", "typeName", "service1", "ric1");
-        policy.ric().setState(Ric.RicState.IDLE);
+        addPolicy("id", "typeName", "service1", "ric1");
         assertThat(policies.size()).isEqualTo(1);
 
         this.restTemplate.delete(url);
@@ -306,7 +316,6 @@ public class ApplicationTest {
 
     @Test
     public void testGetPolicySchemas() throws Exception {
-        reset();
         addPolicyType("type1", "ric1");
         addPolicyType("type2", "ric2");
 
@@ -328,7 +337,6 @@ public class ApplicationTest {
 
     @Test
     public void testGetPolicySchema() throws Exception {
-        reset();
         addPolicyType("type1", "ric1");
         addPolicyType("type2", "ric2");
 
@@ -341,7 +349,6 @@ public class ApplicationTest {
 
     @Test
     public void testGetPolicyTypes() throws Exception {
-        reset();
         addPolicyType("type1", "ric1");
         addPolicyType("type2", "ric2");
 
@@ -394,7 +401,6 @@ public class ApplicationTest {
 
     @Test
     public void testPutAndGetService() throws Exception {
-        reset();
         // PUT
         putService("name");
 
@@ -414,27 +420,26 @@ public class ApplicationTest {
         System.out.println(rsp);
 
         // Keep alive
-        url = baseUrl() + "/services/keepalive?serviceName=name";
-        rsp = this.restTemplate.postForObject(url, null, String.class);
-        assertThat(rsp.contains("OK")).isTrue();
+        url = baseUrl() + "/services/keepalive?name=name";
+        ResponseEntity<String> entity = this.restTemplate.postForEntity(url, null, String.class);
+        assertThat(entity.getStatusCode().equals(HttpStatus.OK));
 
         // DELETE
         assertThat(services.size()).isEqualTo(1);
-        url = baseUrl() + "/services?serviceName=name";
+        url = baseUrl() + "/services?name=name";
         this.restTemplate.delete(url);
         assertThat(services.size()).isEqualTo(0);
 
         // Keep alive, no registerred service
-        url = baseUrl() + "/services/keepalive?serviceName=nameXXX";
-        ResponseEntity<String> entity = this.restTemplate.postForEntity(url, null, String.class);
+        url = baseUrl() + "/services/keepalive?name=name";
+        setRestErrorhandler();
+        entity = this.restTemplate.postForEntity(url, null, String.class);
         assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
     }
 
     @Test
     public void testGetPolicyStatus() throws Exception {
-        reset();
-        Policy policy = addPolicy("id", "typeName", "service1", "ric1");
-        policy.ric().setState(Ric.RicState.IDLE);
+        addPolicy("id", "typeName", "service1", "ric1");
         assertThat(policies.size()).isEqualTo(1);
 
         String url = baseUrl() + "/policy_status?instance=id";
@@ -442,40 +447,14 @@ public class ApplicationTest {
         assertThat(rsp.equals("OK")).isTrue();
     }
 
-    private PolicyType addPolicyType(String policyTypeName, String ricName) {
-        PolicyType type = ImmutablePolicyType.builder() //
-            .name(policyTypeName) //
-            .schema("{\"title\":\"" + policyTypeName + "\"}") //
-            .build();
-
-        policyTypes.put(type);
-        addRic(ricName).addSupportedPolicyType(type);
-        return type;
-    }
-
-    private Ric addRic(String ricName) {
-        if (rics.get(ricName) != null) {
-            return rics.get(ricName);
-        }
-        Vector<String> mes = new Vector<>();
-        RicConfig conf = ImmutableRicConfig.builder() //
-            .name(ricName) //
-            .baseUrl(ricName) //
-            .managedElementIds(mes) //
-            .build();
-        Ric ric = new Ric(conf);
-        this.rics.put(ric);
-        return ric;
-    }
-
     private Policy addPolicy(String id, String typeName, String service, String ric) throws ServiceException {
         addRic(ric);
         Policy p = ImmutablePolicy.builder().id(id) //
-            .json(jsonString()) //
-            .ownerServiceName(service) //
-            .ric(rics.getRic(ric)) //
-            .type(addPolicyType(typeName, ric)) //
-            .lastModified("lastModified").build();
+                .json(jsonString()) //
+                .ownerServiceName(service) //
+                .ric(rics.getRic(ric)) //
+                .type(addPolicyType(typeName, ric)) //
+                .lastModified("lastModified").build();
         policies.put(p);
         return p;
     }
@@ -501,29 +480,105 @@ public class ApplicationTest {
         return "http://localhost:" + port;
     }
 
-    private void reset() {
-        rics.clear();
-        policies.clear();
-        policyTypes.clear();
-        services.clear();
-        assertThat(policies.size()).isEqualTo(0);
-        restTemplate.setErrorHandler(new RestTemplateResponseErrorHandler());
-    }
-
     private String jsonString() {
         return "{\n  \"servingCellNrcgi\": \"1\"\n }";
     }
 
+    private static class ConcurrencyTestRunnable implements Runnable {
+        private final RestTemplate restTemplate = new RestTemplate();
+        private final String baseUrl;
+        static AtomicInteger nextCount = new AtomicInteger(0);
+        private final int count;
+        private final RepositorySupervision supervision;
+
+        ConcurrencyTestRunnable(String baseUrl, RepositorySupervision supervision) {
+            this.baseUrl = baseUrl;
+            this.count = nextCount.incrementAndGet();
+            this.supervision = supervision;
+        }
+
+        public void run() {
+            for (int i = 0; i < 100; ++i) {
+                if (i % 10 == 0) {
+                    this.supervision.checkAllRics();
+                }
+                String name = "policy:" + count + ":" + i;
+                putPolicy(name);
+                deletePolicy(name);
+            }
+        }
+
+        private void putPolicy(String name) {
+            String putUrl = baseUrl + "/policy?type=type1&instance=" + name + "&ric=ric1&service=service1";
+            this.restTemplate.put(putUrl, createJsonHttpEntity("{}"));
+        }
+
+        private void deletePolicy(String name) {
+            String deleteUrl = baseUrl + "/policy?instance=" + name;
+            this.restTemplate.delete(deleteUrl);
+        }
+    }
+
+    @Test
+    public void testConcurrency() throws Exception {
+        final Instant startTime = Instant.now();
+        List<Thread> threads = new ArrayList<>();
+        addRic("ric1");
+        addPolicyType("type1", "ric1");
+
+        for (int i = 0; i < 100; ++i) {
+            Thread t = new Thread(new ConcurrencyTestRunnable(baseUrl(), this.supervision), "TestThread_" + i);
+            t.start();
+            threads.add(t);
+        }
+        for (Thread t : threads) {
+            t.join();
+        }
+        assertThat(policies.size()).isEqualTo(0);
+        System.out.println("Concurrency test took " + Duration.between(startTime, Instant.now()));
+    }
+
     private MockA1Client getA1Client(String ricName) throws ServiceException {
         return a1ClientFactory.getOrCreateA1Client(ricName);
     }
 
-    private HttpEntity<String> createJsonHttpEntity(String content) {
+    private PolicyType addPolicyType(String policyTypeName, String ricName) {
+        PolicyType type = ImmutablePolicyType.builder() //
+                .name(policyTypeName) //
+                .schema("{\"title\":\"" + policyTypeName + "\"}") //
+                .build();
+
+        policyTypes.put(type);
+        addRic(ricName).addSupportedPolicyType(type);
+        return type;
+    }
+
+    private Ric addRic(String ricName) {
+        if (rics.get(ricName) != null) {
+            return rics.get(ricName);
+        }
+        Vector<String> mes = new Vector<>();
+        RicConfig conf = ImmutableRicConfig.builder() //
+                .name(ricName) //
+                .baseUrl(ricName) //
+                .managedElementIds(mes) //
+                .build();
+        Ric ric = new Ric(conf);
+        ric.setState(Ric.RicState.IDLE);
+        this.rics.put(ric);
+        return ric;
+    }
+
+    private static HttpEntity<String> createJsonHttpEntity(String content) {
         HttpHeaders headers = new HttpHeaders();
         headers.setContentType(MediaType.APPLICATION_JSON);
         return new HttpEntity<String>(content, headers);
     }
 
+    private ResponseEntity<String> putForEntity(String url, String jsonBody) {
+        return restTemplate.exchange(url, HttpMethod.PUT, createJsonHttpEntity(jsonBody), String.class);
+    }
+
     private static <T> List<T> parseList(String jsonString, Class<T> clazz) {
         List<T> result = new ArrayList<>();
         JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray();
@@ -542,5 +597,4 @@ public class ApplicationTest {
         }
         return result;
     }
-
 }
index 4f4a9be..1ea677c 100644 (file)
@@ -22,10 +22,12 @@ package org.oransc.policyagent;
 
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Files;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.oransc.policyagent.configuration.ApplicationConfig;
index f4e6821..d5dd3b7 100644 (file)
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.oransc.policyagent.repository.Policy;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
index 5136a70..02c84db 100644 (file)
@@ -121,7 +121,8 @@ public class ApplicationConfigParserTest {
         private JsonObject fake_info_object;
 
         public String toString() {
-            return String.format("[dmaap_publisher=%s, fake_info_object=%s]", dmaap_publisher.toString(), fake_info_object.toString());
+            return String.format("[dmaap_publisher=%s, fake_info_object=%s]", dmaap_publisher.toString(),
+                fake_info_object.toString());
         }
     }
 
@@ -147,7 +148,8 @@ public class ApplicationConfigParserTest {
         private JsonObject fake_info_object;
 
         public String toString() {
-            return String.format("[dmaap_subscriber=%s, fake_info_object=%s]", dmaap_subscriber.toString(), fake_info_object.toString());
+            return String.format("[dmaap_subscriber=%s, fake_info_object=%s]", dmaap_subscriber.toString(),
+                fake_info_object.toString());
         }
     }
 
@@ -227,8 +229,8 @@ public class ApplicationConfigParserTest {
             "Wrong error message when the streams publishes' URL has incorrect syntax");
     }
 
-    public JsonObject getDmaapInfo(JsonObject jsonRootObject, String streamsPublishesOrSubscribes, String dmaapPublisherOrSubscriber)
-        throws Exception {
+    public JsonObject getDmaapInfo(JsonObject jsonRootObject, String streamsPublishesOrSubscribes,
+        String dmaapPublisherOrSubscriber) throws Exception {
         return jsonRootObject.getAsJsonObject("config").getAsJsonObject(streamsPublishesOrSubscribes)
             .getAsJsonObject(dmaapPublisherOrSubscriber).getAsJsonObject("dmaap_info");
     }
index 859708a..7b54e06 100644 (file)
@@ -30,12 +30,14 @@ import static org.mockito.Mockito.verify;
 import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
+
 import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
 import com.google.gson.JsonIOException;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonSyntaxException;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -45,6 +47,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.Vector;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -58,6 +61,7 @@ import org.oransc.policyagent.configuration.ApplicationConfigParser;
 import org.oransc.policyagent.configuration.ImmutableRicConfig;
 import org.oransc.policyagent.configuration.RicConfig;
 import org.oransc.policyagent.utils.LoggingUtils;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -65,7 +69,6 @@ import reactor.test.StepVerifier;
 @ExtendWith(MockitoExtension.class)
 public class RefreshConfigTaskTest {
 
-
     private RefreshConfigTask refreshTaskUnderTest;
 
     @Spy
index fd3dbc0..ea09bb6 100644 (file)
@@ -32,6 +32,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Vector;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -309,4 +310,4 @@ public class RepositorySupervisionTest {
             when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(Mono.error((Exception) returnValue));
         }
     }
-}
\ No newline at end of file
+}
index b05349f..eae00ea 100644 (file)
@@ -35,9 +35,11 @@ import static org.mockito.Mockito.when;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
+
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -58,6 +60,7 @@ import org.oransc.policyagent.repository.Ric.RicState;
 import org.oransc.policyagent.repository.Service;
 import org.oransc.policyagent.repository.Services;
 import org.oransc.policyagent.utils.LoggingUtils;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -150,7 +153,7 @@ public class RicSynchronizationTaskTest {
 
         synchronizerUnderTest.run(RIC_1);
 
-        verify(a1ClientMock).getPolicyTypeIdentities();
+        verify(a1ClientMock, times(1)).getPolicyTypeIdentities();
         verifyNoMoreInteractions(a1ClientMock);
 
         verify(synchronizerUnderTest).run(RIC_1);
@@ -303,6 +306,7 @@ public class RicSynchronizationTaskTest {
 
     private void setUpCreationOfA1Client() {
         when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock));
+        doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies();
     }
 
     private AsyncRestClient setUpCreationOfAsyncRestClient(RicSynchronizationTask synchronizerUnderTest) {
index 381c2d1..a984528 100644 (file)
@@ -31,8 +31,10 @@ import static org.mockito.Mockito.when;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
+
 import java.time.Duration;
 import java.util.Collections;
+
 import org.awaitility.Durations;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
index ae9385f..eeecc1c 100644 (file)
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.when;
 import static org.oransc.policyagent.repository.Ric.RicState.IDLE;
 
 import com.google.common.collect.ImmutableList;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -93,6 +94,7 @@ public class StartupServiceTest {
         Mono<List<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
         doReturn(policyTypes1, policyTypes2).when(a1ClientMock).getPolicyTypeIdentities();
         doReturn(Mono.just("Schema")).when(a1ClientMock).getPolicyTypeSchema(anyString());
+        doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies();
 
         Rics rics = new Rics();
         PolicyTypes policyTypes = new PolicyTypes();
@@ -107,16 +109,18 @@ public class StartupServiceTest {
             getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
             ApplicationConfig.RicConfigUpdate.ADDED);
 
-        await().untilAsserted(() -> assertThat(policyTypes.size()).isEqualTo(2));
+        Ric firstRic = rics.get(FIRST_RIC_NAME);
+        Ric secondRic = rics.get(SECOND_RIC_NAME);
+        await().untilAsserted(() -> assertThat(firstRic.getState()).isEqualTo(IDLE));
+        await().untilAsserted(() -> assertThat(secondRic.getState()).isEqualTo(IDLE));
 
         assertTrue(policyTypes.contains(POLICY_TYPE_1_NAME), POLICY_TYPE_1_NAME + " not added to PolicyTypes.");
         assertTrue(policyTypes.contains(POLICY_TYPE_2_NAME), POLICY_TYPE_2_NAME + " not added to PolicyTypes.");
         assertEquals(2, rics.size(), "Correct number of Rics not added to Rics");
 
-        Ric firstRic = rics.get(FIRST_RIC_NAME);
         assertNotNull(firstRic, "Ric " + FIRST_RIC_NAME + " not added to repository");
         assertEquals(FIRST_RIC_NAME, firstRic.name(), FIRST_RIC_NAME + " not added to Rics");
-        assertEquals(IDLE, firstRic.getState(), "Not correct state for ric " + FIRST_RIC_NAME);
+
         assertEquals(1, firstRic.getSupportedPolicyTypes().size(),
             "Not correct no of types supported for ric " + FIRST_RIC_NAME);
         assertTrue(firstRic.isSupportingType(POLICY_TYPE_1_NAME),
@@ -125,10 +129,8 @@ public class StartupServiceTest {
             "Not correct no of managed nodes for ric " + FIRST_RIC_NAME);
         assertTrue(firstRic.isManaging(MANAGED_NODE_A), MANAGED_NODE_A + " not managed by ric " + FIRST_RIC_NAME);
 
-        Ric secondRic = rics.get(SECOND_RIC_NAME);
         assertNotNull(secondRic, "Ric " + SECOND_RIC_NAME + " not added to repository");
         assertEquals(SECOND_RIC_NAME, secondRic.name(), SECOND_RIC_NAME + " not added to Rics");
-        assertEquals(IDLE, secondRic.getState(), "Not correct state for " + SECOND_RIC_NAME);
         assertEquals(2, secondRic.getSupportedPolicyTypes().size(),
             "Not correct no of types supported for ric " + SECOND_RIC_NAME);
         assertTrue(secondRic.isSupportingType(POLICY_TYPE_1_NAME),