PM Filter 49/8149/3
author    PatrikBuhr <patrik.buhr@est.tech>
Thu, 5 May 2022 09:20:20 +0000 (11:20 +0200)
committer    PatrikBuhr <patrik.buhr@est.tech>
Thu, 5 May 2022 10:48:34 +0000 (12:48 +0200)
Introduce a TopicListener interface, implemented by DmaapTopicListener and
KafkaTopicListener, and let TopicListeners handle both listener types through
it, removing duplicated add/remove job logic. Increase the producer
registration supervision interval from 5 to 10 seconds.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-743
Change-Id: I9cab3d7bf8e4f9d776d186f1d623e7aa0f9efb2a

src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java

diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
index 854cdc5..6b4f253 100644 (file)
@@ -28,7 +28,6 @@ import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
-import org.oran.dmaapadapter.repository.Jobs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,31 +41,30 @@ import reactor.core.publisher.Sinks.Many;
  * The class fetches incoming requests from DMAAP and forwards them to the
  * consumers that have a job for this InformationType.
  */
-public class DmaapTopicListener {
+public class DmaapTopicListener implements TopicListener {
     private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(3);
     private static final Logger logger = LoggerFactory.getLogger(DmaapTopicListener.class);
 
     private final AsyncRestClient dmaapRestClient;
-    protected final ApplicationConfig applicationConfig;
-    protected final InfoType type;
-    protected final Jobs jobs;
+    private final ApplicationConfig applicationConfig;
+    private final InfoType type;
     private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
     private Many<String> output;
     private Disposable topicReceiverTask;
 
-    public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
+    public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type) {
         AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
         this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
         this.applicationConfig = applicationConfig;
         this.type = type;
-        this.jobs = jobs;
-
     }
 
+    @Override
     public Many<String> getOutput() {
         return this.output;
     }
 
+    @Override
     public void start() {
         stop();
 
@@ -82,6 +80,7 @@ public class DmaapTopicListener {
                         this::onComplete); //
     }
 
+    @Override
     public void stop() {
         if (topicReceiverTask != null) {
             topicReceiverTask.dispose();
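
The class comment above describes the flow: each message fetched from DMAAP is emitted on the listener's output sink, and the consumers that have a job for the type pick it up from there. A minimal sketch of how a consumer can attach to a started listener (TopicOutputLogger and logMessages are illustrative names, not part of this change):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

// Illustrative only: subscribe a simple consumer to a started TopicListener.
class TopicOutputLogger {
    private static final Logger logger = LoggerFactory.getLogger(TopicOutputLogger.class);

    static Disposable logMessages(TopicListener listener) {
        Flux<String> messages = listener.getOutput().asFlux();
        return messages.subscribe( //
                msg -> logger.debug("Received message {}", msg), //
                throwable -> logger.warn("Listener output failed: {}", throwable.getMessage()));
    }
}
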
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index f32c585..4b58013 100644 (file)
@@ -43,7 +43,7 @@ import reactor.kafka.receiver.ReceiverOptions;
  * to a multicast sink, which several other streams can connect to.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class KafkaTopicListener {
+public class KafkaTopicListener implements TopicListener {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
@@ -55,10 +55,12 @@ public class KafkaTopicListener {
         this.type = type;
     }
 
+    @Override
     public Many<String> getOutput() {
         return this.output;
     }
 
+    @Override
     public void start() {
         stop();
         final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
@@ -72,6 +74,7 @@ public class KafkaTopicListener {
                         () -> logger.warn("KafkaTopicReceiver stopped"));
     }
 
+    @Override
     public void stop() {
         if (topicReceiverTask != null) {
             topicReceiverTask.dispose();
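
The Kafka listener, as its comment says, emits each received record to a multicast sink that several other streams can connect to. A rough sketch of that pattern in isolation, reusing the buffer size from the hunk above (MulticastExample is a made-up class; the real listener feeds the sink from a Kafka receiver rather than from main):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;

// Illustrative multicast-sink pattern: one producer, several independent subscribers.
class MulticastExample {
    public static void main(String[] args) {
        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
        Many<String> output = Sinks.many() //
                .multicast() //
                .onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);

        // Two independent streams connected to the same sink.
        Flux<String> first = output.asFlux();
        Flux<String> second = output.asFlux();
        first.subscribe(msg -> System.out.println("first: " + msg));
        second.subscribe(msg -> System.out.println("second: " + msg));

        // Emitted values are delivered to every connected subscriber.
        output.tryEmitNext("record-1");
        output.tryEmitNext("record-2");
    }
}
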
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
index 24b53c9..f3b663b 100644 (file)
@@ -68,7 +68,7 @@ public class ProducerRegstrationTask {
     private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
     @Getter
     private boolean isRegisteredInIcs = false;
-    private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
+    private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 10;
 
     public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) {
         AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
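
The only change here is that the registration supervision interval goes from 5 to 10 seconds. The diff does not show how the constant is used; purely as an illustration, a periodic check driven by such an interval could look like the sketch below (SupervisionExample and checkRegistration are placeholders, not the actual scheduling code of ProducerRegstrationTask):

import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

// Illustrative periodic supervision loop driven by an interval constant.
class SupervisionExample {
    private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 10;

    Disposable startSupervision() {
        return Flux.interval(Duration.ofMillis(REGISTRATION_SUPERVISION_INTERVAL_MS)) //
                .doOnNext(tick -> checkRegistration()) //
                .subscribe();
    }

    private void checkRegistration() {
        // Placeholder: re-register towards ICS if the producer is not registered.
    }
}
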
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
new file mode 100644 (file)
index 0000000..503f113
--- /dev/null
+++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
@@ -0,0 +1,31 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import reactor.core.publisher.Sinks.Many;
+
+public interface TopicListener {
+    public void start();
+
+    public void stop();
+
+    public Many<String> getOutput();
+}
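
The new interface is what TopicListeners now depends on for both Kafka and DMAAP topics. As an illustration of how small an implementation can be, here is a hypothetical in-memory variant, e.g. usable as a test stub (InMemoryTopicListener and publish are invented for this sketch and not part of the change):

package org.oran.dmaapadapter.tasks;

import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;

// Hypothetical in-memory implementation, e.g. for unit tests.
public class InMemoryTopicListener implements TopicListener {
    private Many<String> output;

    @Override
    public void start() {
        stop();
        this.output = Sinks.many().multicast().directBestEffort();
    }

    @Override
    public void stop() {
        if (this.output != null) {
            this.output.tryEmitComplete();
            this.output = null;
        }
    }

    @Override
    public Many<String> getOutput() {
        return this.output;
    }

    // Assumes start() has been called before publishing.
    public void publish(String message) {
        this.output.tryEmitNext(message);
    }
}
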
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
index 1b8a740..5176867 100644 (file)
@@ -44,8 +44,8 @@ import org.springframework.stereotype.Component;
 public class TopicListeners {
     private static final Logger logger = LoggerFactory.getLogger(TopicListeners.class);
 
-    private final Map<String, KafkaTopicListener> kafkaTopicListeners = new HashMap<>(); // Key is typeId
-    private final Map<String, DmaapTopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
+    private final Map<String, TopicListener> kafkaTopicListeners = new HashMap<>(); // Key is typeId
+    private final Map<String, TopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
 
     @Getter
     private final MultiMap<JobDataConsumer> kafkaConsumers = new MultiMap<>(); // Key is typeId, jobId
@@ -61,7 +61,7 @@ public class TopicListeners {
                 kafkaTopicListeners.put(type.getId(), topicConsumer);
             }
             if (type.isDmaapTopicDefined()) {
-                DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type, jobs);
+                DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type);
                 dmaapTopicListeners.put(type.getId(), topicListener);
             }
         }
@@ -83,35 +83,34 @@ public class TopicListeners {
         removeJob(job);
         logger.debug("Job added {}", job.getId());
         if (job.getType().isKafkaTopicDefined()) {
-            KafkaTopicListener topicListener = kafkaTopicListeners.get(job.getType().getId());
-            if (kafkaConsumers.get(job.getType().getId()).isEmpty()) {
-                topicListener.start();
-            }
-            JobDataConsumer subscription = new JobDataConsumer(job);
-            subscription.start(topicListener.getOutput().asFlux());
-            kafkaConsumers.put(job.getType().getId(), job.getId(), subscription);
+            addJob(job, kafkaConsumers, kafkaTopicListeners);
         }
 
         if (job.getType().isDmaapTopicDefined()) {
-            DmaapTopicListener topicListener = dmaapTopicListeners.get(job.getType().getId());
-            if (dmaapConsumers.get(job.getType().getId()).isEmpty()) {
-                topicListener.start();
-            }
-            JobDataConsumer subscription = new JobDataConsumer(job);
-            subscription.start(topicListener.getOutput().asFlux());
-            dmaapConsumers.put(job.getType().getId(), job.getId(), subscription);
+            addJob(job, dmaapConsumers, dmaapTopicListeners);
         }
     }
 
-    public synchronized void removeJob(Job job) {
-        JobDataConsumer consumer = kafkaConsumers.remove(job.getType().getId(), job.getId());
-        if (consumer != null) {
-            logger.debug("Kafka job removed {}", job.getId());
-            consumer.stop();
+    private static void addJob(Job job, MultiMap<JobDataConsumer> consumers,
+            Map<String, TopicListener> topicListeners) {
+        TopicListener topicListener = topicListeners.get(job.getType().getId());
+        if (consumers.get(job.getType().getId()).isEmpty()) {
+            topicListener.start();
         }
-        consumer = this.dmaapConsumers.remove(job.getType().getId(), job.getId());
+        JobDataConsumer subscription = new JobDataConsumer(job);
+        subscription.start(topicListener.getOutput().asFlux());
+        consumers.put(job.getType().getId(), job.getId(), subscription);
+    }
+
+    public synchronized void removeJob(Job job) {
+        removeJob(job, kafkaConsumers);
+        removeJob(job, dmaapConsumers);
+    }
+
+    private static void removeJob(Job job, MultiMap<JobDataConsumer> consumers) {
+        JobDataConsumer consumer = consumers.remove(job.getType().getId(), job.getId());
         if (consumer != null) {
-            logger.debug("DMAAP job removed {}", job.getId());
+            logger.debug("Job removed {}", job.getId());
             consumer.stop();
         }
     }
@@ -121,20 +120,22 @@ public class TopicListeners {
         for (String typeId : this.kafkaConsumers.keySet()) {
             for (JobDataConsumer consumer : this.kafkaConsumers.get(typeId)) {
                 if (!consumer.isRunning()) {
-                    restartKafkaTopic(consumer);
+                    restartTopicAndConsumers(this.kafkaTopicListeners, this.kafkaConsumers, consumer);
                 }
             }
         }
     }
 
-    private void restartKafkaTopic(JobDataConsumer consumer) {
+    private static void restartTopicAndConsumers(Map<String, TopicListener> topicListeners,
+            MultiMap<JobDataConsumer> consumers, JobDataConsumer consumer) {
         InfoType type = consumer.getJob().getType();
-        KafkaTopicListener topic = this.kafkaTopicListeners.get(type.getId());
+        TopicListener topic = topicListeners.get(type.getId());
         topic.start();
-        restartConsumersOfType(topic, type);
+        restartConsumersOfType(consumers, topic, type);
     }
 
-    private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
-        this.kafkaConsumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
+    private static void restartConsumersOfType(MultiMap<JobDataConsumer> consumers, TopicListener topic,
+            InfoType type) {
+        consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
     }
 }