From 083393d0affc7dca6a5cea89f4f9759801a91591 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Tue, 12 May 2020 13:27:37 +0200 Subject: [PATCH] Bugfix, creating only one DmaapProducer Each DmaapProducer starts a thread that is never terminated. This class works around that by only creating one DmaapProducer and will reuse that. Change-Id: I3f8f9369a948f1cb5e3ae119c62c03bf4f5d180a Issue-ID: NONRTRIC-210 Signed-off-by: PatrikBuhr --- policy-agent/config/application.yaml | 2 +- policy-agent/pom.xml | 2 +- .../policyagent/dmaap/DmaapMessageConsumer.java | 46 +++++++++++----------- .../policyagent/dmaap/DmaapRequestMessage.java | 2 - .../org/oransc/policyagent/ApplicationTest.java | 2 +- .../policyagent/ConcurrencyTestRunnable.java | 21 +++++++++- .../dmaap/DmaapMessageConsumerTest.java | 19 +-------- .../policyagent/dmaap/DmaapMessageHandlerTest.java | 4 +- .../resources/test_application_configuration.json | 20 +++++++++- 9 files changed, 66 insertions(+), 52 deletions(-) diff --git a/policy-agent/config/application.yaml b/policy-agent/config/application.yaml index b1284942..4010219b 100644 --- a/policy-agent/config/application.yaml +++ b/policy-agent/config/application.yaml @@ -9,7 +9,7 @@ management: endpoints: web: exposure: - include: "loggers,logfile,health,info,metrics" + include: "loggers,logfile,health,info,metrics,threaddump" logging: level: diff --git a/policy-agent/pom.xml b/policy-agent/pom.xml index 8ed33684..ed1cbf16 100644 --- a/policy-agent/pom.xml +++ b/policy-agent/pom.xml @@ -26,7 +26,7 @@ org.springframework.boot spring-boot-starter-parent - 2.2.4.RELEASE + 2.2.7.RELEASE org.o-ran-sc.nonrtric diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java index 6312e375..9165af54 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java @@ -41,13 +41,14 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** - * The class fetches incoming requests from DMAAP. It uses the timeout parameter that lets the MessageRouter keep the - * connection with the Kafka open until requests are sent in. + * The class fetches incoming requests from DMAAP. It uses the timeout parameter + * that lets the MessageRouter keep the connection with the Kafka open until + * requests are sent in. * *

- * If there is no DMaaP configuration in the application configuration, then this service will regularly check the - * configuration and start polling DMaaP if the configuration is added. If the DMaaP configuration is removed, then the - * service will stop polling and resume checking for configuration. + * this service will regularly check the configuration and start polling DMaaP + * if the configuration is added. If the DMaaP configuration is removed, then + * the service will stop polling and resume checking for configuration. * *

* Each received request is processed by {@link DmaapMessageHandler}. @@ -61,6 +62,9 @@ public class DmaapMessageConsumer { private final ApplicationConfig applicationConfig; + private DmaapMessageHandler dmaapMessageHandler = null; + private MRConsumer messageRouterConsumer = null; + @Value("${server.port}") private int localServerPort; @@ -140,13 +144,15 @@ public class DmaapMessageConsumer { getDmaapMessageHandler().handleDmaapMsg(msg); } - private DmaapMessageHandler getDmaapMessageHandler() throws IOException { - String agentBaseUrl = "https://localhost:" + this.localServerPort; - AsyncRestClient agentClient = createRestClient(agentBaseUrl); - Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig(); - MRBatchingPublisher producer = getMessageRouterPublisher(dmaapPublisherProperties); - - return createDmaapMessageHandler(agentClient, producer); + protected DmaapMessageHandler getDmaapMessageHandler() throws IOException { + if (this.dmaapMessageHandler == null) { + String agentBaseUrl = "https://localhost:" + this.localServerPort; + AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl); + Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig(); + MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties); + this.dmaapMessageHandler = new DmaapMessageHandler(producer, agentClient); + } + return this.dmaapMessageHandler; } protected void sleep(Duration duration) { @@ -158,18 +164,10 @@ public class DmaapMessageConsumer { } protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException { - return MRClientFactory.createConsumer(dmaapConsumerProperties); - } - - protected DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) { - return new DmaapMessageHandler(producer, agentClient); - } - - protected AsyncRestClient createRestClient(String agentBaseUrl) { - return new AsyncRestClient(agentBaseUrl); + if (this.messageRouterConsumer == null) { + this.messageRouterConsumer = MRClientFactory.createConsumer(dmaapConsumerProperties); + } + return this.messageRouterConsumer; } - protected MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException { - return MRClientFactory.createBatchingPublisher(dmaapPublisherProperties); - } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapRequestMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapRequestMessage.java index 354fdaf9..c2d0d4c0 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapRequestMessage.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapRequestMessage.java @@ -35,8 +35,6 @@ public interface DmaapRequestMessage { PUT, GET, DELETE, POST } - String type(); - String correlationId(); String target(); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index d274d5a3..a107bdfc 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -687,7 +687,7 @@ public class ApplicationTest { addPolicyType("type1", "ric"); addPolicyType("type2", "ric"); - for (int i = 0; i < 100; ++i) { + for (int i = 0; i < 10; ++i) { Thread t = new Thread(new ConcurrencyTestRunnable(baseUrl(), supervision, a1ClientFactory, rics, policyTypes), "TestThread_" + i); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java b/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java index f8f7ca3b..4a707dd2 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java @@ -34,6 +34,7 @@ import org.oransc.policyagent.utils.MockA1Client; import org.oransc.policyagent.utils.MockA1ClientFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; /** * Invoke operations over the NBI and start synchronizations in a separate @@ -59,11 +60,26 @@ class ConcurrencyTestRunnable implements Runnable { this.webClient = new AsyncRestClient(baseUrl); } + private void printStatusInfo() { + try { + String url = "/actuator/metrics/jvm.threads.live"; + ResponseEntity result = webClient.getForEntity(url).block(); + System.out.println(Thread.currentThread() + result.getBody()); + + url = "/rics"; + result = webClient.getForEntity(url).block(); + System.out.println(Thread.currentThread() + result.getBody()); + + } catch (Exception e) { + logger.error(Thread.currentThread() + "Concurrency test printStatusInfo exception " + e.toString()); + } + } + @Override public void run() { try { - for (int i = 0; i < 100; ++i) { - if (i % 10 == 0) { + for (int i = 0; i < 500; ++i) { + if (i % 100 == 0) { createInconsistency(); this.supervision.checkAllRics(); } @@ -77,6 +93,7 @@ class ConcurrencyTestRunnable implements Runnable { } } catch (Exception e) { logger.error("Concurrency test exception " + e.toString()); + printStatusInfo(); } } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java index fc4e7cea..b9696ade 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java @@ -23,11 +23,9 @@ package org.oransc.policyagent.dmaap; import static ch.qos.logback.classic.Level.WARN; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -49,10 +47,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.onap.dmaap.mr.client.MRBatchingPublisher; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; -import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.tasks.RefreshConfigTask; import org.oransc.policyagent.utils.LoggingUtils; @@ -159,8 +155,7 @@ public class DmaapMessageConsumerTest { @Test public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { - Properties properties = setUpMrConfig(); - + setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); doReturn(false, false, true).when(messageConsumerUnderTest).isStopped(); @@ -174,20 +169,10 @@ public class DmaapMessageConsumerTest { response.setActualMessages(messages); when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); - doReturn(messageHandlerMock).when(messageConsumerUnderTest) - .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class)); - - AsyncRestClient restClientMock = mock(AsyncRestClient.class); - doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString()); - - MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class); - doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class)); + doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); messageConsumerUnderTest.start().join(); - verify(messageConsumerUnderTest).createRestClient("https://localhost:0"); - verify(messageConsumerUnderTest).getMessageRouterPublisher(properties); - verify(messageHandlerMock).handleDmaapMsg(responseMessage); verifyNoMoreInteractions(messageHandlerMock); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java index 52147a85..6b3457b4 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java @@ -89,7 +89,8 @@ public class DmaapMessageHandlerTest { Optional payload = ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson()) : Optional.empty()); - return ImmutableDmaapRequestMessage.builder().apiVersion("apiVersion") // + return ImmutableDmaapRequestMessage.builder() // + .apiVersion("apiVersion") // .correlationId("correlationId") // .operation(operation) // .originatorId("originatorId") // @@ -97,7 +98,6 @@ public class DmaapMessageHandlerTest { .requestId("requestId") // .target("target") // .timestamp("timestamp") // - .type("type") // .url(URL) // .build(); } diff --git a/policy-agent/src/test/resources/test_application_configuration.json b/policy-agent/src/test/resources/test_application_configuration.json index 446c0611..0122fb91 100644 --- a/policy-agent/src/test/resources/test_application_configuration.json +++ b/policy-agent/src/test/resources/test_application_configuration.json @@ -18,6 +18,22 @@ "kista_4" ] } - ] + ], + "streams_publishes":{ + "dmaap_publisher":{ + "type":"message_router", + "dmaap_info":{ + "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE" + } + } + }, + "streams_subscribes":{ + "dmaap_subscriber":{ + "type":"message_router", + "dmaap_info":{ + "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent" + } + } + } } -} \ No newline at end of file +} -- 2.16.6