NONRTRIC - dmaap adapter characteristic improvement 79/8779/1
author PatrikBuhr <patrik.buhr@est.tech>
Fri, 15 Jul 2022 11:56:59 +0000 (13:56 +0200)
committer PatrikBuhr <patrik.buhr@est.tech>
Fri, 15 Jul 2022 11:56:59 +0000 (13:56 +0200)
Minor changes: renamed the DataConsumer classes (DataConsumer, HttpDataConsumer, KafkaDataConsumer) to JobDataDistributor, HttpJobDataDistributor and KafkaJobDataDistributor to better reflect that they distribute job data, and renamed handleConsumerSentOk to handleSentOk.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I1de5f8d88877b7a8f1693576ac52ca3bf5b5be51

src/main/java/org/oran/dmaapadapter/tasks/HttpJobDataDistributor.java [moved from src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java with 92% similarity]
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java [moved from src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java with 95% similarity]
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataDistributor.java [moved from src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java with 95% similarity]
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
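
The rename keeps the existing selection between HTTP and Kafka delivery in TopicListeners.createConsumer. The standalone Java sketch below illustrates the renamed hierarchy and that selection only; Job, its Kafka-output-topic field and the empty distributor bodies are stubs for illustration and do not match the production signatures.

// Minimal, self-contained sketch of the renamed hierarchy and the selection
// logic mirrored from TopicListeners.createConsumer. Only the class names
// JobDataDistributor, HttpJobDataDistributor and KafkaJobDataDistributor
// correspond to the production classes; everything else is stubbed.
abstract class JobDataDistributor {
    protected final Job job;

    protected JobDataDistributor(Job job) {
        this.job = job;
    }
}

class HttpJobDataDistributor extends JobDataDistributor {
    HttpJobDataDistributor(Job job) {
        super(job);
    }
}

class KafkaJobDataDistributor extends JobDataDistributor {
    KafkaJobDataDistributor(Job job) {
        super(job);
    }
}

class Job {
    // Stub: in production this value comes from job.getParameters().getKafkaOutputTopic().
    String kafkaOutputTopic;
}

public class DistributorSelectionSketch {
    // A Kafka output topic selects the Kafka distributor; otherwise data is delivered over HTTP.
    static JobDataDistributor createDistributor(Job job) {
        return job.kafkaOutputTopic != null && !job.kafkaOutputTopic.isEmpty()
                ? new KafkaJobDataDistributor(job)
                : new HttpJobDataDistributor(job);
    }

    public static void main(String[] args) {
        Job httpJob = new Job();
        Job kafkaJob = new Job();
        kafkaJob.kafkaOutputTopic = "output-topic";
        System.out.println(createDistributor(httpJob).getClass().getSimpleName());  // HttpJobDataDistributor
        System.out.println(createDistributor(kafkaJob).getClass().getSimpleName()); // KafkaJobDataDistributor
    }
}
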

@@ -31,10 +31,10 @@ import reactor.core.publisher.Mono;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class HttpDataConsumer extends DataConsumer {
-    private static final Logger logger = LoggerFactory.getLogger(HttpDataConsumer.class);
+public class HttpJobDataDistributor extends JobDataDistributor {
+    private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
 
-    public HttpDataConsumer(Job job) {
+    public HttpJobDataDistributor(Job job) {
         super(job);
     }
 
@@ -37,8 +37,8 @@ import reactor.core.publisher.Mono;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public abstract class DataConsumer {
-    private static final Logger logger = LoggerFactory.getLogger(DataConsumer.class);
+public abstract class JobDataDistributor {
+    private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);
     @Getter
     private final Job job;
     private Disposable subscription;
@@ -81,7 +81,7 @@ public abstract class DataConsumer {
         }
     }
 
-    protected DataConsumer(Job job) {
+    protected JobDataDistributor(Job job) {
         this.job = job;
     }
 
@@ -91,7 +91,7 @@ public abstract class DataConsumer {
         this.subscription = handleReceivedMessage(input, job) //
                 .flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
-                .subscribe(this::handleConsumerSentOk, //
+                .subscribe(this::handleSentOk, //
                         this::handleExceptionInStream, //
                         () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
     }
@@ -147,7 +147,7 @@ public abstract class DataConsumer {
         }
     }
 
-    private void handleConsumerSentOk(String data) {
+    private void handleSentOk(String data) {
         this.errorStats.handleOkFromConsumer();
     }
 
@@ -42,13 +42,13 @@ import reactor.kafka.sender.SenderRecord;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class KafkaDataConsumer extends DataConsumer {
-    private static final Logger logger = LoggerFactory.getLogger(KafkaDataConsumer.class);
+public class KafkaJobDataDistributor extends JobDataDistributor {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class);
 
     private KafkaSender<String, String> sender;
     private final ApplicationConfig appConfig;
 
-    public KafkaDataConsumer(Job job, ApplicationConfig appConfig) {
+    public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
         super(job);
         this.appConfig = appConfig;
     }
index 4f3148d..fcc94ee 100644
@@ -49,7 +49,7 @@ public class TopicListeners {
     private final Map<String, TopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
 
     @Getter
-    private final MultiMap<DataConsumer> dataConsumers = new MultiMap<>(); // Key is typeId, jobId
+    private final MultiMap<JobDataDistributor> dataDistributors = new MultiMap<>(); // Key is typeId, jobId
 
     private final ApplicationConfig appConfig;
 
@@ -85,35 +85,36 @@ public class TopicListeners {
         removeJob(job);
         logger.debug("Job added {}", job.getId());
         if (job.getType().isKafkaTopicDefined()) {
-            addConsumer(job, dataConsumers, kafkaTopicListeners);
+            addConsumer(job, dataDistributors, kafkaTopicListeners);
         }
 
         if (job.getType().isDmaapTopicDefined()) {
-            addConsumer(job, dataConsumers, dmaapTopicListeners);
+            addConsumer(job, dataDistributors, dmaapTopicListeners);
         }
     }
 
-    private DataConsumer createConsumer(Job job) {
-        return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaDataConsumer(job, appConfig)
-                : new HttpDataConsumer(job);
+    private JobDataDistributor createConsumer(Job job) {
+        return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
+                : new HttpJobDataDistributor(job);
     }
 
-    private void addConsumer(Job job, MultiMap<DataConsumer> consumers, Map<String, TopicListener> topicListeners) {
+    private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
+            Map<String, TopicListener> topicListeners) {
         TopicListener topicListener = topicListeners.get(job.getType().getId());
-        DataConsumer consumer = createConsumer(job);
-        consumer.start(topicListener.getFlux());
-        consumers.put(job.getType().getId(), job.getId(), consumer);
+        JobDataDistributor distributor = createConsumer(job);
+        distributor.start(topicListener.getFlux());
+        distributors.put(job.getType().getId(), job.getId(), distributor);
     }
 
     public synchronized void removeJob(Job job) {
-        removeJob(job, dataConsumers);
+        removeJob(job, dataDistributors);
     }
 
-    private static void removeJob(Job job, MultiMap<DataConsumer> consumers) {
-        DataConsumer consumer = consumers.remove(job.getType().getId(), job.getId());
-        if (consumer != null) {
+    private static void removeJob(Job job, MultiMap<JobDataDistributor> distributors) {
+        JobDataDistributor distributor = distributors.remove(job.getType().getId(), job.getId());
+        if (distributor != null) {
             logger.debug("Job removed {}", job.getId());
-            consumer.stop();
+            distributor.stop();
         }
     }
 
index 7eaf7ab..2d0d621 100644
@@ -54,7 +54,7 @@ import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.oran.dmaapadapter.repository.filters.PmReport;
 import org.oran.dmaapadapter.repository.filters.PmReportFilter;
-import org.oran.dmaapadapter.tasks.DataConsumer;
+import org.oran.dmaapadapter.tasks.JobDataDistributor;
 import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
 import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
@@ -296,7 +296,7 @@ class ApplicationTest {
         this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
-        DataConsumer kafkaConsumer = this.topicListeners.getDataConsumers().get(TYPE_ID, JOB_ID);
+        JobDataDistributor kafkaConsumer = this.topicListeners.getDataDistributors().get(TYPE_ID, JOB_ID);
 
         // Handle received data from Kafka, check that it has been posted to the
         // consumer
index bc650f7..2de10fd 100644
@@ -216,7 +216,7 @@ class IntegrationWithKafka {
             this.icsSimulatorController.deleteJob(job.getId(), restClient());
         }
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+        await().untilAsserted(() -> assertThat(this.topicListeners.getDataDistributors().keySet()).isEmpty());
 
         this.consumerController.testResults.reset();
         this.icsSimulatorController.testResults.reset();
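
For context, the subscribe chain in JobDataDistributor.start (flatMap with maxConcurrency, onErrorResume, and the three subscribe callbacks renamed above) follows a standard Reactor pattern. The sketch below is a minimal, assumed standalone example that only needs reactor-core on the classpath; sendToClient, the printed messages and the timings are stand-ins, not the production code.

import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Illustrative pipeline only: messages from the topic flux are forwarded with
// bounded concurrency, per-message errors are swallowed via onErrorResume, and
// the three subscribe callbacks correspond to handleSentOk,
// handleExceptionInStream and the "stopped" completion log.
public class DistributorPipelineSketch {
    public static void main(String[] args) throws InterruptedException {
        Flux<String> input = Flux.just("msg-1", "msg-2", "msg-3");
        int maxConcurrency = 2; // stands in for job.getParameters().getMaxConcurrency()

        Disposable subscription = input
                .flatMap(DistributorPipelineSketch::sendToClient, maxConcurrency)
                .onErrorResume(t -> Mono.empty()) // drop a failed delivery, keep the stream alive
                .subscribe(
                        ok -> System.out.println("delivered: " + ok),    // handleSentOk
                        t -> System.err.println("stream failed: " + t),  // handleExceptionInStream
                        () -> System.out.println("distributor stopped")); // completion log

        Thread.sleep(200); // let the asynchronous deliveries finish before exiting
        subscription.dispose();
    }

    // Stand-in for the HTTP or Kafka delivery performed by the concrete distributors.
    static Mono<String> sendToClient(String body) {
        return Mono.just(body).delayElement(Duration.ofMillis(10));
    }
}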