+ @RequestBody ConsumerEiJobInfo eiJobObject) {
+
+ final boolean isNewJob = this.eiJobs.get(eiJobId) == null;
+
+ return validatePutEiJob(eiJobId, eiJobObject) //
+ .flatMap(this::startEiJob) //
+ .doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
+ .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+ .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
+ }
+
+ private Mono<EiJob> startEiJob(EiJob newEiJob) {
+ return this.producerCallbacks.startEiJob(newEiJob, eiProducers) //
+ .doOnNext(noOfAcceptingProducers -> this.logger.debug(
+ "Started EI job {}, number of activated producers: {}", newEiJob.getId(), noOfAcceptingProducers)) //
+ .flatMap(noOfAcceptingProducers -> Mono.just(newEiJob));
+ }
+
+ private Mono<EiJob> validatePutEiJob(String eiJobId, ConsumerEiJobInfo eiJobInfo) {