import java.util.Vector;
import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
private Map<String, Job> allJobs = new HashMap<>();
private MultiMap<Job> jobsByType = new MultiMap<>();
+ private final KafkaTopicConsumers kafkaConsumers;
- public Jobs() {}
+ public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) {
+ this.kafkaConsumers = kafkaConsumers;
+ }
public synchronized Job getJob(String id) throws ServiceException {
Job job = allJobs.get(id);
}
public synchronized void put(Job job) {
- logger.debug("Put service: {}", job.getId());
+ logger.debug("Put job: {}", job.getId());
allJobs.put(job.getId(), job);
jobsByType.put(job.getType().getId(), job.getId(), job);
+ kafkaConsumers.addJob(job);
}
public synchronized Iterable<Job> getAll() {
public synchronized void remove(Job job) {
this.allJobs.remove(job.getId());
jobsByType.remove(job.getType().getId(), job.getId());
+ kafkaConsumers.removeJob(job);
}
public synchronized int size() {