NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / repository / Jobs.java
index 6e2b326..8a38824 100644 (file)
@@ -26,8 +26,10 @@ import java.util.Map;
 import java.util.Vector;
 
 import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Component
@@ -36,8 +38,11 @@ public class Jobs {
 
     private Map<String, Job> allJobs = new HashMap<>();
     private MultiMap<Job> jobsByType = new MultiMap<>();
+    private final KafkaTopicConsumers kafkaConsumers;
 
-    public Jobs() {}
+    public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) {
+        this.kafkaConsumers = kafkaConsumers;
+    }
 
     public synchronized Job getJob(String id) throws ServiceException {
         Job job = allJobs.get(id);
@@ -52,9 +57,10 @@ public class Jobs {
     }
 
     public synchronized void put(Job job) {
-        logger.debug("Put service: {}", job.getId());
+        logger.debug("Put job: {}", job.getId());
         allJobs.put(job.getId(), job);
         jobsByType.put(job.getType().getId(), job.getId(), job);
+        kafkaConsumers.addJob(job);
     }
 
     public synchronized Iterable<Job> getAll() {
@@ -72,6 +78,7 @@ public class Jobs {
     public synchronized void remove(Job job) {
         this.allJobs.remove(job.getId());
         jobsByType.remove(job.getType().getId(), job.getId());
+        kafkaConsumers.removeJob(job);
     }
 
     public synchronized int size() {