From: PatrikBuhr Date: Fri, 15 Jul 2022 11:56:59 +0000 (+0200) Subject: NONRTRIC - dmaap adapter characteristic improvement X-Git-Tag: 1.2.0~28 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=0d8e3125563be34400da1ab7e1c923d8a684d2f7;p=nonrtric%2Fplt%2Fdmaapadapter.git NONRTRIC - dmaap adapter characteristic improvement Minor changes, renamed some classes. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: I1de5f8d88877b7a8f1693576ac52ca3bf5b5be51 --- diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java similarity index 92% rename from src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java rename to src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java index b2ade98..69cc6ea 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java @@ -31,10 +31,10 @@ import reactor.core.publisher.Mono; * owner via REST calls. */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally -public class HttpDataConsumer extends DataConsumer { - private static final Logger logger = LoggerFactory.getLogger(HttpDataConsumer.class); +public class HttpJobDataDistributor extends JobDataDistributor { + private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class); - public HttpDataConsumer(Job job) { + public HttpJobDataDistributor(Job job) { super(job); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java similarity index 95% rename from src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java rename to src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index 991ecc5..60e271e 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -37,8 +37,8 @@ import reactor.core.publisher.Mono; * owner via REST calls. */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally -public abstract class DataConsumer { - private static final Logger logger = LoggerFactory.getLogger(DataConsumer.class); +public abstract class JobDataDistributor { + private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class); @Getter private final Job job; private Disposable subscription; @@ -81,7 +81,7 @@ public abstract class DataConsumer { } } - protected DataConsumer(Job job) { + protected JobDataDistributor(Job job) { this.job = job; } @@ -91,7 +91,7 @@ public abstract class DataConsumer { this.subscription = handleReceivedMessage(input, job) // .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) // .onErrorResume(this::handleError) // - .subscribe(this::handleConsumerSentOk, // + .subscribe(this::handleSentOk, // this::handleExceptionInStream, // () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId())); } @@ -147,7 +147,7 @@ public abstract class DataConsumer { } } - private void handleConsumerSentOk(String data) { + private void handleSentOk(String data) { this.errorStats.handleOkFromConsumer(); } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java similarity index 95% rename from src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java rename to src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java index 94a7aeb..8740c94 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java @@ -42,13 +42,13 @@ import reactor.kafka.sender.SenderRecord; * owner via REST calls. */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally -public class KafkaDataConsumer extends DataConsumer { - private static final Logger logger = LoggerFactory.getLogger(KafkaDataConsumer.class); +public class KafkaJobDataDistributor extends JobDataDistributor { + private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class); private KafkaSender sender; private final ApplicationConfig appConfig; - public KafkaDataConsumer(Job job, ApplicationConfig appConfig) { + public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) { super(job); this.appConfig = appConfig; } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index 4f3148d..fcc94ee 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -49,7 +49,7 @@ public class TopicListeners { private final Map dmaapTopicListeners = new HashMap<>(); // Key is typeId @Getter - private final MultiMap dataConsumers = new MultiMap<>(); // Key is typeId, jobId + private final MultiMap dataDistributors = new MultiMap<>(); // Key is typeId, jobId private final ApplicationConfig appConfig; @@ -85,35 +85,36 @@ public class TopicListeners { removeJob(job); logger.debug("Job added {}", job.getId()); if (job.getType().isKafkaTopicDefined()) { - addConsumer(job, dataConsumers, kafkaTopicListeners); + addConsumer(job, dataDistributors, kafkaTopicListeners); } if (job.getType().isDmaapTopicDefined()) { - addConsumer(job, dataConsumers, dmaapTopicListeners); + addConsumer(job, dataDistributors, dmaapTopicListeners); } } - private DataConsumer createConsumer(Job job) { - return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaDataConsumer(job, appConfig) - : new HttpDataConsumer(job); + private JobDataDistributor createConsumer(Job job) { + return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig) + : new HttpJobDataDistributor(job); } - private void addConsumer(Job job, MultiMap consumers, Map topicListeners) { + private void addConsumer(Job job, MultiMap distributors, + Map topicListeners) { TopicListener topicListener = topicListeners.get(job.getType().getId()); - DataConsumer consumer = createConsumer(job); - consumer.start(topicListener.getFlux()); - consumers.put(job.getType().getId(), job.getId(), consumer); + JobDataDistributor distributor = createConsumer(job); + distributor.start(topicListener.getFlux()); + distributors.put(job.getType().getId(), job.getId(), distributor); } public synchronized void removeJob(Job job) { - removeJob(job, dataConsumers); + removeJob(job, dataDistributors); } - private static void removeJob(Job job, MultiMap consumers) { - DataConsumer consumer = consumers.remove(job.getType().getId(), job.getId()); - if (consumer != null) { + private static void removeJob(Job job, MultiMap distributors) { + JobDataDistributor distributor = distributors.remove(job.getType().getId(), job.getId()); + if (distributor != null) { logger.debug("Job removed {}", job.getId()); - consumer.stop(); + distributor.stop(); } } diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 7eaf7ab..2d0d621 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -54,7 +54,7 @@ import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; import org.oran.dmaapadapter.repository.filters.PmReport; import org.oran.dmaapadapter.repository.filters.PmReportFilter; -import org.oran.dmaapadapter.tasks.DataConsumer; +import org.oran.dmaapadapter.tasks.JobDataDistributor; import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; import org.oran.dmaapadapter.tasks.TopicListener; import org.oran.dmaapadapter.tasks.TopicListeners; @@ -296,7 +296,7 @@ class ApplicationTest { this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - DataConsumer kafkaConsumer = this.topicListeners.getDataConsumers().get(TYPE_ID, JOB_ID); + JobDataDistributor kafkaConsumer = this.topicListeners.getDataDistributors().get(TYPE_ID, JOB_ID); // Handle received data from Kafka, check that it has been posted to the // consumer diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index bc650f7..2de10fd 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -216,7 +216,7 @@ class IntegrationWithKafka { this.icsSimulatorController.deleteJob(job.getId(), restClient()); } await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty()); + await().untilAsserted(() -> assertThat(this.topicListeners.getDataDistributors().keySet()).isEmpty()); this.consumerController.testResults.reset(); this.icsSimulatorController.testResults.reset();