Minor changes, aesthetics.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I7dc76691f45d30555be66511e1b78c6e5231d01f
package org.oran.dmaapadapter.tasks;
import lombok.Getter;
+import lombok.ToString;
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
private Disposable subscription;
private final ErrorStats errorStats = new ErrorStats();
+ @ToString
+ public static class DataToConsumer {
+ public final String key;
+ public final String value;
+
+ public DataToConsumer(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
+
private class ErrorStats {
private int consumerFaultCounter = 0;
private boolean irrecoverableError = false; // eg. overflow
this.job = job;
}
- public synchronized void start(Flux<TopicListener.Output> input) {
+ public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
stop();
this.errorStats.resetIrrecoverableErrors();
this.subscription = handleReceivedMessage(input, job) //
stop();
}
- protected abstract Mono<String> sendToClient(TopicListener.Output output);
+ protected abstract Mono<String> sendToClient(DataToConsumer output);
public synchronized void stop() {
if (this.subscription != null) {
return this.subscription != null;
}
- private Flux<TopicListener.Output> handleReceivedMessage(Flux<TopicListener.Output> inputFlux, Job job) {
- Flux<TopicListener.Output> result =
- inputFlux.map(input -> new TopicListener.Output(input.key, job.filter(input.value))) //
- .filter(t -> !t.value.isEmpty()); //
+ private Flux<DataToConsumer> handleReceivedMessage(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
+ Flux<DataToConsumer> result = inputFlux.map(input -> new DataToConsumer(input.key, job.filter(input.value))) //
+ .filter(t -> !t.value.isEmpty()); //
if (job.isBuffered()) {
result = result.map(input -> quoteNonJson(input.value, job)) //
.bufferTimeout( //
job.getParameters().getBufferTimeout().getMaxSize(), //
job.getParameters().getBufferTimeout().getMaxTime()) //
- .map(buffered -> new TopicListener.Output("", buffered.toString()));
+ .map(buffered -> new DataToConsumer("", buffered.toString()));
}
return result;
}
private final ApplicationConfig applicationConfig;
private final InfoType type;
private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
- private Flux<Output> output;
+ private Flux<DataFromTopic> dataFromDmaap;
public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
AsyncRestClientFactory restclientFactory =
}
@Override
- public Flux<Output> getOutput() {
- if (this.output == null) {
- this.output = createOutput();
+ public Flux<DataFromTopic> getFlux() {
+ if (this.dataFromDmaap == null) {
+ this.dataFromDmaap = startFetchFromDmaap();
}
- return this.output;
+ return this.dataFromDmaap;
}
- private Flux<Output> createOutput() {
+ private Flux<DataFromTopic> startFetchFromDmaap() {
return Flux.range(0, Integer.MAX_VALUE) //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
.doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
.doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
.publish() //
.autoConnect() //
- .map(input -> new Output("", input)); //
+ .map(input -> new DataFromTopic("", input)); //
}
private String getDmaapUrl() {
}
@Override
- protected Mono<String> sendToClient(TopicListener.Output output) {
+ protected Mono<String> sendToClient(DataToConsumer output) {
Job job = this.getJob();
logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
}
@Override
- protected Mono<String> sendToClient(TopicListener.Output data) {
+ protected Mono<String> sendToClient(DataToConsumer data) {
Job job = this.getJob();
logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
}
@Override
- public synchronized void start(Flux<TopicListener.Output> input) {
+ public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
super.start(input);
SenderOptions<String, String> senderOptions = senderOptions(appConfig);
this.sender = KafkaSender.create(senderOptions);
return SenderOptions.create(props);
}
- private SenderRecord<String, String, Integer> senderRecord(TopicListener.Output output, Job infoJob) {
+ private SenderRecord<String, String, Integer> senderRecord(DataToConsumer output, Job infoJob) {
int correlationMetadata = 2;
String topic = infoJob.getParameters().getKafkaOutputTopic();
return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ApplicationConfig applicationConfig;
private final InfoType type;
- private Flux<Output> output;
+ private Flux<DataFromTopic> dataFromTopic;
public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
this.applicationConfig = applicationConfig;
}
@Override
- public Flux<Output> getOutput() {
- if (this.output == null) {
- this.output = createOutput();
+ public Flux<DataFromTopic> getFlux() {
+ if (this.dataFromTopic == null) {
+ this.dataFromTopic = startReceiveFromTopic();
}
- return this.output;
+ return this.dataFromTopic;
}
- private Flux<Output> createOutput() {
+ private Flux<DataFromTopic> startReceiveFromTopic() {
logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
return KafkaReceiver.create(kafkaInputProperties()) //
.receive() //
.doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
.publish() //
.autoConnect() //
- .map(input -> new Output(input.key(), input.value())); //
+ .map(input -> new DataFromTopic(input.key(), input.value())); //
}
private ReceiverOptions<String, String> kafkaInputProperties() {
public interface TopicListener {
@ToString
- public static class Output {
+ public static class DataFromTopic {
public final String key;
public final String value;
- public Output(String key, String value) {
+ public DataFromTopic(String key, String value) {
this.key = key;
this.value = value;
}
}
- public Flux<Output> getOutput();
+ public Flux<DataFromTopic> getFlux();
}
private void addConsumer(Job job, MultiMap<DataConsumer> consumers, Map<String, TopicListener> topicListeners) {
TopicListener topicListener = topicListeners.get(job.getType().getId());
DataConsumer consumer = createConsumer(job);
- consumer.start(topicListener.getOutput());
+ consumer.start(topicListener.getFlux());
consumers.put(job.getType().getId(), job.getId(), consumer);
}
// Handle received data from Kafka, check that it has been posted to the
// consumer
- kafkaConsumer.start(Flux.just(new TopicListener.Output("key", "data")));
+ kafkaConsumer.start(Flux.just(new TopicListener.DataFromTopic("key", "data")));
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
private static class KafkaReceiver {
public final String OUTPUT_TOPIC;
- private TopicListener.Output receivedKafkaOutput;
+ private TopicListener.DataFromTopic receivedKafkaOutput;
private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
int count = 0;
InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
- topicListener.getOutput() //
+ topicListener.getFlux() //
.doOnNext(this::set) //
.doFinally(sig -> logger.info("Finally " + sig)) //
.subscribe();
}
- private void set(TopicListener.Output receivedKafkaOutput) {
+ private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
this.receivedKafkaOutput = receivedKafkaOutput;
this.count++;
logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
void reset() {
count = 0;
- this.receivedKafkaOutput = new TopicListener.Output("", "");
+ this.receivedKafkaOutput = new TopicListener.DataFromTopic("", "");
}
}