import org.apache.kafka.common.serialization.StringDeserializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
-import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
/**
- * The class fetches incoming requests from DMAAP and sends them further to the
- * consumers that has a job for this InformationType.
+ * The class streams incoming requests from a Kafka topic and sends them further
+ * to a multi cast sink, which several other streams can connect to.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class KafkaTopicConsumer {
- private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class);
+public class KafkaTopicListener {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ApplicationConfig applicationConfig;
private final InfoType type;
- private final Many<String> consumerDistributor;
+ private final Many<String> output;
- public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) {
+ public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
this.applicationConfig = applicationConfig;
final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
- this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+ this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
this.type = type;
startKafkaTopicReceiver();
}
+ public Many<String> getOutput() {
+ return this.output;
+ }
+
private Disposable startKafkaTopicReceiver() {
return KafkaReceiver.create(kafkaInputProperties()) //
.receive() //
.doOnNext(this::onReceivedData) //
.subscribe(null, //
- throwable -> logger.error("KafkaTopicReceiver error: {}", throwable.getMessage()), //
+ this::onReceivedError, //
() -> logger.warn("KafkaTopicReceiver stopped"));
}
private void onReceivedData(ConsumerRecord<Integer, String> input) {
logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
- consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
- }
-
- public Disposable startDistributeToConsumer(Job job) {
- final int CONCURRENCY = 10; // Has to be 1 to guarantee correct order.
-
- return getMessagesFromKafka(job) //
- .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
- .flatMap(body -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
- .onErrorResume(this::handleConsumerErrorResponse) //
- .subscribe(null, //
- throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
- () -> logger.warn("KafkaMessageConsumer stopped {}", job.getType().getId()));
- }
-
- private Flux<String> getMessagesFromKafka(Job job) {
- if (job.isBuffered()) {
- return consumerDistributor.asFlux() //
- .filter(job::isFilterMatch) //
- .bufferTimeout( //
- job.getParameters().getBufferTimeout().getMaxSize(), //
- job.getParameters().getBufferTimeout().getMaxTime()) //
- .map(Object::toString);
- } else {
- return consumerDistributor.asFlux() //
- .filter(job::isFilterMatch);
- }
+ output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
}
- private Mono<String> handleConsumerErrorResponse(Throwable t) {
- logger.warn("error from CONSUMER {}", t.getMessage());
- return Mono.empty();
+ private void onReceivedError(Throwable t) {
+ logger.error("KafkaTopicReceiver error: {}", t.getMessage());
}
private ReceiverOptions<Integer, String> kafkaInputProperties() {