Bugfix, creating only one DmaapProducer 71/3671/2
authorPatrikBuhr <patrik.buhr@est.tech>
Tue, 12 May 2020 11:27:37 +0000 (13:27 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Tue, 12 May 2020 14:10:26 +0000 (16:10 +0200)
Each DmaapProducer starts a thread that is never terminated.
This class works around that by only creating one DmaapProducer
and will reuse that.

Change-Id: I3f8f9369a948f1cb5e3ae119c62c03bf4f5d180a
Issue-ID: NONRTRIC-210
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
policy-agent/config/application.yaml
policy-agent/pom.xml
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapRequestMessage.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java
policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
policy-agent/src/test/resources/test_application_configuration.json

index b128494..4010219 100644 (file)
@@ -9,7 +9,7 @@ management:
   endpoints:
     web:
       exposure:
-        include: "loggers,logfile,health,info,metrics"
+        include: "loggers,logfile,health,info,metrics,threaddump"
 
 logging:
   level:
index 8ed3368..ed1cbf1 100644 (file)
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
-        <version>2.2.4.RELEASE</version>
+        <version>2.2.7.RELEASE</version>
         <relativePath />
     </parent>
     <groupId>org.o-ran-sc.nonrtric</groupId>
index 6312e37..9165af5 100644 (file)
@@ -41,13 +41,14 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 /**
- * The class fetches incoming requests from DMAAP. It uses the timeout parameter that lets the MessageRouter keep the
- * connection with the Kafka open until requests are sent in.
+ * The class fetches incoming requests from DMAAP. It uses the timeout parameter
+ * that lets the MessageRouter keep the connection with the Kafka open until
+ * requests are sent in.
  *
  * <p>
- * If there is no DMaaP configuration in the application configuration, then this service will regularly check the
- * configuration and start polling DMaaP if the configuration is added. If the DMaaP configuration is removed, then the
- * service will stop polling and resume checking for configuration.
+ * this service will regularly check the configuration and start polling DMaaP
+ * if the configuration is added. If the DMaaP configuration is removed, then
+ * the service will stop polling and resume checking for configuration.
  *
  * <p>
  * Each received request is processed by {@link DmaapMessageHandler}.
@@ -61,6 +62,9 @@ public class DmaapMessageConsumer {
 
     private final ApplicationConfig applicationConfig;
 
+    private DmaapMessageHandler dmaapMessageHandler = null;
+    private MRConsumer messageRouterConsumer = null;
+
     @Value("${server.port}")
     private int localServerPort;
 
@@ -140,13 +144,15 @@ public class DmaapMessageConsumer {
         getDmaapMessageHandler().handleDmaapMsg(msg);
     }
 
-    private DmaapMessageHandler getDmaapMessageHandler() throws IOException {
-        String agentBaseUrl = "https://localhost:" + this.localServerPort;
-        AsyncRestClient agentClient = createRestClient(agentBaseUrl);
-        Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
-        MRBatchingPublisher producer = getMessageRouterPublisher(dmaapPublisherProperties);
-
-        return createDmaapMessageHandler(agentClient, producer);
+    protected DmaapMessageHandler getDmaapMessageHandler() throws IOException {
+        if (this.dmaapMessageHandler == null) {
+            String agentBaseUrl = "https://localhost:" + this.localServerPort;
+            AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
+            Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+            MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
+            this.dmaapMessageHandler = new DmaapMessageHandler(producer, agentClient);
+        }
+        return this.dmaapMessageHandler;
     }
 
     protected void sleep(Duration duration) {
@@ -158,18 +164,10 @@ public class DmaapMessageConsumer {
     }
 
     protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
-        return MRClientFactory.createConsumer(dmaapConsumerProperties);
-    }
-
-    protected DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
-        return new DmaapMessageHandler(producer, agentClient);
-    }
-
-    protected AsyncRestClient createRestClient(String agentBaseUrl) {
-        return new AsyncRestClient(agentBaseUrl);
+        if (this.messageRouterConsumer == null) {
+            this.messageRouterConsumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
+        }
+        return this.messageRouterConsumer;
     }
 
-    protected MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException {
-        return MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
-    }
 }
index 354fdaf..c2d0d4c 100644 (file)
@@ -35,8 +35,6 @@ public interface DmaapRequestMessage {
         PUT, GET, DELETE, POST
     }
 
-    String type();
-
     String correlationId();
 
     String target();
index d274d5a..a107bdf 100644 (file)
@@ -687,7 +687,7 @@ public class ApplicationTest {
         addPolicyType("type1", "ric");
         addPolicyType("type2", "ric");
 
-        for (int i = 0; i < 100; ++i) {
+        for (int i = 0; i < 10; ++i) {
             Thread t =
                 new Thread(new ConcurrencyTestRunnable(baseUrl(), supervision, a1ClientFactory, rics, policyTypes),
                     "TestThread_" + i);
index f8f7ca3..4a707dd 100644 (file)
@@ -34,6 +34,7 @@ import org.oransc.policyagent.utils.MockA1Client;
 import org.oransc.policyagent.utils.MockA1ClientFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.ResponseEntity;
 
 /**
  * Invoke operations over the NBI and start synchronizations in a separate
@@ -59,11 +60,26 @@ class ConcurrencyTestRunnable implements Runnable {
         this.webClient = new AsyncRestClient(baseUrl);
     }
 
+    private void printStatusInfo() {
+        try {
+            String url = "/actuator/metrics/jvm.threads.live";
+            ResponseEntity<String> result = webClient.getForEntity(url).block();
+            System.out.println(Thread.currentThread() + result.getBody());
+
+            url = "/rics";
+            result = webClient.getForEntity(url).block();
+            System.out.println(Thread.currentThread() + result.getBody());
+
+        } catch (Exception e) {
+            logger.error(Thread.currentThread() + "Concurrency test printStatusInfo exception " + e.toString());
+        }
+    }
+
     @Override
     public void run() {
         try {
-            for (int i = 0; i < 100; ++i) {
-                if (i % 10 == 0) {
+            for (int i = 0; i < 500; ++i) {
+                if (i % 100 == 0) {
                     createInconsistency();
                     this.supervision.checkAllRics();
                 }
@@ -77,6 +93,7 @@ class ConcurrencyTestRunnable implements Runnable {
             }
         } catch (Exception e) {
             logger.error("Concurrency test exception " + e.toString());
+            printStatusInfo();
         }
     }
 
index fc4e7ce..b9696ad 100644 (file)
@@ -23,11 +23,9 @@ package org.oransc.policyagent.dmaap;
 import static ch.qos.logback.classic.Level.WARN;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -49,10 +47,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
 import org.onap.dmaap.mr.client.MRConsumer;
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.oransc.policyagent.utils.LoggingUtils;
@@ -159,8 +155,7 @@ public class DmaapMessageConsumerTest {
 
     @Test
     public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
-        Properties properties = setUpMrConfig();
-
+        setUpMrConfig();
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
         doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
@@ -174,20 +169,10 @@ public class DmaapMessageConsumerTest {
         response.setActualMessages(messages);
         when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
 
-        doReturn(messageHandlerMock).when(messageConsumerUnderTest)
-            .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
-
-        AsyncRestClient restClientMock = mock(AsyncRestClient.class);
-        doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString());
-
-        MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
-        doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
+        doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
 
         messageConsumerUnderTest.start().join();
 
-        verify(messageConsumerUnderTest).createRestClient("https://localhost:0");
-        verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
-
         verify(messageHandlerMock).handleDmaapMsg(responseMessage);
         verifyNoMoreInteractions(messageHandlerMock);
     }
index 52147a8..6b3457b 100644 (file)
@@ -89,7 +89,8 @@ public class DmaapMessageHandlerTest {
         Optional<JsonObject> payload =
             ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
                 : Optional.empty());
-        return ImmutableDmaapRequestMessage.builder().apiVersion("apiVersion") //
+        return ImmutableDmaapRequestMessage.builder() //
+            .apiVersion("apiVersion") //
             .correlationId("correlationId") //
             .operation(operation) //
             .originatorId("originatorId") //
@@ -97,7 +98,6 @@ public class DmaapMessageHandlerTest {
             .requestId("requestId") //
             .target("target") //
             .timestamp("timestamp") //
-            .type("type") //
             .url(URL) //
             .build();
     }
index 446c061..0122fb9 100644 (file)
                "kista_4"
             ]
          }
-      ]
+      ],
+      "streams_publishes":{
+         "dmaap_publisher":{
+            "type":"message_router",
+            "dmaap_info":{
+               "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
+            }
+         }
+      },
+      "streams_subscribes":{
+         "dmaap_subscriber":{
+            "type":"message_router",
+            "dmaap_info":{
+               "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent"
+            }
+         }
+      }
    }
-}
\ No newline at end of file
+}