Merge "Update mrstub with nginx"
authorHenrik Andersson <henrik.b.andersson@est.tech>
Thu, 14 May 2020 06:14:19 +0000 (06:14 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Thu, 14 May 2020 06:14:19 +0000 (06:14 +0000)
29 files changed:
policy-agent/config/application.yaml
policy-agent/docs/api.yaml
policy-agent/pom.xml
policy-agent/src/main/java/org/oransc/policyagent/aspect/LogAspect.java
policy-agent/src/main/java/org/oransc/policyagent/clients/SdncOscA1Client.java
policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapRequestMessage.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Policy.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java
policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
policy-agent/src/test/java/org/oransc/policyagent/clients/A1ClientHelper.java
policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
policy-agent/src/test/java/org/oransc/policyagent/repository/LockTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java
policy-agent/src/test/resources/test_application_configuration.json
sdnc-a1-controller/northbound/nonrt-ric-api/provider/pom.xml
sdnc-a1-controller/northbound/nonrt-ric-api/provider/src/main/java/org/o_ran_sc/nonrtric/sdnc_a1/northbound/provider/NonrtRicApiProvider.java
sdnc-a1-controller/northbound/nonrt-ric-api/provider/src/test/java/org/o_ran_sc/nonrtric/sdnc_a1/northbound/provider/NonrtRicApiProviderTest.java [moved from sdnc-a1-controller/northbound/nonrt-ric-api/provider/src/test/java/org/o_ran_sc/nonrtric/sdnc_a1/northbound/NonrtRicApiProviderTest.java with 66% similarity]
sdnc-a1-controller/northbound/nonrt-ric-api/provider/src/test/java/org/o_ran_sc/nonrtric/sdnc_a1/northbound/restadapter/RestAdapterImplTest.java [new file with mode: 0644]

index b128494..4010219 100644 (file)
@@ -9,7 +9,7 @@ management:
   endpoints:
     web:
       exposure:
-        include: "loggers,logfile,health,info,metrics"
+        include: "loggers,logfile,health,info,metrics,threaddump"
 
 logging:
   level:
index f42c3e0..f3dd06c 100644 (file)
@@ -112,6 +112,12 @@ paths:
           description: service
           required: true
           type: string
+        - name: transient
+          in: query
+          description: transient
+          required: false
+          type: boolean
+          default: false
         - name: type
           in: query
           description: type
index 8ed3368..ed1cbf1 100644 (file)
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
-        <version>2.2.4.RELEASE</version>
+        <version>2.2.7.RELEASE</version>
         <relativePath />
     </parent>
     <groupId>org.o-ran-sc.nonrtric</groupId>
index f78d6e7..93b2ec0 100644 (file)
@@ -40,13 +40,13 @@ public class LogAspect {
 
     @Around("execution(* org.oransc.policyagent..*(..)))")
     public void executimeTime(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
-        MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
-        String className = methodSignature.getDeclaringType().getSimpleName();
-        String methodName = methodSignature.getName();
         final StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         proceedingJoinPoint.proceed();
         stopWatch.stop();
+        MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
+        String className = methodSignature.getDeclaringType().getSimpleName();
+        String methodName = methodSignature.getName();
         logger.trace("Execution time of {}.{}: {} ms", className, methodName, stopWatch.getTotalTimeMillis());
     }
 
index 2daa4d6..a5735f7 100644 (file)
@@ -78,14 +78,12 @@ public class SdncOscA1Client implements A1Client {
     private final A1ProtocolType protocolType;
 
     /**
-     * Constructor
-     * 
+     * Constructor that creates the REST client to use.
+     *
      * @param protocolType the southbound protocol of the controller. Supported
      *        protocols are SDNC_OSC_STD_V1_1 and SDNC_OSC_OSC_V1
-     * @param ricConfig
-     * @param controllerBaseUrl the base URL of the SDNC controller
-     * @param username username to accesss the SDNC controller
-     * @param password password to accesss the SDNC controller
+     * @param ricConfig the configuration of the Ric to communicate with
+     * @param controllerConfig the configuration of the SDNC controller to use
      */
     public SdncOscA1Client(A1ProtocolType protocolType, RicConfig ricConfig, ControllerConfig controllerConfig) {
         this(protocolType, ricConfig, controllerConfig,
@@ -93,6 +91,15 @@ public class SdncOscA1Client implements A1Client {
         logger.debug("SdncOscA1Client for ric: {}, a1Controller: {}", ricConfig.name(), controllerConfig);
     }
 
+    /**
+     * Constructor where the REST client to use is provided.
+     *
+     * @param protocolType the southbound protocol of the controller. Supported
+     *        protocols are SDNC_OSC_STD_V1_1 and SDNC_OSC_OSC_V1
+     * @param ricConfig the configuration of the Ric to communicate with
+     * @param controllerConfig the configuration of the SDNC controller to use
+     * @param restClient the REST client to use
+     */
     public SdncOscA1Client(A1ProtocolType protocolType, RicConfig ricConfig, ControllerConfig controllerConfig,
         AsyncRestClient restClient) {
         this.restClient = restClient;
index 8418749..5495710 100644 (file)
@@ -70,8 +70,6 @@ public class ApplicationConfigParser {
         Properties dmaapConsumerConfig = new Properties();
 
         JsonObject agentConfigJson = root.getAsJsonObject(CONFIG);
-        List<RicConfig> ricConfigs = parseRics(agentConfigJson);
-        Map<String, ControllerConfig> controllerConfigs = parseControllerConfigs(agentConfigJson);
 
         JsonObject json = agentConfigJson.getAsJsonObject("streams_publishes");
         if (json != null) {
@@ -83,6 +81,8 @@ public class ApplicationConfigParser {
             dmaapConsumerConfig = parseDmaapConfig(json);
         }
 
+        List<RicConfig> ricConfigs = parseRics(agentConfigJson);
+        Map<String, ControllerConfig> controllerConfigs = parseControllerConfigs(agentConfigJson);
         checkConfigurationConsistency(ricConfigs, controllerConfigs);
 
         return ImmutableConfigParserResult.builder() //
index 6d3f594..49d7702 100644 (file)
@@ -209,6 +209,7 @@ public class PolicyController {
         @RequestParam(name = "id", required = true) String instanceId, //
         @RequestParam(name = "ric", required = true) String ricName, //
         @RequestParam(name = "service", required = true) String service, //
+        @RequestParam(name = "transient", required = false, defaultValue = "false") boolean isTransient, //
         @RequestBody Object jsonBody) {
 
         String jsonString = gson.toJson(jsonBody);
@@ -225,6 +226,7 @@ public class PolicyController {
             .ric(ric) //
             .ownerServiceName(service) //
             .lastModified(getTimeStampUtc()) //
+            .isTransient(isTransient) //
             .build();
 
         final boolean isCreate = this.policies.get(policy.id()) == null;
index dd60d39..f13ffeb 100644 (file)
@@ -33,7 +33,6 @@ import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
-import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -41,13 +40,16 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 /**
- * The class fetches incoming requests from DMAAP. It uses the timeout parameter that lets the MessageRouter keep the
- * connection with the Kafka open until requests are sent in.
+ * The class fetches incoming requests from DMAAP. It uses the timeout parameter
+ * that lets the MessageRouter keep the connection with the Kafka open until
+ * requests are sent in.
  *
- * If there is no DMaaP configuration in the application configuration, then this service will regularly check the
- * configuration and start polling DMaaP if the configuration is added. If the DMaaP configuration is removed, then the
- * service will stop polling and resume checking for configuration.
+ * <p>
+ * this service will regularly check the configuration and start polling DMaaP
+ * if the configuration is added. If the DMaaP configuration is removed, then
+ * the service will stop polling and resume checking for configuration.
  *
+ * <p>
  * Each received request is processed by {@link DmaapMessageHandler}.
  */
 @Component
@@ -59,6 +61,9 @@ public class DmaapMessageConsumer {
 
     private final ApplicationConfig applicationConfig;
 
+    private DmaapMessageHandler dmaapMessageHandler = null;
+    private MRConsumer messageRouterConsumer = null;
+
     @Value("${server.port}")
     private int localServerPort;
 
@@ -67,31 +72,31 @@ public class DmaapMessageConsumer {
         this.applicationConfig = applicationConfig;
     }
 
+    /**
+     * Starts the consumer. If there is a DMaaP configuration, it will start polling
+     * for messages. Otherwise it will check regularly for the configuration.
+     *
+     * @return the running thread, for test purposes.
+     */
     public Thread start() {
-        Thread thread = new Thread(() -> this.checkConfigLoop());
+        Thread thread = new Thread(this::messageHandlingLoop);
         thread.start();
         return thread;
     }
 
-    private void checkConfigLoop() {
-        while (!isStopped()) {
-            if (isDmaapConfigured()) {
-                messageHandlingLoop();
-            } else {
-                sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
-            }
-        }
-    }
-
     private void messageHandlingLoop() {
-        while (!isStopped() && isDmaapConfigured()) {
+        while (!isStopped()) {
             try {
-                Iterable<String> dmaapMsgs = fetchAllMessages();
-                if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
-                    logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
-                    for (String msg : dmaapMsgs) {
-                        processMsg(msg);
+                if (isDmaapConfigured()) {
+                    Iterable<String> dmaapMsgs = fetchAllMessages();
+                    if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
+                        logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
+                        for (String msg : dmaapMsgs) {
+                            processMsg(msg);
+                        }
                     }
+                } else {
+                    sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration
                 }
             } catch (Exception e) {
                 logger.warn("Cannot fetch because of {}", e.getMessage());
@@ -132,13 +137,15 @@ public class DmaapMessageConsumer {
         getDmaapMessageHandler().handleDmaapMsg(msg);
     }
 
-    private DmaapMessageHandler getDmaapMessageHandler() throws IOException {
-        String agentBaseUrl = "https://localhost:" + this.localServerPort;
-        AsyncRestClient agentClient = createRestClient(agentBaseUrl);
-        Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
-        MRBatchingPublisher producer = getMessageRouterPublisher(dmaapPublisherProperties);
-
-        return createDmaapMessageHandler(agentClient, producer);
+    protected DmaapMessageHandler getDmaapMessageHandler() throws IOException {
+        if (this.dmaapMessageHandler == null) {
+            String agentBaseUrl = "https://localhost:" + this.localServerPort;
+            AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
+            Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+            MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
+            this.dmaapMessageHandler = new DmaapMessageHandler(producer, agentClient);
+        }
+        return this.dmaapMessageHandler;
     }
 
     protected void sleep(Duration duration) {
@@ -150,18 +157,10 @@ public class DmaapMessageConsumer {
     }
 
     protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
-        return MRClientFactory.createConsumer(dmaapConsumerProperties);
-    }
-
-    protected DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
-        return new DmaapMessageHandler(producer, agentClient);
-    }
-
-    protected AsyncRestClient createRestClient(String agentBaseUrl) {
-        return new AsyncRestClient(agentBaseUrl);
+        if (this.messageRouterConsumer == null) {
+            this.messageRouterConsumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
+        }
+        return this.messageRouterConsumer;
     }
 
-    protected MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException {
-        return MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
-    }
 }
index 3c44f08..3d5da62 100644 (file)
@@ -17,6 +17,7 @@
  * limitations under the License.
  * ========================LICENSE_END===================================
  */
+
 package org.oransc.policyagent.dmaap;
 
 import com.google.gson.Gson;
@@ -56,10 +57,12 @@ public class DmaapMessageHandler {
     }
 
     public void handleDmaapMsg(String msg) {
-        this.createTask(msg) //
-            .subscribe(message -> logger.debug("handleDmaapMsg: {}", message), //
-                throwable -> logger.warn("handleDmaapMsg failure {}", throwable.getMessage()), //
-                () -> logger.debug("handleDmaapMsg complete"));
+        try {
+            String result = this.createTask(msg).block();
+            logger.debug("handleDmaapMsg: {}", result);
+        } catch (Exception throwable) {
+            logger.warn("handleDmaapMsg failure {}", throwable.getMessage());
+        }
     }
 
     Mono<String> createTask(String msg) {
index 354fdaf..c2d0d4c 100644 (file)
@@ -35,8 +35,6 @@ public interface DmaapRequestMessage {
         PUT, GET, DELETE, POST
     }
 
-    String type();
-
     String correlationId();
 
     String target();
index 5148226..e96d250 100644 (file)
@@ -37,4 +37,6 @@ public interface Policy {
     public PolicyType type();
 
     public String lastModified();
+
+    public boolean isTransient();
 }
index 12bc55a..89c8d63 100644 (file)
@@ -83,7 +83,7 @@ public class RefreshConfigTask {
     /**
      * The time between refreshes of the configuration.
      */
-    public static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
+    static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
 
     final ApplicationConfig appConfig;
     @Getter(AccessLevel.PROTECTED)
index 42d9ab6..5d78207 100644 (file)
@@ -198,8 +198,16 @@ public class RicSynchronizationTask {
             .flatMapMany(notUsed -> Flux.just(policy));
     }
 
+    private boolean checkTransient(Policy policy) {
+        if (policy.isTransient()) {
+            this.policies.remove(policy);
+        }
+        return policy.isTransient();
+    }
+
     private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
         return Flux.fromIterable(policies.getForRic(ric.name())) //
+            .filter(policy -> !checkTransient(policy)) //
             .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
     }
 
index d274d5a..0fd4a33 100644 (file)
@@ -274,13 +274,23 @@ public class ApplicationTest {
         testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND);
     }
 
-    private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId) {
+    private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId,
+        boolean isTransient) {
+        String url;
         if (policyTypeName.isEmpty()) {
-            return "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName;
+            url = "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName;
         } else {
-            return "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName + "&type="
+            url = "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName + "&type="
                 + policyTypeName;
         }
+        if (isTransient) {
+            url += "&transient=true";
+        }
+        return url;
+    }
+
+    private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId) {
+        return putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId, false);
     }
 
     @Test
@@ -293,7 +303,8 @@ public class ApplicationTest {
         putService(serviceName);
         addPolicyType(policyTypeName, ricName);
 
-        String url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId);
+        // PUT a transient policy
+        String url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId, true);
         final String policyBody = jsonString();
         this.rics.getRic(ricName).setState(Ric.RicState.AVAILABLE);
 
@@ -304,6 +315,13 @@ public class ApplicationTest {
         assertThat(policy.id()).isEqualTo(policyInstanceId);
         assertThat(policy.ownerServiceName()).isEqualTo(serviceName);
         assertThat(policy.ric().name()).isEqualTo("ric1");
+        assertThat(policy.isTransient()).isEqualTo(true);
+
+        // Put a non transient policy
+        url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId);
+        restClient().put(url, policyBody).block();
+        policy = policies.getPolicy(policyInstanceId);
+        assertThat(policy.isTransient()).isEqualTo(false);
 
         url = "/policies";
         String rsp = restClient().get(url).block();
@@ -632,12 +650,15 @@ public class ApplicationTest {
 
     private Policy addPolicy(String id, String typeName, String service, String ric) throws ServiceException {
         addRic(ric);
-        Policy p = ImmutablePolicy.builder().id(id) //
+        Policy p = ImmutablePolicy.builder() //
+            .id(id) //
             .json(jsonString()) //
             .ownerServiceName(service) //
             .ric(rics.getRic(ric)) //
             .type(addPolicyType(typeName, ric)) //
-            .lastModified("lastModified").build();
+            .lastModified("lastModified") //
+            .isTransient(false) //
+            .build();
         policies.put(p);
         return p;
     }
@@ -687,7 +708,7 @@ public class ApplicationTest {
         addPolicyType("type1", "ric");
         addPolicyType("type2", "ric");
 
-        for (int i = 0; i < 100; ++i) {
+        for (int i = 0; i < 10; ++i) {
             Thread t =
                 new Thread(new ConcurrencyTestRunnable(baseUrl(), supervision, a1ClientFactory, rics, policyTypes),
                     "TestThread_" + i);
index f8f7ca3..2d57e52 100644 (file)
@@ -34,6 +34,7 @@ import org.oransc.policyagent.utils.MockA1Client;
 import org.oransc.policyagent.utils.MockA1ClientFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.ResponseEntity;
 
 /**
  * Invoke operations over the NBI and start synchronizations in a separate
@@ -59,11 +60,26 @@ class ConcurrencyTestRunnable implements Runnable {
         this.webClient = new AsyncRestClient(baseUrl);
     }
 
+    private void printStatusInfo() {
+        try {
+            String url = "/actuator/metrics/jvm.threads.live";
+            ResponseEntity<String> result = webClient.getForEntity(url).block();
+            System.out.println(Thread.currentThread() + result.getBody());
+
+            url = "/rics";
+            result = webClient.getForEntity(url).block();
+            System.out.println(Thread.currentThread() + result.getBody());
+
+        } catch (Exception e) {
+            logger.error(Thread.currentThread() + "Concurrency test printStatusInfo exception " + e.toString());
+        }
+    }
+
     @Override
     public void run() {
         try {
-            for (int i = 0; i < 100; ++i) {
-                if (i % 10 == 0) {
+            for (int i = 0; i < 500; ++i) {
+                if (i % 100 == 0) {
                     createInconsistency();
                     this.supervision.checkAllRics();
                 }
@@ -77,6 +93,7 @@ class ConcurrencyTestRunnable implements Runnable {
             }
         } catch (Exception e) {
             logger.error("Concurrency test exception " + e.toString());
+            printStatusInfo();
         }
     }
 
@@ -90,6 +107,7 @@ class ConcurrencyTestRunnable implements Runnable {
             .ric(ric) //
             .ownerServiceName("") //
             .lastModified("") //
+            .isTransient(false) //
             .build();
     }
 
index 7bdd796..1636418 100644 (file)
@@ -20,6 +20,8 @@
 
 package org.oransc.policyagent;
 
+import static org.awaitility.Awaitility.await;
+
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 
@@ -49,6 +51,7 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.web.server.LocalServerPort;
 import org.springframework.context.annotation.Bean;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.util.StringUtils;
 
 @ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@@ -143,14 +146,25 @@ public class MockPolicyAgent {
     private int port;
 
     private void keepServerAlive() throws InterruptedException, IOException {
-        logger.info("Keeping server alive!");
-        Thread.sleep(1000);
+        waitForConfigurationToBeLoaded();
         loadInstances();
+        logger.info("Keeping server alive!");
         synchronized (this) {
             this.wait();
         }
     }
 
+    private void waitForConfigurationToBeLoaded() throws IOException {
+        String json = getConfigJsonFromFile();
+        try {
+            int noOfRicsInConfigFile = StringUtils.countOccurrencesOf(json, "baseUrl");
+            await().until(() -> rics.size() == noOfRicsInConfigFile);
+        } catch (Exception e) {
+            logger.info("Loaded rics: {}, and no of rics in config file: {} never matched!", rics.size(),
+                StringUtils.countOccurrencesOf(json, "baseUrl"));
+        }
+    }
+
     private static String title(String jsonSchema) {
         JsonObject parsedSchema = (JsonObject) JsonParser.parseString(jsonSchema);
         String title = parsedSchema.get("title").getAsString();
@@ -160,8 +174,7 @@ public class MockPolicyAgent {
     private void loadInstances() throws IOException {
         PolicyType unnamedPolicyType = policyTypes.get("");
         Ric ric = rics.get("ric1");
-        File jsonFile = getFile("test_application_configuration.json");
-        String json = new String(Files.readAllBytes(jsonFile.toPath()));
+        String json = getConfigJsonFromFile();
 
         Policy policy = ImmutablePolicy.builder() //
             .id("typelessPolicy") //
@@ -170,13 +183,19 @@ public class MockPolicyAgent {
             .ric(ric) //
             .type(unnamedPolicyType) //
             .lastModified("now") //
+            .isTransient(false) //
             .build();
         this.policies.put(policy);
     }
 
+    private String getConfigJsonFromFile() throws IOException {
+        File jsonFile = getFile("test_application_configuration.json");
+        String json = new String(Files.readAllBytes(jsonFile.toPath()));
+        return json;
+    }
+
     @Test
-    @SuppressWarnings("squid:S2699") // Tests should include assertions. This test is only for keeping the server
-                                     // alive,
+    @SuppressWarnings("squid:S2699") // Tests should include assertions. This test is only for keeping the server alive,
                                      // so it will only be confusing to add an assertion.
     public void runMock() throws Exception {
         keepServerAlive();
index 79c8b11..722fea7 100644 (file)
@@ -63,6 +63,7 @@ public class A1ClientHelper {
             .ric(createRic(nearRtRicUrl)) //
             .type(createPolicyType(type)) //
             .lastModified("now") //
+            .isTransient(false) //
             .build();
     }
 
index fc4e7ce..7b338cc 100644 (file)
@@ -23,11 +23,9 @@ package org.oransc.policyagent.dmaap;
 import static ch.qos.logback.classic.Level.WARN;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -49,12 +47,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
 import org.onap.dmaap.mr.client.MRConsumer;
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.oransc.policyagent.utils.LoggingUtils;
 import org.springframework.http.HttpStatus;
 
@@ -86,7 +81,7 @@ public class DmaapMessageConsumerTest {
         messageConsumerUnderTest.start().join();
 
         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
-        orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+        orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
     }
 
@@ -103,7 +98,7 @@ public class DmaapMessageConsumerTest {
 
         InOrder orderVerifier = inOrder(messageConsumerUnderTest);
         orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
-        orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+        orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
     }
 
     @Test
@@ -116,7 +111,7 @@ public class DmaapMessageConsumerTest {
         response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
         response.setActualMessages(Collections.emptyList());
 
-        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
             .getMessageRouterConsumer(any(Properties.class));
         when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
@@ -134,7 +129,7 @@ public class DmaapMessageConsumerTest {
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
-        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
             .getMessageRouterConsumer(any(Properties.class));
 
@@ -159,11 +154,10 @@ public class DmaapMessageConsumerTest {
 
     @Test
     public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
-        Properties properties = setUpMrConfig();
-
+        setUpMrConfig();
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
             .getMessageRouterConsumer(any(Properties.class));
 
@@ -174,20 +168,10 @@ public class DmaapMessageConsumerTest {
         response.setActualMessages(messages);
         when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
 
-        doReturn(messageHandlerMock).when(messageConsumerUnderTest)
-            .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
-
-        AsyncRestClient restClientMock = mock(AsyncRestClient.class);
-        doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString());
-
-        MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
-        doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
+        doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
 
         messageConsumerUnderTest.start().join();
 
-        verify(messageConsumerUnderTest).createRestClient("https://localhost:0");
-        verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
-
         verify(messageHandlerMock).handleDmaapMsg(responseMessage);
         verifyNoMoreInteractions(messageHandlerMock);
     }
index 52147a8..6b3457b 100644 (file)
@@ -89,7 +89,8 @@ public class DmaapMessageHandlerTest {
         Optional<JsonObject> payload =
             ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
                 : Optional.empty());
-        return ImmutableDmaapRequestMessage.builder().apiVersion("apiVersion") //
+        return ImmutableDmaapRequestMessage.builder() //
+            .apiVersion("apiVersion") //
             .correlationId("correlationId") //
             .operation(operation) //
             .originatorId("originatorId") //
@@ -97,7 +98,6 @@ public class DmaapMessageHandlerTest {
             .requestId("requestId") //
             .target("target") //
             .timestamp("timestamp") //
-            .type("type") //
             .url(URL) //
             .build();
     }
index ee7e04f..5c1cc14 100644 (file)
@@ -41,15 +41,16 @@ public class LockTest {
         try {
             Thread.sleep(100);
         } catch (InterruptedException e) {
+            // Do nothing.
         }
     }
 
     private void asynchUnlock(Lock lock) {
-        Thread t = new Thread(() -> {
+        Thread thread = new Thread(() -> {
             sleep();
             lock.unlockBlocking();
         });
-        t.start();
+        thread.start();
     }
 
     @Test
index cb91133..6edd4e3 100644 (file)
@@ -310,6 +310,7 @@ public class RefreshConfigTaskTest {
             .ric(ric) //
             .json("{}") //
             .ownerServiceName("ownerServiceName") //
+            .isTransient(false) //
             .build();
         return policy;
     }
index 73ca351..a42142b 100644 (file)
@@ -78,6 +78,7 @@ public class RicSupervisionTest {
         .ric(RIC_1) //
         .type(POLICY_TYPE_1) //
         .lastModified("now") //
+        .isTransient(false) //
         .build();
 
     private static final Policy POLICY_2 = ImmutablePolicy.builder() //
@@ -87,6 +88,7 @@ public class RicSupervisionTest {
         .ric(RIC_1) //
         .type(POLICY_TYPE_1) //
         .lastModified("now") //
+        .isTransient(false) //
         .build();
 
     @Mock
index f6d2ade..c8ebe27 100644 (file)
@@ -80,14 +80,19 @@ public class RicSynchronizationTaskTest {
         .controllerName("controllerName") //
         .build());
 
-    private static final Policy POLICY_1 = ImmutablePolicy.builder() //
-        .id("policyId1") //
-        .json("") //
-        .ownerServiceName("service") //
-        .ric(RIC_1) //
-        .type(POLICY_TYPE_1) //
-        .lastModified("now") //
-        .build();
+    private static Policy createPolicy(boolean isTransient) {
+        return ImmutablePolicy.builder() //
+            .id("policyId1") //
+            .json("") //
+            .ownerServiceName("service") //
+            .ric(RIC_1) //
+            .type(POLICY_TYPE_1) //
+            .lastModified("now") //
+            .isTransient(isTransient) //
+            .build();
+    }
+
+    private static final Policy POLICY_1 = createPolicy(false);
 
     private static final String SERVICE_1_NAME = "service1";
     private static final String SERVICE_1_CALLBACK_URL = "callbackUrl";
@@ -196,6 +201,9 @@ public class RicSynchronizationTaskTest {
     public void ricIdleAndHavePolicies_thenSynchronizationWithRecreationOfPolicies() {
         RIC_1.setState(RicState.AVAILABLE);
 
+        Policy transientPolicy = createPolicy(true);
+
+        policies.put(transientPolicy);
         policies.put(POLICY_1);
 
         setUpCreationOfA1Client();
@@ -214,7 +222,7 @@ public class RicSynchronizationTaskTest {
         verifyNoMoreInteractions(a1ClientMock);
 
         assertThat(policyTypes.size()).isEqualTo(0);
-        assertThat(policies.size()).isEqualTo(1);
+        assertThat(policies.size()).isEqualTo(1); // The transient policy shall be deleted
         assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE);
     }
 
index f26083a..90a3581 100644 (file)
@@ -88,6 +88,7 @@ public class ServiceSupervisionTest {
         .ric(ric) //
         .type(policyType) //
         .lastModified("lastModified") //
+        .isTransient(false) //
         .build();
 
     @Test
index 446c061..0122fb9 100644 (file)
                "kista_4"
             ]
          }
-      ]
+      ],
+      "streams_publishes":{
+         "dmaap_publisher":{
+            "type":"message_router",
+            "dmaap_info":{
+               "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
+            }
+         }
+      },
+      "streams_subscribes":{
+         "dmaap_subscriber":{
+            "type":"message_router",
+            "dmaap_info":{
+               "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent"
+            }
+         }
+      }
    }
-}
\ No newline at end of file
+}
index 2a586ba..e37d656 100644 (file)
             <version>1.10.19</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>3.14.6</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.json</groupId>
             <artifactId>json</artifactId>
index 9a625b1..392b1d6 100644 (file)
@@ -78,6 +78,7 @@ public class NonrtRicApiProvider implements AutoCloseable, A1ADAPTERAPIService {
   protected static final String NO_SERVICE_LOGIC_ACTIVE = "No service logic active for ";
   private static final String NON_NULL_PARAM = "non-null";
   private static final String NULL_PARAM = "null";
+  private static final String REST_CLIENT_RESPONSE_EXCEPTION_MSG = "Caught RestClientResponseException: {}";
 
   private final Logger log = LoggerFactory.getLogger(NonrtRicApiProvider.class);
   private final ExecutorService executor;
@@ -96,7 +97,6 @@ public class NonrtRicApiProvider implements AutoCloseable, A1ADAPTERAPIService {
     setNotificationService(notificationPublishService);
     setRpcRegistry(rpcProviderRegistry);
     initialize();
-
   }
 
   public void initialize() {
@@ -157,80 +157,80 @@ public class NonrtRicApiProvider implements AutoCloseable, A1ADAPTERAPIService {
   @Override
   public ListenableFuture<RpcResult<PutA1PolicyOutput>> putA1Policy(PutA1PolicyInput input) {
     log.info("Start of putPolicy");
-    PutA1PolicyOutputBuilder responseBuilder = new PutA1PolicyOutputBuilder();
+    PutA1PolicyOutputBuilder putPolicyResponseBuilder = new PutA1PolicyOutputBuilder();
 
     try {
         final Uri uri = input.getNearRtRicUrl();
         log.info("PUT Request input.GetA1Policy() : {} ", uri);
-        ResponseEntity<String> response = restAdapter.put(uri.getValue(), input.getBody(), String.class);
-        if (response.hasBody()) {
-            log.info("Response PutA1Policy : {} ", response.getBody());
-            responseBuilder.setBody(response.getBody());
+        ResponseEntity<String> putPolicyResponse = restAdapter.put(uri.getValue(), input.getBody(), String.class);
+        if (putPolicyResponse.hasBody()) {
+            log.info("Response PutA1Policy : {} ", putPolicyResponse.getBody());
+            putPolicyResponseBuilder.setBody(putPolicyResponse.getBody());
         }
-        responseBuilder.setHttpStatus(response.getStatusCodeValue());
+        putPolicyResponseBuilder.setHttpStatus(putPolicyResponse.getStatusCodeValue());
     } catch (RestClientResponseException ex) {
-        log.error("Caught RestClientResponseException: {}", ex.getMessage());
+        log.error(REST_CLIENT_RESPONSE_EXCEPTION_MSG, ex.getMessage());
         if (ex.getResponseBodyAsByteArray() != null) {
-            responseBuilder.setBody(ex.getResponseBodyAsString());
+            putPolicyResponseBuilder.setBody(ex.getResponseBodyAsString());
         }
-        responseBuilder.setHttpStatus(ex.getRawStatusCode());
+        putPolicyResponseBuilder.setHttpStatus(ex.getRawStatusCode());
     }
 
     log.info("End of PutA1Policy");
     RpcResult<PutA1PolicyOutput> rpcResult = RpcResultBuilder.<PutA1PolicyOutput>status(true)
-        .withResult(responseBuilder.build()).build();
+        .withResult(putPolicyResponseBuilder.build()).build();
     return Futures.immediateFuture(rpcResult);
   }
 
   @Override
   public ListenableFuture<RpcResult<DeleteA1PolicyOutput>> deleteA1Policy(DeleteA1PolicyInput input) {
     log.info("Start of DeleteA1Policy");
-    DeleteA1PolicyOutputBuilder responseBuilder = new DeleteA1PolicyOutputBuilder();
+    DeleteA1PolicyOutputBuilder deletePolicyResponseBuilder = new DeleteA1PolicyOutputBuilder();
 
     try {
         final Uri uri = input.getNearRtRicUrl();
-        ResponseEntity<Object> response = restAdapter.delete(uri.getValue());
-        if (response.hasBody()) {
-            log.info("Response DeleteA1Policy : {} ", response.getBody());
-            responseBuilder.setBody(response.getBody().toString());
+        ResponseEntity<Object> deletePolicyResponse = restAdapter.delete(uri.getValue());
+        if (deletePolicyResponse.hasBody()) {
+            log.info("Response DeleteA1Policy : {} ", deletePolicyResponse.getBody());
+            deletePolicyResponseBuilder.setBody(deletePolicyResponse.getBody().toString());
         }
-        responseBuilder.setHttpStatus(response.getStatusCodeValue());
+        deletePolicyResponseBuilder.setHttpStatus(deletePolicyResponse.getStatusCodeValue());
     } catch (RestClientResponseException ex) {
-        log.error("Caught RestClientResponseException: {}", ex.getMessage());
+        log.error(REST_CLIENT_RESPONSE_EXCEPTION_MSG, ex.getMessage());
         if (ex.getResponseBodyAsByteArray() != null) {
-            responseBuilder.setBody(ex.getResponseBodyAsString());
+            deletePolicyResponseBuilder.setBody(ex.getResponseBodyAsString());
         }
-        responseBuilder.setHttpStatus(ex.getRawStatusCode());
+        deletePolicyResponseBuilder.setHttpStatus(ex.getRawStatusCode());
     }
 
     log.info("End of DeleteA1Policy");
     RpcResult<DeleteA1PolicyOutput> rpcResult = RpcResultBuilder.<DeleteA1PolicyOutput>status(true)
-        .withResult(responseBuilder.build()).build();
+        .withResult(deletePolicyResponseBuilder.build()).build();
     return Futures.immediateFuture(rpcResult);
   }
 
-  private GetA1PolicyOutput getA1(GetA1PolicyInput input) {
+  protected GetA1PolicyOutput getA1(GetA1PolicyInput input) {
     log.info("Start of getA1");
-    GetA1PolicyOutputBuilder responseBuilder = new GetA1PolicyOutputBuilder();
+    GetA1PolicyOutputBuilder getPolicyResponseBuilder = new GetA1PolicyOutputBuilder();
 
     try {
         final Uri uri = input.getNearRtRicUrl();
-        ResponseEntity<String> response = restAdapter.get(uri.getValue(), String.class);
-        if (response.hasBody()) {
-            log.info("Response getA1 : {} ", response.getBody());
-            responseBuilder.setBody(response.getBody());
+        ResponseEntity<String> getPolicyResponse = restAdapter.get(uri.getValue(), String.class);
+        if (getPolicyResponse.hasBody()) {
+            log.info("Response getA1 : {} ", getPolicyResponse.getBody());
+            getPolicyResponseBuilder.setBody(getPolicyResponse.getBody());
         }
-        responseBuilder.setHttpStatus(response.getStatusCodeValue());
+        getPolicyResponseBuilder.setHttpStatus(getPolicyResponse.getStatusCodeValue());
     } catch (RestClientResponseException ex) {
-        log.error("Caught RestClientResponseException: {}", ex.getMessage());
+        log.error(REST_CLIENT_RESPONSE_EXCEPTION_MSG, ex.getMessage());
         if (ex.getResponseBodyAsByteArray() != null) {
-            responseBuilder.setBody(ex.getResponseBodyAsString());
+            getPolicyResponseBuilder.setBody(ex.getResponseBodyAsString());
         }
-        responseBuilder.setHttpStatus(ex.getRawStatusCode());
+        getPolicyResponseBuilder.setHttpStatus(ex.getRawStatusCode());
     }
 
     log.info("End of getA1");
-    return responseBuilder.build();
+    return getPolicyResponseBuilder.build();
   }
 
   @Override
@@ -243,31 +243,31 @@ public class NonrtRicApiProvider implements AutoCloseable, A1ADAPTERAPIService {
 
   @Override
   public ListenableFuture<RpcResult<GetA1PolicyStatusOutput>> getA1PolicyStatus(GetA1PolicyStatusInput input) {
-    GetA1PolicyInputBuilder getInputBuilder = new GetA1PolicyInputBuilder();
-    getInputBuilder.setNearRtRicUrl(input.getNearRtRicUrl());
-    GetA1PolicyOutput getOutput = getA1(getInputBuilder.build());
+    GetA1PolicyInputBuilder getPolicyStatusInputBuilder = new GetA1PolicyInputBuilder();
+    getPolicyStatusInputBuilder.setNearRtRicUrl(input.getNearRtRicUrl());
+    GetA1PolicyOutput getOutput = getA1(getPolicyStatusInputBuilder.build());
 
-    GetA1PolicyStatusOutputBuilder outputBuilder = new GetA1PolicyStatusOutputBuilder();
-    outputBuilder.setBody(getOutput.getBody());
-    outputBuilder.setHttpStatus(getOutput.getHttpStatus());
+    GetA1PolicyStatusOutputBuilder getPolicyStatusoutputBuilder = new GetA1PolicyStatusOutputBuilder();
+    getPolicyStatusoutputBuilder.setBody(getOutput.getBody());
+    getPolicyStatusoutputBuilder.setHttpStatus(getOutput.getHttpStatus());
 
     return Futures.immediateFuture(RpcResultBuilder.<GetA1PolicyStatusOutput>status(true) //
-        .withResult(outputBuilder.build()) //
+        .withResult(getPolicyStatusoutputBuilder.build()) //
         .build());
   }
 
   @Override
   public ListenableFuture<RpcResult<GetA1PolicyTypeOutput>> getA1PolicyType(GetA1PolicyTypeInput input) {
-    GetA1PolicyInputBuilder getInputBuilder = new GetA1PolicyInputBuilder();
-    getInputBuilder.setNearRtRicUrl(input.getNearRtRicUrl());
-    GetA1PolicyOutput getOutput = getA1(getInputBuilder.build());
+    GetA1PolicyInputBuilder getPolicyTypeInputBuilder = new GetA1PolicyInputBuilder();
+    getPolicyTypeInputBuilder.setNearRtRicUrl(input.getNearRtRicUrl());
+    GetA1PolicyOutput getOutput = getA1(getPolicyTypeInputBuilder.build());
 
-    GetA1PolicyTypeOutputBuilder outputBuilder = new GetA1PolicyTypeOutputBuilder();
-    outputBuilder.setBody(getOutput.getBody());
-    outputBuilder.setHttpStatus(getOutput.getHttpStatus());
+    GetA1PolicyTypeOutputBuilder getPolicyTypeOutputBuilder = new GetA1PolicyTypeOutputBuilder();
+    getPolicyTypeOutputBuilder.setBody(getOutput.getBody());
+    getPolicyTypeOutputBuilder.setHttpStatus(getOutput.getHttpStatus());
 
     return Futures.immediateFuture(RpcResultBuilder.<GetA1PolicyTypeOutput>status(true) //
-        .withResult(outputBuilder.build()) //
+        .withResult(getPolicyTypeOutputBuilder.build()) //
         .build());
   }
 
  * ============LICENSE_END=========================================================
  */
 
-package org.o_ran_sc.nonrtric.sdnc_a1.northbound;
+package org.o_ran_sc.nonrtric.sdnc_a1.northbound.provider;
 
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.when;
-
 import java.util.concurrent.ExecutionException;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -32,7 +30,6 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.o_ran_sc.nonrtric.sdnc_a1.northbound.provider.NonrtRicApiProvider;
 import org.o_ran_sc.nonrtric.sdnc_a1.northbound.restadapter.RestAdapter;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
@@ -53,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestClientResponseException;
 
 /**
  * This class Tests all the methods in NonrtRicApiProvider
@@ -82,7 +80,7 @@ public class NonrtRicApiProviderTest extends AbstractConcurrentDataBrokerTest {
   }
 
   @Test
-  public void testGetA1Policy() throws InterruptedException, ExecutionException {
+  public void testGetA1PolicySuccess() throws InterruptedException, ExecutionException {
     GetA1PolicyInputBuilder inputBuilder = new GetA1PolicyInputBuilder();
     inputBuilder.setNearRtRicUrl(nearRtRicUrl);
     Whitebox.setInternalState(nonrtRicApiProvider, "restAdapter", restAdapter);
@@ -95,7 +93,7 @@ public class NonrtRicApiProviderTest extends AbstractConcurrentDataBrokerTest {
   }
 
   @Test
-  public void testGetA1PolicyType() throws InterruptedException, ExecutionException {
+  public void testGetA1PolicyTypeSuccess() throws InterruptedException, ExecutionException {
     GetA1PolicyTypeInputBuilder inputBuilder = new GetA1PolicyTypeInputBuilder();
     inputBuilder.setNearRtRicUrl(nearRtRicUrl);
     Whitebox.setInternalState(nonrtRicApiProvider, "restAdapter", restAdapter);
@@ -108,7 +106,7 @@ public class NonrtRicApiProviderTest extends AbstractConcurrentDataBrokerTest {
   }
 
   @Test
-  public void testGetA1PolicyStatus() throws InterruptedException, ExecutionException {
+  public void testGetA1PolicyStatusSuccess() throws InterruptedException, ExecutionException {
     GetA1PolicyStatusInputBuilder inputBuilder = new GetA1PolicyStatusInputBuilder();
     inputBuilder.setNearRtRicUrl(nearRtRicUrl);
     Whitebox.setInternalState(nonrtRicApiProvider, "restAdapter", restAdapter);
@@ -121,7 +119,21 @@ public class NonrtRicApiProviderTest extends AbstractConcurrentDataBrokerTest {
   }
 
   @Test
-  public void testPutA1Policy() throws InterruptedException, ExecutionException {
+  public void testGetA1Failure() throws InterruptedException, ExecutionException {
+    GetA1PolicyInputBuilder inputBuilder = new GetA1PolicyInputBuilder();
+    inputBuilder.setNearRtRicUrl(nearRtRicUrl);
+    Whitebox.setInternalState(nonrtRicApiProvider, "restAdapter", restAdapter);
+    String returnedBody = "GET failed";
+    int returnedStatusCode = 404;
+    when(restAdapter.get(eq(nearRtRicUrl.getValue()), eq(String.class)))
+    .thenThrow(new RestClientResponseException(null, returnedStatusCode, null, null, returnedBody.getBytes(), null));
+    GetA1PolicyOutput result = nonrtRicApiProvider.getA1(inputBuilder.build());
+    Assert.assertEquals(returnedBody, result.getBody());
+    Assert.assertTrue(returnedStatusCode == result.getHttpStatus());
+  }
+
+  @Test
+  public void testPutA1PolicySuccess() throws InterruptedException, ExecutionException {
     PutA1PolicyInputBuilder inputBuilder = new PutA1PolicyInputBuilder();
     String testPolicy = "{}";
     inputBuilder.setNearRtRicUrl(nearRtRicUrl);
@@ -136,15 +148,49 @@ public class NonrtRicApiProviderTest extends AbstractConcurrentDataBrokerTest {
   }
 
   @Test
-  public void testDeleteA1() throws InterruptedException, ExecutionException {
+  public void testPutA1PolicyFailure() throws InterruptedException, ExecutionException {
+    PutA1PolicyInputBuilder inputBuilder = new PutA1PolicyInputBuilder();
+    String testPolicy = "{}";
+    inputBuilder.setNearRtRicUrl(nearRtRicUrl);
+    inputBuilder.setBody(testPolicy);
+    Whitebox.setInternalState(nonrtRicApiProvider, "restAdapter", restAdapter);
+    String returnedBody = "PUT failed";
+    int returnedStatusCode = 400;
+    when(restAdapter.put(eq(nearRtRicUrl.getValue()), eq(testPolicy), eq(String.class)))
+    .thenThrow(new RestClientResponseException(null, returnedStatusCode, null, null, returnedBody.getBytes(), null));
+    PutA1PolicyOutput result = nonrtRicApiProvider.putA1Policy(inputBuilder.build()).get().getResult();
+    Assert.assertEquals(returnedBody, result.getBody());
+    Assert.assertTrue(returnedStatusCode == result.getHttpStatus());
+  }
+
+  @Test
+  public void testDeleteA1Success() throws InterruptedException, ExecutionException {
     DeleteA1PolicyInputBuilder inputBuilder = new DeleteA1PolicyInputBuilder();
     inputBuilder.setNearRtRicUrl(nearRtRicUrl);
     Whitebox.setInternalState(nonrtRicApiProvider, "restAdapter", restAdapter);
+    ResponseEntity<Object> getResponseNoContent = new ResponseEntity<>(HttpStatus.NO_CONTENT);
+    String returnedBody = "returned body";
+    ResponseEntity<Object> getResponseOk = new ResponseEntity<>(returnedBody, HttpStatus.OK);
+    when(restAdapter.delete(nearRtRicUrl.getValue())).thenReturn(getResponseNoContent).thenReturn(getResponseOk);
+    DeleteA1PolicyOutput resultNoContent = nonrtRicApiProvider.deleteA1Policy(inputBuilder.build()).get().getResult();
+    Assert.assertTrue(HttpStatus.NO_CONTENT.value() == resultNoContent.getHttpStatus());
+    DeleteA1PolicyOutput resultOk = nonrtRicApiProvider.deleteA1Policy(inputBuilder.build()).get().getResult();
+    Assert.assertEquals(returnedBody, resultOk.getBody());
+    Assert.assertTrue(HttpStatus.OK.value() == resultOk.getHttpStatus());
+  }
 
-    ResponseEntity<Object> getResponse = new ResponseEntity<>(HttpStatus.NO_CONTENT);
-    when(restAdapter.delete(nearRtRicUrl.getValue())).thenReturn(getResponse);
+  @Test
+  public void testDeleteA1Failure() throws InterruptedException, ExecutionException {
+    DeleteA1PolicyInputBuilder inputBuilder = new DeleteA1PolicyInputBuilder();
+    inputBuilder.setNearRtRicUrl(nearRtRicUrl);
+    Whitebox.setInternalState(nonrtRicApiProvider, "restAdapter", restAdapter);
+    String returnedBody = "DELETE failed";
+    int returnedStatusCode = 404;
+    when(restAdapter.delete(nearRtRicUrl.getValue()))
+    .thenThrow(new RestClientResponseException(null, returnedStatusCode, null, null, returnedBody.getBytes(), null));
     DeleteA1PolicyOutput result = nonrtRicApiProvider.deleteA1Policy(inputBuilder.build()).get().getResult();
-    Assert.assertTrue(HttpStatus.NO_CONTENT.value() == result.getHttpStatus());
+    Assert.assertEquals(returnedBody, result.getBody());
+    Assert.assertTrue(returnedStatusCode == result.getHttpStatus());
   }
 
 }
diff --git a/sdnc-a1-controller/northbound/nonrt-ric-api/provider/src/test/java/org/o_ran_sc/nonrtric/sdnc_a1/northbound/restadapter/RestAdapterImplTest.java b/sdnc-a1-controller/northbound/nonrt-ric-api/provider/src/test/java/org/o_ran_sc/nonrtric/sdnc_a1/northbound/restadapter/RestAdapterImplTest.java
new file mode 100644 (file)
index 0000000..8e1d806
--- /dev/null
@@ -0,0 +1,122 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2020 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.o_ran_sc.nonrtric.sdnc_a1.northbound.restadapter;
+
+import java.io.IOException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestClientException;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+
+public class RestAdapterImplTest {
+    private static MockWebServer mockWebServer;
+    private static RestAdapter adapterUnderTest;
+
+    private static final String VALID_PROTOCOL = "http";
+    private static final String INVALID_PROTOCOL = "ftp";
+    private static final String REQUEST_URL = "/test";
+    private static final String TEST_BODY = "test";
+    private static final int SUCCESS_CODE = 200;
+    private static final int ERROR_CODE = 500;
+
+    @Before
+    public void init() throws IOException {
+        mockWebServer = new MockWebServer();
+        mockWebServer.start();
+        adapterUnderTest = new RestAdapterImpl();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        mockWebServer.shutdown();
+    }
+
+    @Test
+    public void testInvalidUrlOrProtocol() throws InterruptedException {
+        ResponseEntity<String> response = adapterUnderTest.get("://localhost:" + mockWebServer.getPort() + REQUEST_URL,
+                String.class);
+        Assert.assertTrue(HttpStatus.BAD_REQUEST.value() == response.getStatusCodeValue());
+        response = adapterUnderTest.get(INVALID_PROTOCOL + "://localhost:" + mockWebServer.getPort() + REQUEST_URL,
+                String.class);
+        Assert.assertTrue(HttpStatus.BAD_REQUEST.value() == response.getStatusCodeValue());
+    }
+
+    @Test
+    public void testGetNoError() throws InterruptedException {
+        mockWebServer.enqueue(new MockResponse().setResponseCode(SUCCESS_CODE).setBody(TEST_BODY));
+        ResponseEntity<String> response = adapterUnderTest.get(VALID_PROTOCOL + "://localhost:"
+                + mockWebServer.getPort() + REQUEST_URL, String.class);
+        RecordedRequest recordedRequest = mockWebServer.takeRequest();
+        Assert.assertEquals(TEST_BODY, response.getBody());
+        Assert.assertTrue(SUCCESS_CODE == response.getStatusCodeValue());
+        Assert.assertEquals("GET", recordedRequest.getMethod());
+        Assert.assertEquals(REQUEST_URL, recordedRequest.getPath());
+    }
+
+    @Test(expected = RestClientException.class)
+    public void testGetError() {
+        mockWebServer.enqueue(new MockResponse().setResponseCode(ERROR_CODE));
+        adapterUnderTest.get(VALID_PROTOCOL + "://localhost:" + mockWebServer.getPort() + REQUEST_URL, String.class);
+    }
+
+    @Test
+    public void testPutNoError() throws InterruptedException {
+        mockWebServer.enqueue(new MockResponse().setResponseCode(SUCCESS_CODE).setBody(TEST_BODY));
+        ResponseEntity<String> response = adapterUnderTest.put(VALID_PROTOCOL + "://localhost:"
+                + mockWebServer.getPort() + REQUEST_URL, TEST_BODY, String.class);
+        RecordedRequest recordedRequest = mockWebServer.takeRequest();
+        Assert.assertEquals(TEST_BODY, response.getBody());
+        Assert.assertTrue(SUCCESS_CODE == response.getStatusCodeValue());
+        Assert.assertEquals("PUT", recordedRequest.getMethod());
+        Assert.assertEquals(REQUEST_URL, recordedRequest.getPath());
+        Assert.assertEquals(TEST_BODY, recordedRequest.getBody().readUtf8());
+    }
+
+    @Test(expected = RestClientException.class)
+    public void testPutError() {
+        mockWebServer.enqueue(new MockResponse().setResponseCode(ERROR_CODE));
+        adapterUnderTest.put(VALID_PROTOCOL + "://localhost:" + mockWebServer.getPort() + REQUEST_URL, TEST_BODY,
+                String.class);
+    }
+
+    @Test
+    public void testDeleteNoError() throws InterruptedException {
+        mockWebServer.enqueue(new MockResponse().setResponseCode(SUCCESS_CODE));
+        ResponseEntity<String> response = adapterUnderTest.delete(VALID_PROTOCOL + "://localhost:"
+                + mockWebServer.getPort() + REQUEST_URL);
+        RecordedRequest recordedRequest = mockWebServer.takeRequest();
+        Assert.assertTrue(SUCCESS_CODE == response.getStatusCodeValue());
+        Assert.assertEquals("DELETE", recordedRequest.getMethod());
+        Assert.assertEquals(REQUEST_URL, recordedRequest.getPath());
+    }
+
+    @Test(expected = RestClientException.class)
+    public void testDeleteError() {
+        mockWebServer.enqueue(new MockResponse().setResponseCode(ERROR_CODE));
+        adapterUnderTest.delete(VALID_PROTOCOL + "://localhost:" + mockWebServer.getPort() + REQUEST_URL);
+    }
+}