Each DmaapProducer starts a thread that is never terminated.
This class works around that by only creating one DmaapProducer
and will reuse that.
Change-Id: I3f8f9369a948f1cb5e3ae119c62c03bf4f5d180a
Issue-ID: NONRTRIC-210
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
endpoints:
web:
exposure:
- include: "loggers,logfile,health,info,metrics"
+ include: "loggers,logfile,health,info,metrics,threaddump"
logging:
level:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.2.4.RELEASE</version>
+ <version>2.2.7.RELEASE</version>
<relativePath />
</parent>
<groupId>org.o-ran-sc.nonrtric</groupId>
import org.springframework.stereotype.Component;
/**
- * The class fetches incoming requests from DMAAP. It uses the timeout parameter that lets the MessageRouter keep the
- * connection with the Kafka open until requests are sent in.
+ * The class fetches incoming requests from DMAAP. It uses the timeout parameter
+ * that lets the MessageRouter keep the connection with the Kafka open until
+ * requests are sent in.
*
* <p>
- * If there is no DMaaP configuration in the application configuration, then this service will regularly check the
- * configuration and start polling DMaaP if the configuration is added. If the DMaaP configuration is removed, then the
- * service will stop polling and resume checking for configuration.
+ * this service will regularly check the configuration and start polling DMaaP
+ * if the configuration is added. If the DMaaP configuration is removed, then
+ * the service will stop polling and resume checking for configuration.
*
* <p>
* Each received request is processed by {@link DmaapMessageHandler}.
private final ApplicationConfig applicationConfig;
+ private DmaapMessageHandler dmaapMessageHandler = null;
+ private MRConsumer messageRouterConsumer = null;
+
@Value("${server.port}")
private int localServerPort;
getDmaapMessageHandler().handleDmaapMsg(msg);
}
- private DmaapMessageHandler getDmaapMessageHandler() throws IOException {
- String agentBaseUrl = "https://localhost:" + this.localServerPort;
- AsyncRestClient agentClient = createRestClient(agentBaseUrl);
- Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
- MRBatchingPublisher producer = getMessageRouterPublisher(dmaapPublisherProperties);
-
- return createDmaapMessageHandler(agentClient, producer);
+ protected DmaapMessageHandler getDmaapMessageHandler() throws IOException {
+ if (this.dmaapMessageHandler == null) {
+ String agentBaseUrl = "https://localhost:" + this.localServerPort;
+ AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
+ Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+ MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
+ this.dmaapMessageHandler = new DmaapMessageHandler(producer, agentClient);
+ }
+ return this.dmaapMessageHandler;
}
protected void sleep(Duration duration) {
}
protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
- return MRClientFactory.createConsumer(dmaapConsumerProperties);
- }
-
- protected DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
- return new DmaapMessageHandler(producer, agentClient);
- }
-
- protected AsyncRestClient createRestClient(String agentBaseUrl) {
- return new AsyncRestClient(agentBaseUrl);
+ if (this.messageRouterConsumer == null) {
+ this.messageRouterConsumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
+ }
+ return this.messageRouterConsumer;
}
- protected MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException {
- return MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
- }
}
PUT, GET, DELETE, POST
}
- String type();
-
String correlationId();
String target();
addPolicyType("type1", "ric");
addPolicyType("type2", "ric");
- for (int i = 0; i < 100; ++i) {
+ for (int i = 0; i < 10; ++i) {
Thread t =
new Thread(new ConcurrencyTestRunnable(baseUrl(), supervision, a1ClientFactory, rics, policyTypes),
"TestThread_" + i);
import org.oransc.policyagent.utils.MockA1ClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.ResponseEntity;
/**
* Invoke operations over the NBI and start synchronizations in a separate
this.webClient = new AsyncRestClient(baseUrl);
}
+ private void printStatusInfo() {
+ try {
+ String url = "/actuator/metrics/jvm.threads.live";
+ ResponseEntity<String> result = webClient.getForEntity(url).block();
+ System.out.println(Thread.currentThread() + result.getBody());
+
+ url = "/rics";
+ result = webClient.getForEntity(url).block();
+ System.out.println(Thread.currentThread() + result.getBody());
+
+ } catch (Exception e) {
+ logger.error(Thread.currentThread() + "Concurrency test printStatusInfo exception " + e.toString());
+ }
+ }
+
@Override
public void run() {
try {
- for (int i = 0; i < 100; ++i) {
- if (i % 10 == 0) {
+ for (int i = 0; i < 500; ++i) {
+ if (i % 100 == 0) {
createInconsistency();
this.supervision.checkAllRics();
}
}
} catch (Exception e) {
logger.error("Concurrency test exception " + e.toString());
+ printStatusInfo();
}
}
import static ch.qos.logback.classic.Level.WARN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
import org.onap.dmaap.mr.client.MRConsumer;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.tasks.RefreshConfigTask;
import org.oransc.policyagent.utils.LoggingUtils;
@Test
public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
- Properties properties = setUpMrConfig();
-
+ setUpMrConfig();
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
response.setActualMessages(messages);
when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
- doReturn(messageHandlerMock).when(messageConsumerUnderTest)
- .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
-
- AsyncRestClient restClientMock = mock(AsyncRestClient.class);
- doReturn(restClientMock).when(messageConsumerUnderTest).createRestClient(anyString());
-
- MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
- doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
+ doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
messageConsumerUnderTest.start().join();
- verify(messageConsumerUnderTest).createRestClient("https://localhost:0");
- verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
-
verify(messageHandlerMock).handleDmaapMsg(responseMessage);
verifyNoMoreInteractions(messageHandlerMock);
}
Optional<JsonObject> payload =
((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
: Optional.empty());
- return ImmutableDmaapRequestMessage.builder().apiVersion("apiVersion") //
+ return ImmutableDmaapRequestMessage.builder() //
+ .apiVersion("apiVersion") //
.correlationId("correlationId") //
.operation(operation) //
.originatorId("originatorId") //
.requestId("requestId") //
.target("target") //
.timestamp("timestamp") //
- .type("type") //
.url(URL) //
.build();
}
"kista_4"
]
}
- ]
+ ],
+ "streams_publishes":{
+ "dmaap_publisher":{
+ "type":"message_router",
+ "dmaap_info":{
+ "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
+ }
+ }
+ },
+ "streams_subscribes":{
+ "dmaap_subscriber":{
+ "type":"message_router",
+ "dmaap_info":{
+ "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent"
+ }
+ }
+ }
}
-}
\ No newline at end of file
+}