Added some logging
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / controllers / producer / ProducerCallbacks.java
index 45b4475..9b489cd 100644 (file)
@@ -57,11 +57,16 @@ public class ProducerCallbacks {
         this.restClient = restClientFactory.createRestClientNoHttpProxy("");
     }
 
+    public Mono<String> healthCheck(EiProducer producer) {
+        return restClient.get(producer.getProducerSupervisionCallbackUrl());
+    }
+
     public void stopEiJob(EiJob eiJob, EiProducers eiProducers) {
         for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
             String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
+            producer.setJobDisabled(eiJob);
             restClient.delete(url) //
-                .subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), //
+                .subscribe(response -> logger.debug("Producer job deleted OK {}", producer.getId()), //
                     throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
                         throwable.getMessage()),
                     null);
@@ -69,7 +74,7 @@ public class ProducerCallbacks {
     }
 
     /**
-     * Calls all producers for an EiJob activation.
+     * Start a job in all producers that suports the job type
      * 
      * @param eiJob an EI job
      * @return the number of producers that returned OK
@@ -77,32 +82,27 @@ public class ProducerCallbacks {
     public Mono<Integer> startEiJob(EiJob eiJob, EiProducers eiProducers) {
         Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
         return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) //
-            .flatMap(eiProducer -> postStartEiJob(eiProducer, eiJob, retrySpec)) //
+            .flatMap(eiProducer -> startEiJob(eiProducer, eiJob, retrySpec)) //
             .collectList() //
             .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
     }
 
     /**
-     * Restart all jobs for one producer
+     * Start all jobs for one producer
      * 
      * @param producer
      * @param eiJobs
      */
-    public void restartEiJobs(EiProducer producer, EiJobs eiJobs) {
+    public Flux<String> startEiJobs(EiProducer producer, EiJobs eiJobs) {
         final int maxNoOfParalellRequests = 10;
         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
 
-        Flux.fromIterable(producer.getEiTypes()) //
+        return Flux.fromIterable(producer.getEiTypes()) //
             .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
-            .flatMap(job -> postStartEiJob(producer, job, retrySpec), maxNoOfParalellRequests) //
-            .onErrorResume(t -> {
-                logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
-                return Flux.empty();
-            }) //
-            .subscribe();
+            .flatMap(job -> startEiJob(producer, job, retrySpec), maxNoOfParalellRequests);
     }
 
-    private Mono<String> postStartEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
+    public Mono<String> startEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
         ProducerJobInfo request = new ProducerJobInfo(eiJob);
         String body = gson.toJson(request);
 
@@ -110,9 +110,11 @@ public class ProducerCallbacks {
             .retryWhen(retrySpec) //
             .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
             .onErrorResume(throwable -> {
+                producer.setJobDisabled(eiJob);
                 logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
                 return Mono.empty();
-            });
+            }) //
+            .doOnNext(resp -> producer.setJobEnabled(eiJob));
     }
 
     private Collection<EiProducer> getProducersForJob(EiJob eiJob, EiProducers eiProducers) {