X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FDmaapMessageConsumer.java;h=b7c4ec66689e82e961438d09c4f9e1aa40a1fc13;hb=9a843152776a80f03e8b8175bb72b35358b7c7d6;hp=1a260e92eb773bd68c41eed8be05059a36792ce4;hpb=766e2a9fa843e55bb32f17e59d7d0ccae7cfc22e;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java index 1a260e92..b7c4ec66 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java @@ -43,7 +43,8 @@ public class DmaapMessageConsumer { private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10); private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class); private final ApplicationConfig applicationConfig; - private final AsyncRestClient restClient; + private final AsyncRestClient dmaapRestClient; + private final AsyncRestClient consumerRestClient; private final InfoType type; private final Jobs jobs; private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); @@ -82,44 +83,46 @@ public class DmaapMessageConsumer { public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) { this.applicationConfig = applicationConfig; AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); - this.restClient = restclientFactory.createRestClientNoHttpProxy(""); + this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy(""); + this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("") + : restclientFactory.createRestClientNoHttpProxy(""); this.type = type; this.jobs = jobs; } public void start() { - infiniteSubmitter.stop(); - - createTask().subscribe(// - value -> logger.debug("DmaapMessageConsumer next: {}", value), // - throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // - () -> logger.warn("DmaapMessageConsumer stopped") // - ); - } - - protected Flux createTask() { - final int CONCURRENCY = 5; - return infiniteSubmitter.start() // + infiniteSubmitter.start() // .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) // - .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. - .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) // - .flatMap(this::handleReceivedMessage, CONCURRENCY); + .flatMap(this::handleReceivedMessage, 5) // + .subscribe(// + value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), // + throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // + () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) // + ); } private String getDmaapUrl() { + return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl(); } - private Mono handleErrorResponse(Throwable t) { - logger.debug("error from DMAAP {}", t.getMessage()); + private Mono handleDmaapErrorResponse(Throwable t) { + logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl()); return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) // .flatMap(notUsed -> Mono.empty()); } + private Mono handleConsumerErrorResponse(Throwable t) { + logger.warn("error from CONSUMER {}", t.getMessage()); + return Mono.empty(); + } + protected Mono getFromMessageRouter(String topicUrl) { logger.trace("getFromMessageRouter {}", topicUrl); - return restClient.get(topicUrl) // - .onErrorResume(this::handleErrorResponse); + return dmaapRestClient.get(topicUrl) // + .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. + .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) // + .onErrorResume(this::handleDmaapErrorResponse); // } protected Flux handleReceivedMessage(String body) { @@ -129,8 +132,8 @@ public class DmaapMessageConsumer { // Distibute the body to all jobs for this type return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) - .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) // - .onErrorResume(this::handleErrorResponse); + .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) // + .onErrorResume(this::handleConsumerErrorResponse); } }