package org.oran.dmaapadapter.repository;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Vector;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.repository.Job.Parameters;
-import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
@Component
public class Jobs {
+ public interface Observer {
+ void onJobbAdded(Job job);
+
+ void onJobRemoved(Job job);
+ }
+
private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
private Map<String, Job> allJobs = new HashMap<>();
private MultiMap<Job> jobsByType = new MultiMap<>();
- private final KafkaTopicConsumers kafkaConsumers;
private final AsyncRestClientFactory restclientFactory;
+ private final List<Observer> observers = new ArrayList<>();
- public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers, @Autowired ApplicationConfig applicationConfig) {
- this.kafkaConsumers = kafkaConsumers;
-
+ public Jobs(@Autowired ApplicationConfig applicationConfig) {
restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
}
public synchronized Job getJob(String id) throws ServiceException {
Job job = allJobs.get(id);
if (job == null) {
- throw new ServiceException("Could not find job: " + id);
+ throw new ServiceException("Could not find job: " + id, HttpStatus.NOT_FOUND);
}
return job;
}
: restclientFactory.createRestClientNoHttpProxy(callbackUrl);
Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
this.put(job);
+ synchronized (observers) {
+ this.observers.forEach(obs -> obs.onJobbAdded(job));
+ }
+ }
+
+ public void addObserver(Observer obs) {
+ synchronized (observers) {
+ this.observers.add(obs);
+ }
}
private synchronized void put(Job job) {
logger.debug("Put job: {}", job.getId());
allJobs.put(job.getId(), job);
jobsByType.put(job.getType().getId(), job.getId(), job);
- kafkaConsumers.addJob(job);
}
public synchronized Iterable<Job> getAll() {
return job;
}
- public synchronized void remove(Job job) {
- this.allJobs.remove(job.getId());
- jobsByType.remove(job.getType().getId(), job.getId());
- kafkaConsumers.removeJob(job);
+ public void remove(Job job) {
+ synchronized (this) {
+ this.allJobs.remove(job.getId());
+ jobsByType.remove(job.getType().getId(), job.getId());
+ }
+ synchronized (observers) {
+ this.observers.forEach(obs -> obs.onJobRemoved(job));
+ }
}
public synchronized int size() {