package org.oransc.enrichment.repository;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
+import org.immutables.value.Value.Immutable;
+import org.oransc.enrichment.controllers.a1e.A1eCallbacks;
+import org.oransc.enrichment.controllers.r1producer.ProducerCallbacks;
import org.oransc.enrichment.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
/**
* Dynamic representation of all EiProducers.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+@Component
public class EiProducers {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, EiProducer> allEiProducers = new HashMap<>();
+ private final MultiMap<EiProducer> producersByType = new MultiMap<>();
- public synchronized void put(EiProducer producer) {
+ @Autowired
+ private ProducerCallbacks producerCallbacks;
+
+ @Autowired
+ private A1eCallbacks consumerCallbacks;
+
+ @Autowired
+ private EiJobs eiJobs;
+
+ @Immutable
+ public interface EiProducerRegistrationInfo {
+ String id();
+
+ Collection<EiType> supportedTypes();
+
+ String jobCallbackUrl();
+
+ String producerSupervisionCallbackUrl();
+ }
+
+ public EiProducer registerProducer(EiProducerRegistrationInfo producerInfo) {
+ final String producerId = producerInfo.id();
+ EiProducer previousDefinition = this.get(producerId);
+ if (previousDefinition != null) {
+ for (EiType type : previousDefinition.getEiTypes()) {
+ producersByType.remove(type.getId(), producerId);
+ }
+ allEiProducers.remove(producerId);
+ }
+
+ EiProducer producer = createProducer(producerInfo);
allEiProducers.put(producer.getId(), producer);
+ for (EiType type : producer.getEiTypes()) {
+ producersByType.put(type.getId(), producer.getId(), producer);
+ }
+
+ Collection<EiType> previousTypes =
+ previousDefinition != null ? previousDefinition.getEiTypes() : new ArrayList<>();
+
+ producerCallbacks.startEiJobs(producer, this.eiJobs) //
+ .collectList() //
+ .flatMapMany(list -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .collectList() //
+ .flatMapMany(list -> consumerCallbacks.notifyJobStatus(previousTypes)) //
+ .subscribe();
+
+ return producer;
+ }
+
+ private EiProducer createProducer(EiProducerRegistrationInfo producerInfo) {
+ return new EiProducer(producerInfo.id(), producerInfo.supportedTypes(), producerInfo.jobCallbackUrl(),
+ producerInfo.producerSupervisionCallbackUrl());
}
public synchronized Collection<EiProducer> getAllProducers() {
return allEiProducers.get(id);
}
- public synchronized void remove(String id) {
- this.allEiProducers.remove(id);
- }
-
public synchronized int size() {
return allEiProducers.size();
}
public synchronized void clear() {
this.allEiProducers.clear();
+ this.producersByType.clear();
}
- public void deregisterProducer(EiProducer producer, EiTypes eiTypes, EiJobs eiJobs) {
- this.remove(producer);
+ public void deregisterProducer(EiProducer producer) {
+ allEiProducers.remove(producer.getId());
for (EiType type : producer.getEiTypes()) {
- boolean removed = type.removeProducer(producer) != null;
- if (!removed) {
+ if (producersByType.remove(type.getId(), producer.getId()) == null) {
this.logger.error("Bug, no producer found");
}
- if (type.getProducerIds().isEmpty()) {
- eiTypes.remove(type);
- }
}
+ this.consumerCallbacks.notifyJobStatus(producer.getEiTypes()) //
+ .subscribe();
+ }
+
+ public synchronized Collection<EiProducer> getProducersForType(EiType type) {
+ return this.producersByType.get(type.getId());
}
- private synchronized void remove(EiProducer producer) {
- this.allEiProducers.remove(producer.getId());
+ public synchronized Collection<EiProducer> getProducersForType(String typeId) {
+ return this.producersByType.get(typeId);
+ }
+
+ public synchronized Collection<String> getProducerIdsForType(String typeId) {
+ Collection<String> producerIds = new ArrayList<>();
+ for (EiProducer p : this.getProducersForType(typeId)) {
+ producerIds.add(p.getId());
+ }
+ return producerIds;
+ }
+
+ public synchronized boolean isJobEnabled(EiJob job) {
+ for (EiProducer producer : this.producersByType.get(job.getTypeId())) {
+ if (producer.isJobEnabled(job)) {
+ return true;
+ }
+ }
+ return false;
}
}