+ producerCallbacks.restartEiJobs(producer, this.eiJobs);
+ consumerCallbacks.notifyConsumersProducerAdded(producer);
+ return producer;
+ }
+
+ private void purgeTypes(Collection<EiType> types) {
+ for (EiType type : types) {
+ if (getProducersForType(type.getId()).isEmpty()) {
+ this.eiTypes.remove(type);
+ }
+ }
+ }
+
+ private EiType getType(EiTypeRegistrationInfo typeInfo) {
+ EiType type = this.eiTypes.get(typeInfo.id());
+ if (type == null) {
+ type = new EiType(typeInfo.id(), typeInfo.jobDataSchema());
+ this.eiTypes.put(type);
+ this.consumerCallbacks.notifyConsumersTypeAdded(type);
+ }
+ return type;
+ }
+
+ private EiProducer createProducer(EiProducerRegistrationInfo producerInfo) {
+ ArrayList<EiType> types = new ArrayList<>();
+
+ EiProducer producer = new EiProducer(producerInfo.id(), types, producerInfo.jobCallbackUrl(),
+ producerInfo.producerSupervisionCallbackUrl());
+
+ for (EiTypeRegistrationInfo typeInfo : producerInfo.supportedTypes()) {
+ EiType type = getType(typeInfo);
+ types.add(type);
+ }
+ return producer;