NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaTopicListener.java
@@ -30,82 +30,56 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
-import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import reactor.core.Disposable;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 import reactor.core.publisher.Sinks;
 import reactor.core.publisher.Sinks.Many;
 import reactor.kafka.receiver.KafkaReceiver;
 import reactor.kafka.receiver.ReceiverOptions;
 
 /**
- * The class fetches incoming requests from DMAAP and sends them further to the
- * consumers that has a job for this InformationType.
+ * The class streams incoming messages from a Kafka topic and forwards them to a
+ * multicast sink, to which several other streams can subscribe.
  */
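For readers unfamiliar with Reactor's Sinks API, the multicast pattern the class now relies on looks roughly like this minimal standalone sketch (illustration only, not part of the patch):

    import reactor.core.publisher.Sinks;

    public class MulticastSketch {
        public static void main(String[] args) {
            // One producer-side sink; each subscriber gets its own view of the stream.
            Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

            // Two independent downstream pipelines attached to the same sink.
            sink.asFlux().subscribe(msg -> System.out.println("subscriber A: " + msg));
            sink.asFlux().subscribe(msg -> System.out.println("subscriber B: " + msg));

            // The emitted value is delivered to both subscribers.
            sink.emitNext("hello", Sinks.EmitFailureHandler.FAIL_FAST);
        }
    }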
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class KafkaTopicConsumer {
-    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class);
+public class KafkaTopicListener {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
-    private final Many<String> consumerDistributor;
+    private final Many<String> output;
 
-    public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) {
+    public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
 
         final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
-        this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
 
         this.type = type;
         startKafkaTopicReceiver();
     }
 
+    public Many<String> getOutput() {
+        return this.output;
+    }
+
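The job-distribution code removed further down is presumably moved to another class in this commit; a downstream stream would then attach through this getter, along these lines (hypothetical caller code, reusing the Job API from the removed methods):

    // Hypothetical consumer pipeline: filter the shared stream and forward matches.
    kafkaTopicListener.getOutput().asFlux() //
            .filter(job::isFilterMatch) //
            .flatMap(body -> job.getConsumerRestClient().post("", body)) //
            .subscribe();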
     private Disposable startKafkaTopicReceiver() {
         return KafkaReceiver.create(kafkaInputProperties()) //
                 .receive() //
                 .doOnNext(this::onReceivedData) //
                 .subscribe(null, //
-                        throwable -> logger.error("KafkaTopicReceiver error: {}", throwable.getMessage()), //
+                        this::onReceivedError, //
                         () -> logger.warn("KafkaTopicReceiver stopped"));
     }
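Note that the constructor ignores the Disposable returned here, so the receiver cannot be cancelled once started. If an orderly shutdown were needed, the handle could be retained, for example (a sketch assuming such a requirement; neither the field nor the method exists in the patch):

    private Disposable receiverTask; // set in the constructor: this.receiverTask = startKafkaTopicReceiver();

    public void stop() {
        if (this.receiverTask != null) {
            this.receiverTask.dispose(); // cancels the underlying Kafka consumer
        }
    }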
 
     private void onReceivedData(ConsumerRecord<Integer, String> input) {
         logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
-        consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
-    }
-
-    public Disposable startDistributeToConsumer(Job job) {
-        final int CONCURRENCY = 10; // Has to be 1 to guarantee correct order.
-
-        return getMessagesFromKafka(job) //
-                .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
-                .flatMap(body -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
-                .onErrorResume(this::handleConsumerErrorResponse) //
-                .subscribe(null, //
-                        throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
-                        () -> logger.warn("KafkaMessageConsumer stopped {}", job.getType().getId()));
-    }
-
-    private Flux<String> getMessagesFromKafka(Job job) {
-        if (job.isBuffered()) {
-            return consumerDistributor.asFlux() //
-                    .filter(job::isFilterMatch) //
-                    .bufferTimeout( //
-                            job.getParameters().getBufferTimeout().getMaxSize(), //
-                            job.getParameters().getBufferTimeout().getMaxTime()) //
-                    .map(Object::toString);
-        } else {
-            return consumerDistributor.asFlux() //
-                    .filter(job::isFilterMatch);
-        }
+        output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
     }
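FAIL_FAST above terminates the sink on the first failed emission, for instance when the backpressure buffer overflows. An alternative would be a handler that retries only when concurrent threads race on the sink (a sketch of an option, not what this patch does):

    // Retry emitNext only on a non-serialized (concurrent) emission; fail otherwise.
    private static final Sinks.EmitFailureHandler RETRY_ON_RACE =
            (signalType, emitResult) -> emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED;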
 
-    private Mono<String> handleConsumerErrorResponse(Throwable t) {
-        logger.warn("error from CONSUMER {}", t.getMessage());
-        return Mono.empty();
+    private void onReceivedError(Throwable t) {
+        logger.error("KafkaTopicReceiver error: {}", t.getMessage());
     }
 
     private ReceiverOptions<Integer, String> kafkaInputProperties() {