NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / repository / Jobs.java
index 8a38824..e3bc61e 100644 (file)
@@ -25,7 +25,11 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Vector;
 
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.repository.Job.Parameters;
 import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,9 +43,12 @@ public class Jobs {
     private Map<String, Job> allJobs = new HashMap<>();
     private MultiMap<Job> jobsByType = new MultiMap<>();
     private final KafkaTopicConsumers kafkaConsumers;
+    private final AsyncRestClientFactory restclientFactory;
 
-    public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) {
+    public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers, @Autowired ApplicationConfig applicationConfig) {
         this.kafkaConsumers = kafkaConsumers;
+
+        restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
     }
 
     public synchronized Job getJob(String id) throws ServiceException {
@@ -56,7 +63,16 @@ public class Jobs {
         return allJobs.get(id);
     }
 
-    public synchronized void put(Job job) {
+    public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
+            Parameters parameters) {
+        AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
+                ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
+                : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
+        Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
+        this.put(job);
+    }
+
+    private synchronized void put(Job job) {
         logger.debug("Put job: {}", job.getId());
         allJobs.put(job.getId(), job);
         jobsByType.put(job.getType().getId(), job.getId(), job);