X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Frepository%2FJobs.java;h=8a388248a9073c2ab0b127634357c2f7423c59c6;hb=2d522d44bf5f69b091380b57e2879d3b7139bc8f;hp=6e2b3265f4488804132cc18db0cd0a4a7e4b9a72;hpb=18fddb8fc58d52d80e06ba87f43f028d30b68302;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index 6e2b3265..8a388248 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -26,8 +26,10 @@ import java.util.Map; import java.util.Vector; import org.oran.dmaapadapter.exceptions.ServiceException; +import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @@ -36,8 +38,11 @@ public class Jobs { private Map allJobs = new HashMap<>(); private MultiMap jobsByType = new MultiMap<>(); + private final KafkaTopicConsumers kafkaConsumers; - public Jobs() {} + public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) { + this.kafkaConsumers = kafkaConsumers; + } public synchronized Job getJob(String id) throws ServiceException { Job job = allJobs.get(id); @@ -52,9 +57,10 @@ public class Jobs { } public synchronized void put(Job job) { - logger.debug("Put service: {}", job.getId()); + logger.debug("Put job: {}", job.getId()); allJobs.put(job.getId(), job); jobsByType.put(job.getType().getId(), job.getId(), job); + kafkaConsumers.addJob(job); } public synchronized Iterable getAll() { @@ -72,6 +78,7 @@ public class Jobs { public synchronized void remove(Job job) { this.allJobs.remove(job.getId()); jobsByType.remove(job.getType().getId(), job.getId()); + kafkaConsumers.removeJob(job); } public synchronized int size() {