From 46419e1afda75bd45086925b8814c852373e64cd Mon Sep 17 00:00:00 2001
From: PatrikBuhr
Date: Thu, 14 Jul 2022 16:36:09 +0200
Subject: [PATCH] NONRTRIC - dmaap adapter characteristic improvement

Minor changes, aesthetics.

Signed-off-by: PatrikBuhr
Issue-ID: NONRTRIC-773
Change-Id: I7dc76691f45d30555be66511e1b78c6e5231d01f
---
 .../org/oran/dmaapadapter/tasks/DataConsumer.java  | 25 ++++++++++++++++------
 .../dmaapadapter/tasks/DmaapTopicListener.java     | 14 ++++++------
 .../oran/dmaapadapter/tasks/HttpDataConsumer.java  |  2 +-
 .../oran/dmaapadapter/tasks/KafkaDataConsumer.java |  6 +++---
 .../dmaapadapter/tasks/KafkaTopicListener.java     | 14 ++++++------
 .../org/oran/dmaapadapter/tasks/TopicListener.java |  6 +++---
 .../oran/dmaapadapter/tasks/TopicListeners.java    |  2 +-
 .../org/oran/dmaapadapter/ApplicationTest.java     |  2 +-
 .../oran/dmaapadapter/IntegrationWithKafka.java    |  8 +++----
 9 files changed, 45 insertions(+), 34 deletions(-)

diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java
index a412370..991ecc5 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java
@@ -21,6 +21,7 @@
 package org.oran.dmaapadapter.tasks;
 
 import lombok.Getter;
+import lombok.ToString;
 
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
@@ -43,6 +44,17 @@ public abstract class DataConsumer {
     private Disposable subscription;
     private final ErrorStats errorStats = new ErrorStats();
 
+    @ToString
+    public static class DataToConsumer {
+        public final String key;
+        public final String value;
+
+        public DataToConsumer(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+    }
+
     private class ErrorStats {
         private int consumerFaultCounter = 0;
         private boolean irrecoverableError = false; // eg. overflow
@@ -73,7 +85,7 @@ public abstract class DataConsumer {
         this.job = job;
     }
 
-    public synchronized void start(Flux input) {
+    public synchronized void start(Flux input) {
         stop();
         this.errorStats.resetIrrecoverableErrors();
         this.subscription = handleReceivedMessage(input, job) //
@@ -89,7 +101,7 @@ public abstract class DataConsumer {
         stop();
     }
 
-    protected abstract Mono sendToClient(TopicListener.Output output);
+    protected abstract Mono sendToClient(DataToConsumer output);
 
     public synchronized void stop() {
         if (this.subscription != null) {
@@ -102,17 +114,16 @@ public abstract class DataConsumer {
         return this.subscription != null;
     }
 
-    private Flux handleReceivedMessage(Flux inputFlux, Job job) {
-        Flux result =
-                inputFlux.map(input -> new TopicListener.Output(input.key, job.filter(input.value))) //
-                        .filter(t -> !t.value.isEmpty()); //
+    private Flux handleReceivedMessage(Flux inputFlux, Job job) {
+        Flux result = inputFlux.map(input -> new DataToConsumer(input.key, job.filter(input.value))) //
+                .filter(t -> !t.value.isEmpty()); //
 
         if (job.isBuffered()) {
             result = result.map(input -> quoteNonJson(input.value, job)) //
                     .bufferTimeout( //
                             job.getParameters().getBufferTimeout().getMaxSize(), //
                             job.getParameters().getBufferTimeout().getMaxTime()) //
-                    .map(buffered -> new TopicListener.Output("", buffered.toString()));
+                    .map(buffered -> new DataToConsumer("", buffered.toString()));
         }
         return result;
     }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
index 69226ca..fc571d5 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
@@ -45,7 +45,7 @@ public class DmaapTopicListener implements TopicListener {
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
     private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
-    private Flux output;
+    private Flux dataFromDmaap;
 
     public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
         AsyncRestClientFactory restclientFactory =
@@ -56,14 +56,14 @@ public class DmaapTopicListener implements TopicListener {
     }
 
     @Override
-    public Flux getOutput() {
-        if (this.output == null) {
-            this.output = createOutput();
+    public Flux getFlux() {
+        if (this.dataFromDmaap == null) {
+            this.dataFromDmaap = startFetchFromDmaap();
         }
-        return this.output;
+        return this.dataFromDmaap;
     }
 
-    private Flux createOutput() {
+    private Flux startFetchFromDmaap() {
         return Flux.range(0, Integer.MAX_VALUE) //
                 .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
                 .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
@@ -71,7 +71,7 @@ public class DmaapTopicListener implements TopicListener {
                 .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
                 .publish() //
                 .autoConnect() //
-                .map(input -> new Output("", input)); //
+                .map(input -> new DataFromTopic("", input)); //
     }
 
     private String getDmaapUrl() {
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java
index 87a6b67..b2ade98 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java
@@ -39,7 +39,7 @@ public class HttpDataConsumer extends DataConsumer {
     }
 
     @Override
-    protected Mono sendToClient(TopicListener.Output output) {
+    protected Mono sendToClient(DataToConsumer output) {
         Job job = this.getJob();
         logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
         MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
index 406c6f3..94a7aeb 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
@@ -54,7 +54,7 @@ public class KafkaDataConsumer extends DataConsumer {
     }
 
     @Override
-    protected Mono sendToClient(TopicListener.Output data) {
+    protected Mono sendToClient(DataToConsumer data) {
         Job job = this.getJob();
         logger.debug("Sending data '{}' to Kafka topic: {}", data,
                 this.getJob().getParameters().getKafkaOutputTopic());
@@ -67,7 +67,7 @@ public class KafkaDataConsumer extends DataConsumer {
     }
 
     @Override
-    public synchronized void start(Flux input) {
+    public synchronized void start(Flux input) {
         super.start(input);
         SenderOptions senderOptions = senderOptions(appConfig);
         this.sender = KafkaSender.create(senderOptions);
@@ -93,7 +93,7 @@ public class KafkaDataConsumer extends DataConsumer {
         return SenderOptions.create(props);
     }
 
-    private SenderRecord senderRecord(TopicListener.Output output, Job infoJob) {
+    private SenderRecord senderRecord(DataToConsumer output, Job infoJob) {
         int correlationMetadata = 2;
         String topic = infoJob.getParameters().getKafkaOutputTopic();
         return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index 8d36fdd..11c0c28 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
@@ -44,7 +44,7 @@ public class KafkaTopicListener implements TopicListener {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
-    private Flux output;
+    private Flux dataFromTopic;
 
     public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
@@ -52,14 +52,14 @@ public class KafkaTopicListener implements TopicListener {
     }
 
     @Override
-    public Flux getOutput() {
-        if (this.output == null) {
-            this.output = createOutput();
+    public Flux getFlux() {
+        if (this.dataFromTopic == null) {
+            this.dataFromTopic = startReceiveFromTopic();
         }
-        return this.output;
+        return this.dataFromTopic;
     }
 
-    private Flux createOutput() {
+    private Flux startReceiveFromTopic() {
         logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
         return KafkaReceiver.create(kafkaInputProperties()) //
                 .receive() //
@@ -69,7 +69,7 @@ public class KafkaTopicListener implements TopicListener {
                 .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
                 .publish() //
                 .autoConnect() //
-                .map(input -> new Output(input.key(), input.value())); //
+                .map(input -> new DataFromTopic(input.key(), input.value())); //
     }
 
     private ReceiverOptions kafkaInputProperties() {
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
index 54254a3..cb7c3de 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
@@ -26,15 +26,15 @@ import reactor.core.publisher.Flux;
 public interface TopicListener {
 
     @ToString
-    public static class Output {
+    public static class DataFromTopic {
         public final String key;
         public final String value;
 
-        public Output(String key, String value) {
+        public DataFromTopic(String key, String value) {
             this.key = key;
             this.value = value;
         }
     }
 
-    public Flux getOutput();
+    public Flux getFlux();
 }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
index 6c0f48f..4f3148d 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
@@ -101,7 +101,7 @@ public class TopicListeners {
     private void addConsumer(Job job, MultiMap consumers, Map topicListeners) {
         TopicListener topicListener = topicListeners.get(job.getType().getId());
         DataConsumer consumer = createConsumer(job);
-        consumer.start(topicListener.getOutput());
+        consumer.start(topicListener.getFlux());
         consumers.put(job.getType().getId(), job.getId(), consumer);
     }
 
diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
index a3febaf..7eaf7ab 100644
--- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
+++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
@@ -300,7 +300,7 @@ class ApplicationTest {
 
         // Handle received data from Kafka, check that it has been posted to the
         // consumer
-        kafkaConsumer.start(Flux.just(new TopicListener.Output("key", "data")));
+        kafkaConsumer.start(Flux.just(new TopicListener.DataFromTopic("key", "data")));
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index 0fa7a8e..bc650f7 100644
--- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
+++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
@@ -158,7 +158,7 @@ class IntegrationWithKafka {
 
     private static class KafkaReceiver {
         public final String OUTPUT_TOPIC;
-        private TopicListener.Output receivedKafkaOutput;
+        private TopicListener.DataFromTopic receivedKafkaOutput;
         private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
         int count = 0;
 
@@ -171,13 +171,13 @@ class IntegrationWithKafka {
 
            InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
            KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
-            topicListener.getOutput() //
+            topicListener.getFlux() //
                    .doOnNext(this::set) //
                    .doFinally(sig -> logger.info("Finally " + sig)) //
                    .subscribe();
        }
 
-        private void set(TopicListener.Output receivedKafkaOutput) {
+        private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
            this.receivedKafkaOutput = receivedKafkaOutput;
            this.count++;
            logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
@@ -193,7 +193,7 @@ class IntegrationWithKafka {
 
        void reset() {
            count = 0;
-            this.receivedKafkaOutput = new TopicListener.Output("", "");
+            this.receivedKafkaOutput = new TopicListener.DataFromTopic("", "");
        }
    }
 
-- 
2.16.6