NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / BeanFactory.java
index c9ba93f..faf5742 100644 (file)
@@ -27,7 +27,8 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DmaapMessageConsumer;
+import org.oran.dmaapadapter.tasks.DmaapTopicConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
@@ -37,6 +38,7 @@ import org.springframework.context.annotation.Configuration;
 
 @Configuration
 public class BeanFactory {
+    private InfoTypes infoTypes;
 
     @Value("${server.http-port}")
     private int httpPort = 0;
@@ -47,16 +49,24 @@ public class BeanFactory {
     }
 
     @Bean
-    public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs) {
+    public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs,
+            @Autowired KafkaTopicConsumers kafkaConsumers) {
+        if (infoTypes != null) {
+            return infoTypes;
+        }
+
         Collection<InfoType> types = appConfig.getTypes();
 
         // Start a consumer for each type
         for (InfoType type : types) {
-            DmaapMessageConsumer topicConsumer = new DmaapMessageConsumer(appConfig, type, jobs);
-            topicConsumer.start();
+            if (type.isDmaapTopicDefined()) {
+                DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs);
+                topicConsumer.start();
+            }
         }
-
-        return new InfoTypes(types);
+        infoTypes = new InfoTypes(types);
+        kafkaConsumers.start(infoTypes);
+        return infoTypes;
     }
 
     @Bean