X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FDmaapMessageConsumer.java;h=f82d7f68df87d0fa5f215c7c2fc4bf1729b5dc5d;hb=9b6fec053ae15d9ea67d7c85db0acb45cf9effe8;hp=1a260e92eb773bd68c41eed8be05059a36792ce4;hpb=e372f940d2e57562d23e08ecb797f580800dc719;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java index 1a260e92..f82d7f68 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java @@ -88,38 +88,38 @@ public class DmaapMessageConsumer { } public void start() { - infiniteSubmitter.stop(); - - createTask().subscribe(// - value -> logger.debug("DmaapMessageConsumer next: {}", value), // - throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // - () -> logger.warn("DmaapMessageConsumer stopped") // - ); - } - - protected Flux createTask() { - final int CONCURRENCY = 5; - return infiniteSubmitter.start() // + infiniteSubmitter.start() // .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) // - .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. - .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) // - .flatMap(this::handleReceivedMessage, CONCURRENCY); + .flatMap(this::handleReceivedMessage, 5) // + .subscribe(// + value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), // + throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // + () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) // + ); } private String getDmaapUrl() { + return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl(); } - private Mono handleErrorResponse(Throwable t) { - logger.debug("error from DMAAP {}", t.getMessage()); + private Mono handleDmaapErrorResponse(Throwable t) { + logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl()); return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) // .flatMap(notUsed -> Mono.empty()); } + private Mono handleConsumerErrorResponse(Throwable t) { + logger.warn("error from CONSUMER {}", t.getMessage()); + return Mono.empty(); + } + protected Mono getFromMessageRouter(String topicUrl) { logger.trace("getFromMessageRouter {}", topicUrl); return restClient.get(topicUrl) // - .onErrorResume(this::handleErrorResponse); + .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. + .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) // + .onErrorResume(this::handleDmaapErrorResponse); // } protected Flux handleReceivedMessage(String body) { @@ -130,7 +130,7 @@ public class DmaapMessageConsumer { return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) // - .onErrorResume(this::handleErrorResponse); + .onErrorResume(this::handleConsumerErrorResponse); } }