import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DmaapMessageConsumer;
+import org.oran.dmaapadapter.tasks.DmaapTopicConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
@Configuration
public class BeanFactory {
+ private InfoTypes infoTypes;
@Value("${server.http-port}")
private int httpPort = 0;
}
@Bean
- public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs) {
+ public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs,
+ @Autowired KafkaTopicConsumers kafkaConsumers) {
+ if (infoTypes != null) {
+ return infoTypes;
+ }
+
Collection<InfoType> types = appConfig.getTypes();
// Start a consumer for each type
for (InfoType type : types) {
- DmaapMessageConsumer topicConsumer = new DmaapMessageConsumer(appConfig, type, jobs);
- topicConsumer.start();
+ if (type.isDmaapTopicDefined()) {
+ DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs);
+ topicConsumer.start();
+ }
}
-
- return new InfoTypes(types);
+ infoTypes = new InfoTypes(types);
+ kafkaConsumers.start(infoTypes);
+ return infoTypes;
}
@Bean