NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / DmaapMessageConsumer.java
index 1a260e9..f82d7f6 100644 (file)
@@ -88,38 +88,38 @@ public class DmaapMessageConsumer {
     }
 
     public void start() {
-        infiniteSubmitter.stop();
-
-        createTask().subscribe(//
-                value -> logger.debug("DmaapMessageConsumer next: {}", value), //
-                throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
-                () -> logger.warn("DmaapMessageConsumer stopped") //
-        );
-    }
-
-    protected Flux<String> createTask() {
-        final int CONCURRENCY = 5;
-        return infiniteSubmitter.start() //
+        infiniteSubmitter.start() //
                 .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
-                .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
-                .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
-                .flatMap(this::handleReceivedMessage, CONCURRENCY);
+                .flatMap(this::handleReceivedMessage, 5) //
+                .subscribe(//
+                        value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), //
+                        throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
+                        () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) //
+                );
     }
 
     private String getDmaapUrl() {
+
         return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl();
     }
 
-    private Mono<String> handleErrorResponse(Throwable t) {
-        logger.debug("error from DMAAP {}", t.getMessage());
+    private Mono<String> handleDmaapErrorResponse(Throwable t) {
+        logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl());
         return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) //
                 .flatMap(notUsed -> Mono.empty());
     }
 
+    private Mono<String> handleConsumerErrorResponse(Throwable t) {
+        logger.warn("error from CONSUMER {}", t.getMessage());
+        return Mono.empty();
+    }
+
     protected Mono<String> getFromMessageRouter(String topicUrl) {
         logger.trace("getFromMessageRouter {}", topicUrl);
         return restClient.get(topicUrl) //
-                .onErrorResume(this::handleErrorResponse);
+                .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
+                .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
+                .onErrorResume(this::handleDmaapErrorResponse); //
     }
 
     protected Flux<String> handleReceivedMessage(String body) {
@@ -130,7 +130,7 @@ public class DmaapMessageConsumer {
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
                 .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
                 .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
-                .onErrorResume(this::handleErrorResponse);
+                .onErrorResume(this::handleConsumerErrorResponse);
     }
 
 }