Merge "Update of EI Data Producer API"
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / controllers / producer / ProducerCallbacks.java
index 45b4475..6d74b49 100644 (file)
@@ -61,7 +61,7 @@ public class ProducerCallbacks {
         for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
             String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
             restClient.delete(url) //
-                .subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), //
+                .subscribe(response -> logger.debug("Producer job deleted OK {}", producer.getId()), //
                     throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
                         throwable.getMessage()),
                     null);
@@ -88,18 +88,18 @@ public class ProducerCallbacks {
      * @param producer
      * @param eiJobs
      */
-    public void restartEiJobs(EiProducer producer, EiJobs eiJobs) {
+    public Flux<String> restartEiJobs(EiProducer producer, EiJobs eiJobs) {
         final int maxNoOfParalellRequests = 10;
         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
 
-        Flux.fromIterable(producer.getEiTypes()) //
+        return Flux.fromIterable(producer.getEiTypes()) //
             .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
             .flatMap(job -> postStartEiJob(producer, job, retrySpec), maxNoOfParalellRequests) //
             .onErrorResume(t -> {
                 logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
                 return Flux.empty();
-            }) //
-            .subscribe();
+            }); //
+
     }
 
     private Mono<String> postStartEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
@@ -109,10 +109,12 @@ public class ProducerCallbacks {
         return restClient.post(producer.getJobCallbackUrl(), body) //
             .retryWhen(retrySpec) //
             .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
+            .doOnNext(resp -> producer.setJobDisabled(eiJob)) //
             .onErrorResume(throwable -> {
                 logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
                 return Mono.empty();
-            });
+            }) //
+            .doOnNext(resp -> producer.setJobEnabled(eiJob));
     }
 
     private Collection<EiProducer> getProducersForJob(EiJob eiJob, EiProducers eiProducers) {