X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2FBeanFactory.java;h=faf57426629739652b9244971d4a1facf90ce71d;hb=b2d6339441c650962e34502e7527ca0835fa342f;hp=c9ba93fc0eb93a5bbdd3afd7c6ea51d9292cf039;hpb=6e323a6d8f04cec11b5f3f1a7fcb7c5f6b10247d;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java index c9ba93fc..faf57426 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java @@ -27,7 +27,8 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Jobs; -import org.oran.dmaapadapter.tasks.DmaapMessageConsumer; +import org.oran.dmaapadapter.tasks.DmaapTopicConsumer; +import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; @@ -37,6 +38,7 @@ import org.springframework.context.annotation.Configuration; @Configuration public class BeanFactory { + private InfoTypes infoTypes; @Value("${server.http-port}") private int httpPort = 0; @@ -47,16 +49,24 @@ public class BeanFactory { } @Bean - public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs) { + public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs, + @Autowired KafkaTopicConsumers kafkaConsumers) { + if (infoTypes != null) { + return infoTypes; + } + Collection types = appConfig.getTypes(); // Start a consumer for each type for (InfoType type : types) { - DmaapMessageConsumer topicConsumer = new DmaapMessageConsumer(appConfig, type, jobs); - topicConsumer.start(); + if (type.isDmaapTopicDefined()) { + DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs); + topicConsumer.start(); + } } - - return new InfoTypes(types); + infoTypes = new InfoTypes(types); + kafkaConsumers.start(infoTypes); + return infoTypes; } @Bean