Restarting jobs in producer supervision
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / repository / EiProducers.java
index 801e7fc..f0fc49f 100644 (file)
 package org.oransc.enrichment.repository;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Vector;
 
+import org.immutables.value.Value.Immutable;
+import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
 import org.oransc.enrichment.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 /**
  * Dynamic representation of all EiProducers.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+@Component
 public class EiProducers {
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     private final Map<String, EiProducer> allEiProducers = new HashMap<>();
+    private final MultiMap<EiProducer> producersByType = new MultiMap<>();
 
-    public synchronized void put(EiProducer producer) {
+    @Autowired
+    private ProducerCallbacks producerCallbacks;
+
+    @Autowired
+    private ConsumerCallbacks consumerCallbacks;
+
+    @Autowired
+    private EiJobs eiJobs;
+
+    @Immutable
+    public interface EiProducerRegistrationInfo {
+        String id();
+
+        Collection<EiType> supportedTypes();
+
+        String jobCallbackUrl();
+
+        String producerSupervisionCallbackUrl();
+    }
+
+    public EiProducer registerProducer(EiProducerRegistrationInfo producerInfo) {
+        final String producerId = producerInfo.id();
+        EiProducer previousDefinition = this.get(producerId);
+        if (previousDefinition != null) {
+            for (EiType type : previousDefinition.getEiTypes()) {
+                producersByType.remove(type.getId(), producerId);
+            }
+            allEiProducers.remove(producerId);
+        }
+
+        EiProducer producer = createProducer(producerInfo);
         allEiProducers.put(producer.getId(), producer);
+        for (EiType type : producer.getEiTypes()) {
+            producersByType.put(type.getId(), producer.getId(), producer);
+        }
+
+        Collection<EiType> previousTypes =
+            previousDefinition != null ? previousDefinition.getEiTypes() : new ArrayList<>();
+
+        producerCallbacks.startEiJobs(producer, this.eiJobs) //
+            .collectList() //
+            .flatMapMany(list -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+            .collectList() //
+            .flatMapMany(list -> consumerCallbacks.notifyJobStatus(previousTypes)) //
+            .subscribe();
+
+        return producer;
+    }
+
+    private EiProducer createProducer(EiProducerRegistrationInfo producerInfo) {
+        return new EiProducer(producerInfo.id(), producerInfo.supportedTypes(), producerInfo.jobCallbackUrl(),
+            producerInfo.producerSupervisionCallbackUrl());
     }
 
     public synchronized Collection<EiProducer> getAllProducers() {
@@ -58,33 +116,49 @@ public class EiProducers {
         return allEiProducers.get(id);
     }
 
-    public synchronized void remove(String id) {
-        this.allEiProducers.remove(id);
-    }
-
     public synchronized int size() {
         return allEiProducers.size();
     }
 
     public synchronized void clear() {
         this.allEiProducers.clear();
+        this.producersByType.clear();
     }
 
-    public void deregisterProducer(EiProducer producer, EiTypes eiTypes, EiJobs eiJobs) {
-        this.remove(producer);
+    public void deregisterProducer(EiProducer producer) {
+        allEiProducers.remove(producer.getId());
         for (EiType type : producer.getEiTypes()) {
-            boolean removed = type.removeProducer(producer) != null;
-            if (!removed) {
+            if (producersByType.remove(type.getId(), producer.getId()) == null) {
                 this.logger.error("Bug, no producer found");
             }
-            if (type.getProducerIds().isEmpty()) {
-                eiTypes.remove(type);
-            }
         }
+        this.consumerCallbacks.notifyJobStatus(producer.getEiTypes()) //
+            .subscribe();
+    }
+
+    public synchronized Collection<EiProducer> getProducersForType(EiType type) {
+        return this.producersByType.get(type.getId());
     }
 
-    private synchronized void remove(EiProducer producer) {
-        this.allEiProducers.remove(producer.getId());
+    public synchronized Collection<EiProducer> getProducersForType(String typeId) {
+        return this.producersByType.get(typeId);
+    }
+
+    public synchronized Collection<String> getProducerIdsForType(String typeId) {
+        Collection<String> producerIds = new ArrayList<>();
+        for (EiProducer p : this.getProducersForType(typeId)) {
+            producerIds.add(p.getId());
+        }
+        return producerIds;
+    }
+
+    public synchronized boolean isJobEnabled(EiJob job) {
+        for (EiProducer producer : this.producersByType.get(job.getTypeId())) {
+            if (producer.isJobEnabled(job)) {
+                return true;
+            }
+        }
+        return false;
     }
 
 }