NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / DmaapMessageConsumer.java
index f82d7f6..b7c4ec6 100644 (file)
@@ -43,7 +43,8 @@ public class DmaapMessageConsumer {
     private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
     private final ApplicationConfig applicationConfig;
-    private final AsyncRestClient restClient;
+    private final AsyncRestClient dmaapRestClient;
+    private final AsyncRestClient consumerRestClient;
     private final InfoType type;
     private final Jobs jobs;
     private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
@@ -82,7 +83,9 @@ public class DmaapMessageConsumer {
     public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
         this.applicationConfig = applicationConfig;
         AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
-        this.restClient = restclientFactory.createRestClientNoHttpProxy("");
+        this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
+        this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
+                : restclientFactory.createRestClientNoHttpProxy("");
         this.type = type;
         this.jobs = jobs;
     }
@@ -116,7 +119,7 @@ public class DmaapMessageConsumer {
 
     protected Mono<String> getFromMessageRouter(String topicUrl) {
         logger.trace("getFromMessageRouter {}", topicUrl);
-        return restClient.get(topicUrl) //
+        return dmaapRestClient.get(topicUrl) //
                 .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
                 .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
                 .onErrorResume(this::handleDmaapErrorResponse); //
@@ -129,7 +132,7 @@ public class DmaapMessageConsumer {
         // Distibute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
                 .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
-                .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
+                .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse);
     }