X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FDmaapTopicConsumer.java;h=507d9b6b89265819bf8d542f990b0a018226c4f5;hb=6f48adb69090799c74c29204dd2cd1737cc9d6ac;hp=7d5575851ea134417429d0c546c8cddf8120a843;hpb=b2d6339441c650962e34502e7527ca0835fa342f;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java index 7d557585..507d9b6b 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java @@ -38,14 +38,12 @@ import reactor.core.publisher.Mono; * The class fetches incoming requests from DMAAP and sends them further to the * consumers that has a job for this InformationType. */ - public class DmaapTopicConsumer { private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10); private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class); private final AsyncRestClient dmaapRestClient; private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); - private final AsyncRestClient consumerRestClient; protected final ApplicationConfig applicationConfig; protected final InfoType type; protected final Jobs jobs; @@ -85,8 +83,6 @@ public class DmaapTopicConsumer { AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy(""); this.applicationConfig = applicationConfig; - this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("") - : restclientFactory.createRestClientNoHttpProxy(""); this.type = type; this.jobs = jobs; } @@ -108,7 +104,8 @@ public class DmaapTopicConsumer { private Mono handleDmaapErrorResponse(Throwable t) { logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl()); - return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(notUsed -> Mono.empty()); + return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) // + .flatMap(notUsed -> Mono.empty()); } private Mono getFromMessageRouter(String topicUrl) { @@ -130,8 +127,8 @@ public class DmaapTopicConsumer { // Distibute the body to all jobs for this type return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // - .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) - .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) // + .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) // + .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) // .onErrorResume(this::handleConsumerErrorResponse); } }