import java.util.Map;
import java.util.Vector;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.repository.Job.Parameters;
import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private Map<String, Job> allJobs = new HashMap<>();
private MultiMap<Job> jobsByType = new MultiMap<>();
private final KafkaTopicConsumers kafkaConsumers;
+ private final AsyncRestClientFactory restclientFactory;
- public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) {
+ public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers, @Autowired ApplicationConfig applicationConfig) {
this.kafkaConsumers = kafkaConsumers;
+
+ restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
}
public synchronized Job getJob(String id) throws ServiceException {
return allJobs.get(id);
}
- public synchronized void put(Job job) {
+ public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
+ Parameters parameters) {
+ AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
+ ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
+ : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
+ Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
+ this.put(job);
+ }
+
+ private synchronized void put(Job job) {
logger.debug("Put job: {}", job.getId());
allJobs.put(job.getId(), job);
jobsByType.put(job.getType().getId(), job.getId(), job);