NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / repository / Jobs.java
index e3bc61e..0e7743d 100644 (file)
 
 package org.oran.dmaapadapter.repository;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 
@@ -30,7 +32,6 @@ import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.repository.Job.Parameters;
-import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -38,16 +39,20 @@ import org.springframework.stereotype.Component;
 
 @Component
 public class Jobs {
+    public interface Observer {
+        void onJobbAdded(Job job);
+
+        void onJobRemoved(Job job);
+    }
+
     private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
 
     private Map<String, Job> allJobs = new HashMap<>();
     private MultiMap<Job> jobsByType = new MultiMap<>();
-    private final KafkaTopicConsumers kafkaConsumers;
     private final AsyncRestClientFactory restclientFactory;
+    private final List<Observer> observers = new ArrayList<>();
 
-    public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers, @Autowired ApplicationConfig applicationConfig) {
-        this.kafkaConsumers = kafkaConsumers;
-
+    public Jobs(@Autowired ApplicationConfig applicationConfig) {
         restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
     }
 
@@ -70,13 +75,21 @@ public class Jobs {
                 : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
         Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
         this.put(job);
+        synchronized (observers) {
+            this.observers.forEach(obs -> obs.onJobbAdded(job));
+        }
+    }
+
+    public void addObserver(Observer obs) {
+        synchronized (observers) {
+            this.observers.add(obs);
+        }
     }
 
     private synchronized void put(Job job) {
         logger.debug("Put job: {}", job.getId());
         allJobs.put(job.getId(), job);
         jobsByType.put(job.getType().getId(), job.getId(), job);
-        kafkaConsumers.addJob(job);
     }
 
     public synchronized Iterable<Job> getAll() {
@@ -91,10 +104,14 @@ public class Jobs {
         return job;
     }
 
-    public synchronized void remove(Job job) {
-        this.allJobs.remove(job.getId());
-        jobsByType.remove(job.getType().getId(), job.getId());
-        kafkaConsumers.removeJob(job);
+    public void remove(Job job) {
+        synchronized (this) {
+            this.allJobs.remove(job.getId());
+            jobsByType.remove(job.getType().getId(), job.getId());
+        }
+        synchronized (observers) {
+            this.observers.forEach(obs -> obs.onJobRemoved(job));
+        }
     }
 
     public synchronized int size() {