X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FKafkaTopicListener.java;h=d1045ee01abb0f8ad862038ae0c83b72d49eed3f;hb=c873fa9306739a14d05454d2ec27bc2fde497058;hp=0452b88c1de3b224d322583be6833181feb3047b;hpb=5ee9fd987436011e7130eb05126858cfe54ca545;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 0452b88c..d1045ee0 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -26,7 +26,6 @@ import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; @@ -48,24 +47,25 @@ public class KafkaTopicListener { private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class); private final ApplicationConfig applicationConfig; private final InfoType type; - private final Many output; + private Many output; + private Disposable topicReceiverTask; public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) { this.applicationConfig = applicationConfig; - - final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10; - this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE); - this.type = type; - startKafkaTopicReceiver(); + start(); } public Many getOutput() { return this.output; } - private Disposable startKafkaTopicReceiver() { - return KafkaReceiver.create(kafkaInputProperties()) // + public void start() { + stop(); + final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10; + this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE); + logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId()); + topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) // .receive() // .doOnNext(this::onReceivedData) // .subscribe(null, // @@ -73,7 +73,14 @@ public class KafkaTopicListener { () -> logger.warn("KafkaTopicReceiver stopped")); } - private void onReceivedData(ConsumerRecord input) { + private void stop() { + if (topicReceiverTask != null) { + topicReceiverTask.dispose(); + topicReceiverTask = null; + } + } + + private void onReceivedData(ConsumerRecord input) { logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value()); output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST); } @@ -82,17 +89,17 @@ public class KafkaTopicListener { logger.error("KafkaTopicReceiver error: {}", t.getMessage()); } - private ReceiverOptions kafkaInputProperties() { + private ReceiverOptions kafkaInputProperties() { Map consumerProps = new HashMap<>(); if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) { logger.error("No kafka boostrap server is setup"); } consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor"); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - return ReceiverOptions.create(consumerProps) + return ReceiverOptions.create(consumerProps) .subscription(Collections.singleton(this.type.getKafkaInputTopic())); }