X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FDmaapMessageConsumer.java;h=b7c4ec66689e82e961438d09c4f9e1aa40a1fc13;hb=b43cca1ae1da9826ee934c61f69f2270bd8b4d08;hp=f82d7f68df87d0fa5f215c7c2fc4bf1729b5dc5d;hpb=022b9ca1a9b53e83e27379c8a4785fb6e1683e89;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java index f82d7f68..b7c4ec66 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java @@ -43,7 +43,8 @@ public class DmaapMessageConsumer { private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10); private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class); private final ApplicationConfig applicationConfig; - private final AsyncRestClient restClient; + private final AsyncRestClient dmaapRestClient; + private final AsyncRestClient consumerRestClient; private final InfoType type; private final Jobs jobs; private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); @@ -82,7 +83,9 @@ public class DmaapMessageConsumer { public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) { this.applicationConfig = applicationConfig; AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); - this.restClient = restclientFactory.createRestClientNoHttpProxy(""); + this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy(""); + this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("") + : restclientFactory.createRestClientNoHttpProxy(""); this.type = type; this.jobs = jobs; } @@ -116,7 +119,7 @@ public class DmaapMessageConsumer { protected Mono getFromMessageRouter(String topicUrl) { logger.trace("getFromMessageRouter {}", topicUrl); - return restClient.get(topicUrl) // + return dmaapRestClient.get(topicUrl) // .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) // .onErrorResume(this::handleDmaapErrorResponse); // @@ -129,7 +132,7 @@ public class DmaapMessageConsumer { // Distibute the body to all jobs for this type return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) - .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) // + .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) // .onErrorResume(this::handleConsumerErrorResponse); }