Added Information consumer API
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / repository / EiProducers.java
index fcc9156..1fdb986 100644 (file)
@@ -28,8 +28,8 @@ import java.util.Map;
 import java.util.Vector;
 
 import org.immutables.value.Value.Immutable;
-import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
-import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
+import org.oransc.enrichment.controllers.a1e.A1eCallbacks;
+import org.oransc.enrichment.controllers.r1producer.ProducerCallbacks;
 import org.oransc.enrichment.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,31 +50,20 @@ public class EiProducers {
     private ProducerCallbacks producerCallbacks;
 
     @Autowired
-    private ConsumerCallbacks consumerCallbacks;
-
-    @Autowired
-    private EiTypes eiTypes;
+    private A1eCallbacks consumerCallbacks;
 
     @Autowired
     private EiJobs eiJobs;
 
-    @Immutable
-    public interface EiTypeRegistrationInfo {
-        String id();
-
-        Object jobDataSchema();
-    }
-
     @Immutable
     public interface EiProducerRegistrationInfo {
         String id();
 
-        Collection<EiTypeRegistrationInfo> supportedTypes();
+        Collection<EiType> supportedTypes();
 
         String jobCallbackUrl();
 
         String producerSupervisionCallbackUrl();
-
     }
 
     public EiProducer registerProducer(EiProducerRegistrationInfo producerInfo) {
@@ -93,45 +82,22 @@ public class EiProducers {
             producersByType.put(type.getId(), producer.getId(), producer);
         }
 
-        if (previousDefinition != null) {
-            purgeTypes(previousDefinition.getEiTypes());
-            this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition);
-        }
+        Collection<EiType> previousTypes =
+            previousDefinition != null ? previousDefinition.getEiTypes() : new ArrayList<>();
 
-        producerCallbacks.restartEiJobs(producer, this.eiJobs);
-        consumerCallbacks.notifyConsumersProducerAdded(producer);
-        return producer;
-    }
+        producerCallbacks.startEiJobs(producer, this.eiJobs) //
+            .collectList() //
+            .flatMapMany(list -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+            .collectList() //
+            .flatMapMany(list -> consumerCallbacks.notifyJobStatus(previousTypes)) //
+            .subscribe();
 
-    private void purgeTypes(Collection<EiType> types) {
-        for (EiType type : types) {
-            if (getProducersForType(type.getId()).isEmpty()) {
-                this.eiTypes.remove(type);
-            }
-        }
-    }
-
-    private EiType getType(EiTypeRegistrationInfo typeInfo) {
-        EiType type = this.eiTypes.get(typeInfo.id());
-        if (type == null) {
-            type = new EiType(typeInfo.id(), typeInfo.jobDataSchema());
-            this.eiTypes.put(type);
-            this.consumerCallbacks.notifyConsumersTypeAdded(type);
-        }
-        return type;
+        return producer;
     }
 
     private EiProducer createProducer(EiProducerRegistrationInfo producerInfo) {
-        ArrayList<EiType> types = new ArrayList<>();
-
-        EiProducer producer = new EiProducer(producerInfo.id(), types, producerInfo.jobCallbackUrl(),
+        return new EiProducer(producerInfo.id(), producerInfo.supportedTypes(), producerInfo.jobCallbackUrl(),
             producerInfo.producerSupervisionCallbackUrl());
-
-        for (EiTypeRegistrationInfo typeInfo : producerInfo.supportedTypes()) {
-            EiType type = getType(typeInfo);
-            types.add(type);
-        }
-        return producer;
     }
 
     public synchronized Collection<EiProducer> getAllProducers() {
@@ -159,17 +125,15 @@ public class EiProducers {
         this.producersByType.clear();
     }
 
-    public void deregisterProducer(EiProducer producer, EiTypes eiTypes) {
+    public void deregisterProducer(EiProducer producer) {
         allEiProducers.remove(producer.getId());
         for (EiType type : producer.getEiTypes()) {
             if (producersByType.remove(type.getId(), producer.getId()) == null) {
                 this.logger.error("Bug, no producer found");
             }
-            if (this.producersByType.get(type.getId()).isEmpty()) {
-                eiTypes.remove(type);
-            }
         }
-        this.consumerCallbacks.notifyConsumersProducerDeleted(producer);
+        this.consumerCallbacks.notifyJobStatus(producer.getEiTypes()) //
+            .subscribe();
     }
 
     public synchronized Collection<EiProducer> getProducersForType(EiType type) {
@@ -188,4 +152,13 @@ public class EiProducers {
         return producerIds;
     }
 
+    public synchronized boolean isJobEnabled(EiJob job) {
+        for (EiProducer producer : this.producersByType.get(job.getTypeId())) {
+            if (producer.isJobEnabled(job)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
 }