ProducerStatusInfo.OperationalState opState =
producer.isAvailable() ? ProducerStatusInfo.OperationalState.ENABLED
: ProducerStatusInfo.OperationalState.DISABLED;
- this.logger.debug("opState {}", opState);
return new ProducerStatusInfo(opState);
}
}
}
- registerProducer(eiProducerId, registrationInfo);
+ EiProducer producer = registerProducer(eiProducerId, registrationInfo);
if (previousDefinition != null) {
purgeTypes(previousDefinition.getEiTypes());
+ this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition);
}
+ this.consumerCallbacks.notifyConsumersProducerAdded(producer);
return new ResponseEntity<>(previousDefinition == null ? HttpStatus.CREATED : HttpStatus.OK);
} catch (Exception e) {
}
private EiProducer registerProducer(String producerId, ProducerRegistrationInfo registrationInfo) {
- ArrayList<EiType> types = new ArrayList<>();
+ ArrayList<EiType> typesForProducer = new ArrayList<>();
+ EiProducer producer = createProducer(typesForProducer, producerId, registrationInfo);
for (ProducerEiTypeRegistrationInfo typeInfo : registrationInfo.types) {
- types.add(registerType(typeInfo));
+ EiType type = registerType(typeInfo);
+ typesForProducer.add(type);
+ type.addProducer(producer); //
}
- EiProducer producer = createProducer(types, producerId, registrationInfo);
this.eiProducers.put(producer);
- for (EiType type : types) {
- for (EiJob job : this.eiJobs.getJobsForType(type)) {
- this.producerCallbacks.notifyProducerJobStarted(producer, job) //
- .subscribe();
- }
- type.addProducer(producer);
- }
+ producerCallbacks.restartJobs(producer, this.eiJobs);
+
return producer;
}