NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaTopicConsumer.java
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java
new file mode 100644 (file)
index 0000000..6079edf
--- /dev/null
@@ -0,0 +1,130 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Many;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverOptions;
+
+/**
+ * The class fetches incoming requests from DMAAP and sends them further to the
+ * consumers that has a job for this InformationType.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class KafkaTopicConsumer {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class);
+    private final AsyncRestClient consumerRestClient;
+    private final ApplicationConfig applicationConfig;
+    private final InfoType type;
+    private final Many<String> consumerDistributor;
+
+    public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) {
+        this.applicationConfig = applicationConfig;
+
+        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 10;
+        this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+
+        AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
+        this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
+                : restclientFactory.createRestClientNoHttpProxy("");
+        this.type = type;
+        startKafkaTopicReceiver();
+    }
+
+    private Disposable startKafkaTopicReceiver() {
+        return KafkaReceiver.create(kafkaInputProperties()) //
+                .receive() //
+                .flatMap(this::onReceivedData) //
+                .subscribe(null, //
+                        throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
+                        () -> logger.warn("KafkaMessageConsumer stopped"));
+    }
+
+    private Flux<String> onReceivedData(ConsumerRecord<Integer, String> input) {
+        logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
+        consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
+        return consumerDistributor.asFlux();
+    }
+
+    public Disposable startDistributeToConsumer(Job job) {
+        return getMessagesFromKafka(job) //
+                .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
+                .flatMap(body -> consumerRestClient.post(job.getCallbackUrl(), body)) //
+                .onErrorResume(this::handleConsumerErrorResponse) //
+                .subscribe(null, //
+                        throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
+                        () -> logger.warn("KafkaMessageConsumer stopped {}", job.getType().getId()));
+    }
+
+    private Flux<String> getMessagesFromKafka(Job job) {
+        if (job.isBuffered()) {
+            return consumerDistributor.asFlux() //
+                    .filter(job::isFilterMatch) //
+                    .bufferTimeout(job.getParameters().bufferTimeout.maxSize,
+                            Duration.ofMillis(job.getParameters().bufferTimeout.maxTimeMiliseconds)) //
+                    .flatMap(o -> Flux.just(o.toString()));
+        } else {
+            return consumerDistributor.asFlux() //
+                    .filter(job::isFilterMatch);
+        }
+    }
+
+    private Mono<String> handleConsumerErrorResponse(Throwable t) {
+        logger.warn("error from CONSUMER {}", t.getMessage());
+        return Mono.empty();
+    }
+
+    private ReceiverOptions<Integer, String> kafkaInputProperties() {
+        Map<String, Object> consumerProps = new HashMap<>();
+        if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
+            logger.error("No kafka boostrap server is setup");
+        }
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        return ReceiverOptions.<Integer, String>create(consumerProps)
+                .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
+    }
+
+}