2 * ========================LICENSE_START=================================
5 * Copyright (C) 2021 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oran.dmaapadapter.tasks;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
/**
 * The class fetches incoming messages from a Kafka topic and sends them further to the
 * consumers that have a job for this InformationType.
 */
52 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
53 public class KafkaTopicConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class);

    // REST client used to POST received messages to each consumer's callback URL.
    private final AsyncRestClient consumerRestClient;
    private final ApplicationConfig applicationConfig;
    // The information type this consumer serves; provides the Kafka input topic name.
    private final InfoType type;
    // Multicast sink that fans received Kafka payloads out to all subscribed jobs.
    private final Many<String> consumerDistributor;
60 public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) {
61 this.applicationConfig = applicationConfig;
63 final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 10;
64 this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
66 AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
67 this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
68 : restclientFactory.createRestClientNoHttpProxy("");
70 startKafkaTopicReceiver();
73 private Disposable startKafkaTopicReceiver() {
74 return KafkaReceiver.create(kafkaInputProperties()) //
76 .flatMap(this::onReceivedData) //
78 throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
79 () -> logger.warn("KafkaMessageConsumer stopped"));
82 private Flux<String> onReceivedData(ConsumerRecord<Integer, String> input) {
83 logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
84 consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
85 return consumerDistributor.asFlux();
88 public Disposable startDistributeToConsumer(Job job) {
89 return getMessagesFromKafka(job) //
90 .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
91 .flatMap(body -> consumerRestClient.post(job.getCallbackUrl(), body)) //
92 .onErrorResume(this::handleConsumerErrorResponse) //
94 throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
95 () -> logger.warn("KafkaMessageConsumer stopped {}", job.getType().getId()));
98 private Flux<String> getMessagesFromKafka(Job job) {
99 if (job.isBuffered()) {
100 return consumerDistributor.asFlux() //
101 .filter(job::isFilterMatch) //
102 .bufferTimeout(job.getParameters().bufferTimeout.maxSize,
103 Duration.ofMillis(job.getParameters().bufferTimeout.maxTimeMiliseconds)) //
104 .flatMap(o -> Flux.just(o.toString()));
106 return consumerDistributor.asFlux() //
107 .filter(job::isFilterMatch);
111 private Mono<String> handleConsumerErrorResponse(Throwable t) {
112 logger.warn("error from CONSUMER {}", t.getMessage());
116 private ReceiverOptions<Integer, String> kafkaInputProperties() {
117 Map<String, Object> consumerProps = new HashMap<>();
118 if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
119 logger.error("No kafka boostrap server is setup");
121 consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
122 consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
123 consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
124 consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
126 return ReceiverOptions.<Integer, String>create(consumerProps)
127 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));