Restarting jobs in producer supervision
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / controllers / producer / ProducerCallbacks.java
index 6d74b49..9b489cd 100644 (file)
@@ -57,9 +57,14 @@ public class ProducerCallbacks {
         this.restClient = restClientFactory.createRestClientNoHttpProxy("");
     }
 
+    public Mono<String> healthCheck(EiProducer producer) {
+        return restClient.get(producer.getProducerSupervisionCallbackUrl());
+    }
+
     public void stopEiJob(EiJob eiJob, EiProducers eiProducers) {
         for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
             String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
+            producer.setJobDisabled(eiJob);
             restClient.delete(url) //
                 .subscribe(response -> logger.debug("Producer job deleted OK {}", producer.getId()), //
                     throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
@@ -69,7 +74,7 @@ public class ProducerCallbacks {
     }
 
     /**
-     * Calls all producers for an EiJob activation.
+     * Start a job in all producers that suports the job type
      * 
      * @param eiJob an EI job
      * @return the number of producers that returned OK
@@ -77,40 +82,35 @@ public class ProducerCallbacks {
     public Mono<Integer> startEiJob(EiJob eiJob, EiProducers eiProducers) {
         Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
         return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) //
-            .flatMap(eiProducer -> postStartEiJob(eiProducer, eiJob, retrySpec)) //
+            .flatMap(eiProducer -> startEiJob(eiProducer, eiJob, retrySpec)) //
             .collectList() //
             .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
     }
 
     /**
-     * Restart all jobs for one producer
+     * Start all jobs for one producer
      * 
      * @param producer
      * @param eiJobs
      */
-    public Flux<String> restartEiJobs(EiProducer producer, EiJobs eiJobs) {
+    public Flux<String> startEiJobs(EiProducer producer, EiJobs eiJobs) {
         final int maxNoOfParalellRequests = 10;
         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
 
         return Flux.fromIterable(producer.getEiTypes()) //
             .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
-            .flatMap(job -> postStartEiJob(producer, job, retrySpec), maxNoOfParalellRequests) //
-            .onErrorResume(t -> {
-                logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
-                return Flux.empty();
-            }); //
-
+            .flatMap(job -> startEiJob(producer, job, retrySpec), maxNoOfParalellRequests);
     }
 
-    private Mono<String> postStartEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
+    public Mono<String> startEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
         ProducerJobInfo request = new ProducerJobInfo(eiJob);
         String body = gson.toJson(request);
 
         return restClient.post(producer.getJobCallbackUrl(), body) //
             .retryWhen(retrySpec) //
             .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
-            .doOnNext(resp -> producer.setJobDisabled(eiJob)) //
             .onErrorResume(throwable -> {
+                producer.setJobDisabled(eiJob);
                 logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
                 return Mono.empty();
             }) //