+ @Autowired
+ private ConsumerCallbacks consumerCallbacks;
+
+ @Autowired
+ private EiJobs eiJobs;
+
+ @Immutable
+ public interface EiProducerRegistrationInfo {
+ String id();
+
+ Collection<EiType> supportedTypes();
+
+ String jobCallbackUrl();
+
+ String producerSupervisionCallbackUrl();
+ }
+
+ public EiProducer registerProducer(EiProducerRegistrationInfo producerInfo) {
+ final String producerId = producerInfo.id();
+ EiProducer previousDefinition = this.get(producerId);
+ if (previousDefinition != null) {
+ for (EiType type : previousDefinition.getEiTypes()) {
+ producersByType.remove(type.getId(), producerId);
+ }
+ allEiProducers.remove(producerId);
+ }
+
+ EiProducer producer = createProducer(producerInfo);
+ allEiProducers.put(producer.getId(), producer);
+ for (EiType type : producer.getEiTypes()) {
+ producersByType.put(type.getId(), producer.getId(), producer);
+ }
+
+ Collection<EiType> previousTypes =
+ previousDefinition != null ? previousDefinition.getEiTypes() : new ArrayList<>();
+
+ producerCallbacks.startEiJobs(producer, this.eiJobs) //
+ .collectList() //
+ .flatMapMany(list -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .collectList() //
+ .flatMapMany(list -> consumerCallbacks.notifyJobStatus(previousTypes)) //
+ .subscribe();
+
+ return producer;
+ }
+
+ private EiProducer createProducer(EiProducerRegistrationInfo producerInfo) {
+ return new EiProducer(producerInfo.id(), producerInfo.supportedTypes(), producerInfo.jobCallbackUrl(),
+ producerInfo.producerSupervisionCallbackUrl());