for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
restClient.delete(url) //
- .subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), //
+ .subscribe(response -> logger.debug("Producer job deleted OK {}", producer.getId()), //
throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
throwable.getMessage()),
null);
* @param producer
* @param eiJobs
*/
- public void restartEiJobs(EiProducer producer, EiJobs eiJobs) {
+ public Flux<String> restartEiJobs(EiProducer producer, EiJobs eiJobs) {
final int maxNoOfParalellRequests = 10;
Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
- Flux.fromIterable(producer.getEiTypes()) //
+ return Flux.fromIterable(producer.getEiTypes()) //
.flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
.flatMap(job -> postStartEiJob(producer, job, retrySpec), maxNoOfParalellRequests) //
.onErrorResume(t -> {
logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
return Flux.empty();
- }) //
- .subscribe();
+ }); //
+
}
private Mono<String> postStartEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
return restClient.post(producer.getJobCallbackUrl(), body) //
.retryWhen(retrySpec) //
.doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
+ .doOnNext(resp -> producer.setJobDisabled(eiJob)) //
.onErrorResume(throwable -> {
logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
return Mono.empty();
- });
+ }) //
+ .doOnNext(resp -> producer.setJobEnabled(eiJob));
}
private Collection<EiProducer> getProducersForJob(EiJob eiJob, EiProducers eiProducers) {