import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ApplicationConfig applicationConfig;
private final InfoType type;
- private final Many<String> output;
+ private Many<String> output;
+ private Disposable topicReceiverTask;
/**
 * Creates a listener for one info type.
 *
 * NOTE: the Kafka receiver is intentionally NOT started from the
 * constructor any more; callers must invoke start() explicitly.
 *
 * @param applicationConfig supplies the Kafka bootstrap server address
 * @param type the info type whose Kafka input topic is consumed
 */
public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
    this.applicationConfig = applicationConfig;
    this.type = type;
}
/**
 * Returns the sink that values received from the Kafka topic are
 * published to. A fresh sink is created each time start() runs.
 */
public Many<String> getOutput() {
    return output;
}
- private Disposable startKafkaTopicReceiver() {
- return KafkaReceiver.create(kafkaInputProperties()) //
+ public void start() {
+ stop();
+ final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
+ this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+ logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
+ topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) //
.receive() //
.doOnNext(this::onReceivedData) //
.subscribe(null, //
() -> logger.warn("KafkaTopicReceiver stopped"));
}
- private void onReceivedData(ConsumerRecord<Integer, String> input) {
+ private void stop() {
+ if (topicReceiverTask != null) {
+ topicReceiverTask.dispose();
+ topicReceiverTask = null;
+ }
+ }
+
+ private void onReceivedData(ConsumerRecord<String, String> input) {
logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
}
logger.error("KafkaTopicReceiver error: {}", t.getMessage());
}
- private ReceiverOptions<Integer, String> kafkaInputProperties() {
+ private ReceiverOptions<String, String> kafkaInputProperties() {
Map<String, Object> consumerProps = new HashMap<>();
if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
logger.error("No kafka boostrap server is setup");
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return ReceiverOptions.<Integer, String>create(consumerProps)
+ return ReceiverOptions.<String, String>create(consumerProps)
.subscription(Collections.singleton(this.type.getKafkaInputTopic()));
}