X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=enrichment-coordinator-service%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fenrichment%2Frepository%2FEiProducers.java;h=1fdb9864f1de94b304fbeb597745c7f600b4b725;hb=ce4b14cdfcd3a2e131c7364776452a67ee858f72;hp=fcc915684a0ef63916a1ea897e4fab9d4be2a423;hpb=f0019c8168b9d59f132ba47e939e8aa3ef324b7d;p=nonrtric.git diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java index fcc91568..1fdb9864 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java @@ -28,8 +28,8 @@ import java.util.Map; import java.util.Vector; import org.immutables.value.Value.Immutable; -import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks; -import org.oransc.enrichment.controllers.producer.ProducerCallbacks; +import org.oransc.enrichment.controllers.a1e.A1eCallbacks; +import org.oransc.enrichment.controllers.r1producer.ProducerCallbacks; import org.oransc.enrichment.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,31 +50,20 @@ public class EiProducers { private ProducerCallbacks producerCallbacks; @Autowired - private ConsumerCallbacks consumerCallbacks; - - @Autowired - private EiTypes eiTypes; + private A1eCallbacks consumerCallbacks; @Autowired private EiJobs eiJobs; - @Immutable - public interface EiTypeRegistrationInfo { - String id(); - - Object jobDataSchema(); - } - @Immutable public interface EiProducerRegistrationInfo { String id(); - Collection supportedTypes(); + Collection supportedTypes(); String jobCallbackUrl(); String producerSupervisionCallbackUrl(); - } public EiProducer registerProducer(EiProducerRegistrationInfo producerInfo) { @@ -93,45 +82,22 @@ public class EiProducers { producersByType.put(type.getId(), producer.getId(), producer); } - if (previousDefinition != null) { - purgeTypes(previousDefinition.getEiTypes()); - this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition); - } + Collection previousTypes = + previousDefinition != null ? previousDefinition.getEiTypes() : new ArrayList<>(); - producerCallbacks.restartEiJobs(producer, this.eiJobs); - consumerCallbacks.notifyConsumersProducerAdded(producer); - return producer; - } + producerCallbacks.startEiJobs(producer, this.eiJobs) // + .collectList() // + .flatMapMany(list -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) // + .collectList() // + .flatMapMany(list -> consumerCallbacks.notifyJobStatus(previousTypes)) // + .subscribe(); - private void purgeTypes(Collection types) { - for (EiType type : types) { - if (getProducersForType(type.getId()).isEmpty()) { - this.eiTypes.remove(type); - } - } - } - - private EiType getType(EiTypeRegistrationInfo typeInfo) { - EiType type = this.eiTypes.get(typeInfo.id()); - if (type == null) { - type = new EiType(typeInfo.id(), typeInfo.jobDataSchema()); - this.eiTypes.put(type); - this.consumerCallbacks.notifyConsumersTypeAdded(type); - } - return type; + return producer; } private EiProducer createProducer(EiProducerRegistrationInfo producerInfo) { - ArrayList types = new ArrayList<>(); - - EiProducer producer = new EiProducer(producerInfo.id(), types, producerInfo.jobCallbackUrl(), + return new EiProducer(producerInfo.id(), producerInfo.supportedTypes(), producerInfo.jobCallbackUrl(), producerInfo.producerSupervisionCallbackUrl()); - - for (EiTypeRegistrationInfo typeInfo : producerInfo.supportedTypes()) { - EiType type = getType(typeInfo); - types.add(type); - } - return producer; } public synchronized Collection getAllProducers() { @@ -159,17 +125,15 @@ public class EiProducers { this.producersByType.clear(); } - public void deregisterProducer(EiProducer producer, EiTypes eiTypes) { + public void deregisterProducer(EiProducer producer) { allEiProducers.remove(producer.getId()); for (EiType type : producer.getEiTypes()) { if (producersByType.remove(type.getId(), producer.getId()) == null) { this.logger.error("Bug, no producer found"); } - if (this.producersByType.get(type.getId()).isEmpty()) { - eiTypes.remove(type); - } } - this.consumerCallbacks.notifyConsumersProducerDeleted(producer); + this.consumerCallbacks.notifyJobStatus(producer.getEiTypes()) // + .subscribe(); } public synchronized Collection getProducersForType(EiType type) { @@ -188,4 +152,13 @@ public class EiProducers { return producerIds; } + public synchronized boolean isJobEnabled(EiJob job) { + for (EiProducer producer : this.producersByType.get(job.getTypeId())) { + if (producer.isJobEnabled(job)) { + return true; + } + } + return false; + } + }