return validatePutEiJob(eiJobId, eiJobObject) //
.flatMap(this::startEiJob) //
.doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
- .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+ .map(newEiJob -> new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)) //
.onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.INTERNAL_SERVER_ERROR)));
}
return this.producerCallbacks.startInfoSubscriptionJob(newEiJob, infoProducers) //
.doOnNext(noOfAcceptingProducers -> this.logger.debug(
"Started EI job {}, number of activated producers: {}", newEiJob.getId(), noOfAcceptingProducers)) //
- .flatMap(noOfAcceptingProducers -> Mono.just(newEiJob));
+ .map(noOfAcceptingProducers -> newEiJob);
}
private Mono<InfoJob> validatePutEiJob(String eiJobId, A1eEiJobInfo eiJobInfo) {