package org.oran.dmaapadapter.tasks;
-import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.oran.dmaapadapter.clients.AsyncRestClient;
-import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.Job;
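+/**
+ * Consumes one Kafka topic and multicasts every received value to the jobs
+ * registered through startDistributeToConsumer(Job).
+ */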
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class KafkaTopicConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class);
- private final AsyncRestClient consumerRestClient;
private final ApplicationConfig applicationConfig;
private final InfoType type;
private final Many<String> consumerDistributor;
public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) {
this.applicationConfig = applicationConfig;
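+ // Headroom for absent or slow consumers: the multicast sink buffers up to this
+ // many values; beyond that, emitNext(..., FAIL_FAST) in onReceivedData throws.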
- final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 10;
+ final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
- AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
- this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
- : restclientFactory.createRestClientNoHttpProxy("");
this.type = type;
startKafkaTopicReceiver();
}
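+ /**
+ * Subscribes to the configured Kafka input topic and pushes every record into
+ * the shared sink. The returned Disposable is not retained, so the receiver
+ * runs for the lifetime of this object. kafkaInputProperties() (outside this
+ * diff) is assumed to build ReceiverOptions<Integer, String> matching the
+ * Integer/String deserializer imports above.
+ */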
private Disposable startKafkaTopicReceiver() {
return KafkaReceiver.create(kafkaInputProperties()) //
.receive() //
- .flatMap(this::onReceivedData) //
+ .doOnNext(this::onReceivedData) //
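+ // Side effect only: the previous flatMap re-subscribed consumerDistributor.asFlux()
+ // for every record, nesting a new subscription per received message.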
.subscribe(null, //
- throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
- () -> logger.warn("KafkaMessageConsumer stopped"));
+ throwable -> logger.error("KafkaTopicReceiver error: {}", throwable.getMessage()), //
+ () -> logger.warn("KafkaTopicReceiver stopped"));
}
- private Flux<String> onReceivedData(ConsumerRecord<Integer, String> input) {
+ private void onReceivedData(ConsumerRecord<Integer, String> input) {
logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
- return consumerDistributor.asFlux();
}
public Disposable startDistributeToConsumer(Job job) {
+ final int CONCURRENCY = 10; // Concurrency > 1 trades strict message ordering for throughput; set to 1 if in-order delivery is required.
+
return getMessagesFromKafka(job) //
.doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
- .flatMap(body -> consumerRestClient.post(job.getCallbackUrl(), body)) //
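+ // POST each value to the consumer; the rest client is presumably rooted at
+ // job.getCallbackUrl(), hence the empty path. At most CONCURRENCY requests in flight.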
+ .flatMap(body -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse) //
.subscribe(null, //
throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
() -> logger.warn("KafkaMessageConsumer stopped"));
}
private Flux<String> getMessagesFromKafka(Job job) {
if (job.isBuffered()) {
return consumerDistributor.asFlux() //
.filter(job::isFilterMatch) //
- .bufferTimeout(job.getParameters().bufferTimeout.maxSize,
- Duration.ofMillis(job.getParameters().bufferTimeout.maxTimeMiliseconds)) //
- .flatMap(o -> Flux.just(o.toString()));
+ .bufferTimeout( //
+ job.getParameters().getBufferTimeout().getMaxSize(), //
+ job.getParameters().getBufferTimeout().getMaxTime()) //
+ .map(Object::toString);
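+ // bufferTimeout emits a List<String> when either maxSize values have arrived or
+ // maxTime has elapsed; the list is then delivered via its toString() representation.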
} else {
return consumerDistributor.asFlux() //
.filter(job::isFilterMatch);