@Autowired
private ConsumerCallbacks consumerCallbacks;
- @Autowired
- private EiTypes eiTypes;
-
@Autowired
private EiJobs eiJobs;
- @Immutable
- public interface EiTypeRegistrationInfo {
- String id();
-
- Object jobDataSchema();
- }
-
@Immutable
public interface EiProducerRegistrationInfo {
String id();
- Collection<EiTypeRegistrationInfo> supportedTypes();
+ Collection<EiType> supportedTypes();
String jobCallbackUrl();
String producerSupervisionCallbackUrl();
-
}
public EiProducer registerProducer(EiProducerRegistrationInfo producerInfo) {
producersByType.put(type.getId(), producer.getId(), producer);
}
- if (previousDefinition != null) {
- purgeTypes(previousDefinition.getEiTypes());
- this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition);
- }
+ Collection<EiType> previousTypes =
+ previousDefinition != null ? previousDefinition.getEiTypes() : new ArrayList<>();
- producerCallbacks.restartEiJobs(producer, this.eiJobs);
- consumerCallbacks.notifyConsumersProducerAdded(producer);
- return producer;
- }
+ producerCallbacks.startEiJobs(producer, this.eiJobs) //
+ .collectList() //
+ .flatMapMany(list -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .collectList() //
+ .flatMapMany(list -> consumerCallbacks.notifyJobStatus(previousTypes)) //
+ .subscribe();
- private void purgeTypes(Collection<EiType> types) {
- for (EiType type : types) {
- if (getProducersForType(type.getId()).isEmpty()) {
- this.eiTypes.remove(type);
- }
- }
- }
-
- private EiType getType(EiTypeRegistrationInfo typeInfo) {
- EiType type = this.eiTypes.get(typeInfo.id());
- if (type == null) {
- type = new EiType(typeInfo.id(), typeInfo.jobDataSchema());
- this.eiTypes.put(type);
- this.consumerCallbacks.notifyConsumersTypeAdded(type);
- }
- return type;
+ return producer;
}
private EiProducer createProducer(EiProducerRegistrationInfo producerInfo) {
- ArrayList<EiType> types = new ArrayList<>();
-
- EiProducer producer = new EiProducer(producerInfo.id(), types, producerInfo.jobCallbackUrl(),
+ return new EiProducer(producerInfo.id(), producerInfo.supportedTypes(), producerInfo.jobCallbackUrl(),
producerInfo.producerSupervisionCallbackUrl());
-
- for (EiTypeRegistrationInfo typeInfo : producerInfo.supportedTypes()) {
- EiType type = getType(typeInfo);
- types.add(type);
- }
- return producer;
}
public synchronized Collection<EiProducer> getAllProducers() {
this.producersByType.clear();
}
- public void deregisterProducer(EiProducer producer, EiTypes eiTypes) {
+ public void deregisterProducer(EiProducer producer) {
allEiProducers.remove(producer.getId());
for (EiType type : producer.getEiTypes()) {
if (producersByType.remove(type.getId(), producer.getId()) == null) {
this.logger.error("Bug, no producer found");
}
- if (this.producersByType.get(type.getId()).isEmpty()) {
- eiTypes.remove(type);
- }
}
- this.consumerCallbacks.notifyConsumersProducerDeleted(producer);
+ this.consumerCallbacks.notifyJobStatus(producer.getEiTypes()) //
+ .subscribe();
}
public synchronized Collection<EiProducer> getProducersForType(EiType type) {
return producerIds;
}
+ public synchronized boolean isJobEnabled(EiJob job) {
+ for (EiProducer producer : this.producersByType.get(job.getTypeId())) {
+ if (producer.isJobEnabled(job)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
}