return Flux.fromIterable(getProducersForJob(infoJob, infoProducers)) //
.flatMap(infoProducer -> startInfoJob(infoProducer, infoJob, retrySpec)) //
.collectList() //
- .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
+ .map(okResponses -> Integer.valueOf(okResponses.size())); //
}
/**
.doOnNext(resp -> logger.debug("Job subscription {} started OK {}", infoJob.getId(), producer.getId())) //
.onErrorResume(throwable -> {
producer.setJobDisabled(infoJob);
- logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
+ logger.warn("Job subscription failed id: {} url: {}, reason: {}", producer.getId(),
+ producer.getJobCallbackUrl(), throwable.toString());
return Mono.empty();
}) //
.doOnNext(resp -> producer.setJobEnabled(infoJob));