X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Frepository%2FJobs.java;h=ec33774b7fe534bfea8d381cdf343a09277569dc;hb=aa73209488503ee51db068c6143c7d4ec298a036;hp=e3bc61e801d73d07cf9b6f3fbc5c7eb40e37e659;hpb=d0c7f9207203ce9a502fc15c09f9938eebfd44f7;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index e3bc61e8..ec33774b 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -20,8 +20,10 @@ package org.oran.dmaapadapter.repository; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Vector; @@ -30,31 +32,35 @@ import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.repository.Job.Parameters; -import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; @Component public class Jobs { + public interface Observer { + void onJobbAdded(Job job); + + void onJobRemoved(Job job); + } + private static final Logger logger = LoggerFactory.getLogger(Jobs.class); private Map allJobs = new HashMap<>(); private MultiMap jobsByType = new MultiMap<>(); - private final KafkaTopicConsumers kafkaConsumers; private final AsyncRestClientFactory restclientFactory; + private final List observers = new ArrayList<>(); - public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers, @Autowired ApplicationConfig applicationConfig) { - this.kafkaConsumers = kafkaConsumers; - + public Jobs(@Autowired ApplicationConfig applicationConfig) { restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); } public synchronized Job getJob(String id) throws ServiceException { Job job = allJobs.get(id); if (job == null) { - throw new ServiceException("Could not find job: " + id); + throw new ServiceException("Could not find job: " + id, HttpStatus.NOT_FOUND); } return job; } @@ -70,13 +76,21 @@ public class Jobs { : restclientFactory.createRestClientNoHttpProxy(callbackUrl); Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient); this.put(job); + synchronized (observers) { + this.observers.forEach(obs -> obs.onJobbAdded(job)); + } + } + + public void addObserver(Observer obs) { + synchronized (observers) { + this.observers.add(obs); + } } private synchronized void put(Job job) { logger.debug("Put job: {}", job.getId()); allJobs.put(job.getId(), job); jobsByType.put(job.getType().getId(), job.getId(), job); - kafkaConsumers.addJob(job); } public synchronized Iterable getAll() { @@ -91,10 +105,14 @@ public class Jobs { return job; } - public synchronized void remove(Job job) { - this.allJobs.remove(job.getId()); - jobsByType.remove(job.getType().getId(), job.getId()); - kafkaConsumers.removeJob(job); + public void remove(Job job) { + synchronized (this) { + this.allJobs.remove(job.getId()); + jobsByType.remove(job.getType().getId(), job.getId()); + } + synchronized (observers) { + this.observers.forEach(obs -> obs.onJobRemoved(job)); + } } public synchronized int size() {