+
+ final boolean isNewJob = this.eiJobs.get(eiJobId) == null;
+
+ return validatePutEiJob(eiTypeId, eiJobId, eiJobInfo) //
+ .flatMap(this::notifyProducersNewJob) //
+ .doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
+ .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+ .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
+ }
+
+ private Mono<EiJob> notifyProducersNewJob(EiJob newEiJob) {
+ return this.producerCallbacks.notifyProducersJobStarted(newEiJob) //
+ .flatMap(noOfAcceptingProducers -> {
+ if (noOfAcceptingProducers.intValue() > 0) {
+ return Mono.just(newEiJob);
+ } else {
+ return Mono.error(new ServiceException("Job not accepted by any producers", HttpStatus.CONFLICT));
+ }
+ });
+ }
+
+ private Mono<EiJob> validatePutEiJob(String eiTypeId, String eiJobId, ConsumerEiJobInfo eiJobInfo) {