X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Frepository%2FJobs.java;h=e3bc61e801d73d07cf9b6f3fbc5c7eb40e37e659;hb=5e1623ab25b62c6c28849bfd862eba4648465922;hp=8a388248a9073c2ab0b127634357c2f7423c59c6;hpb=b2d6339441c650962e34502e7527ca0835fa342f;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index 8a388248..e3bc61e8 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -25,7 +25,11 @@ import java.util.HashMap; import java.util.Map; import java.util.Vector; +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.exceptions.ServiceException; +import org.oran.dmaapadapter.repository.Job.Parameters; import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +43,12 @@ public class Jobs { private Map allJobs = new HashMap<>(); private MultiMap jobsByType = new MultiMap<>(); private final KafkaTopicConsumers kafkaConsumers; + private final AsyncRestClientFactory restclientFactory; - public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) { + public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers, @Autowired ApplicationConfig applicationConfig) { this.kafkaConsumers = kafkaConsumers; + + restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); } public synchronized Job getJob(String id) throws ServiceException { @@ -56,7 +63,16 @@ public class Jobs { return allJobs.get(id); } - public synchronized void put(Job job) { + public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, + Parameters parameters) { + AsyncRestClient consumerRestClient = type.isUseHttpProxy() // + ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) // + : restclientFactory.createRestClientNoHttpProxy(callbackUrl); + Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient); + this.put(job); + } + + private synchronized void put(Job job) { logger.debug("Put job: {}", job.getId()); allJobs.put(job.getId(), job); jobsByType.put(job.getType().getId(), job.getId(), job);