NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaTopicConsumer.java
index 6079edf..5590022 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.oran.dmaapadapter.tasks;
 
-import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -29,8 +28,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.oran.dmaapadapter.clients.AsyncRestClient;
-import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.Job;
@@ -52,7 +49,6 @@ import reactor.kafka.receiver.ReceiverOptions;
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 public class KafkaTopicConsumer {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class);
-    private final AsyncRestClient consumerRestClient;
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
     private final Many<String> consumerDistributor;
@@ -60,12 +56,9 @@ public class KafkaTopicConsumer {
     public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
 
-        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 10;
+        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
         this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
 
-        AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
-        this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
-                : restclientFactory.createRestClientNoHttpProxy("");
         this.type = type;
         startKafkaTopicReceiver();
     }
@@ -73,22 +66,23 @@ public class KafkaTopicConsumer {
     private Disposable startKafkaTopicReceiver() {
         return KafkaReceiver.create(kafkaInputProperties()) //
                 .receive() //
-                .flatMap(this::onReceivedData) //
+                .doOnNext(this::onReceivedData) //
                 .subscribe(null, //
-                        throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
-                        () -> logger.warn("KafkaMessageConsumer stopped"));
+                        throwable -> logger.error("KafkaTopicReceiver error: {}", throwable.getMessage()), //
+                        () -> logger.warn("KafkaTopicReceiver stopped"));
     }
 
-    private Flux<String> onReceivedData(ConsumerRecord<Integer, String> input) {
+    private void onReceivedData(ConsumerRecord<Integer, String> input) {
         logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
         consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
-        return consumerDistributor.asFlux();
     }
 
     public Disposable startDistributeToConsumer(Job job) {
+        final int CONCURRENCY = 10; // Has to be 1 to guarantee correct order.
+
         return getMessagesFromKafka(job) //
                 .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
-                .flatMap(body -> consumerRestClient.post(job.getCallbackUrl(), body)) //
+                .flatMap(body -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse) //
                 .subscribe(null, //
                         throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
@@ -99,9 +93,10 @@ public class KafkaTopicConsumer {
         if (job.isBuffered()) {
             return consumerDistributor.asFlux() //
                     .filter(job::isFilterMatch) //
-                    .bufferTimeout(job.getParameters().bufferTimeout.maxSize,
-                            Duration.ofMillis(job.getParameters().bufferTimeout.maxTimeMiliseconds)) //
-                    .flatMap(o -> Flux.just(o.toString()));
+                    .bufferTimeout( //
+                            job.getParameters().getBufferTimeout().getMaxSize(), //
+                            job.getParameters().getBufferTimeout().getMaxTime()) //
+                    .map(Object::toString);
         } else {
             return consumerDistributor.asFlux() //
                     .filter(job::isFilterMatch);