Some changes in status notifications
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / controllers / producer / ProducerCallbacks.java
index 5a47b58..dc732e1 100644 (file)
@@ -24,6 +24,7 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Vector;
 
@@ -31,6 +32,7 @@ import org.oransc.enrichment.clients.AsyncRestClient;
 import org.oransc.enrichment.clients.AsyncRestClientFactory;
 import org.oransc.enrichment.configuration.ApplicationConfig;
 import org.oransc.enrichment.repository.EiJob;
+import org.oransc.enrichment.repository.EiJobs;
 import org.oransc.enrichment.repository.EiProducer;
 import org.oransc.enrichment.repository.EiTypes;
 import org.slf4j.Logger;
@@ -40,6 +42,7 @@ import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 /**
  * Callbacks to the EiProducer
@@ -63,7 +66,7 @@ public class ProducerCallbacks {
 
     public void notifyProducersJobDeleted(EiJob eiJob) {
         for (EiProducer producer : getProducers(eiJob)) {
-            String url = producer.getJobCallbackUrl() + "/" + eiJob.id();
+            String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
             restClient.delete(url) //
                 .subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), //
                     throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
@@ -79,25 +82,40 @@ public class ProducerCallbacks {
      * @return the number of producers that returned OK
      */
     public Mono<Integer> notifyProducersJobStarted(EiJob eiJob) {
+        Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
         return Flux.fromIterable(getProducers(eiJob)) //
-            .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob)) //
+            .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob, retrySpec)) //
             .collectList() //
             .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
     }
 
     /**
-     * Calls one producer for an EiJob activation.
+     * Restart all jobs for one producer
      * 
-     * @param producer a producer
-     * @param eiJob an EI job
-     * @return the body of the response from the REST call
+     * @param producer
+     * @param eiJobs
      */
-    public Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob) {
+    public void restartJobs(EiProducer producer, EiJobs eiJobs) {
+        final int maxNoOfParalellRequests = 10;
+        Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
+
+        Flux.fromIterable(producer.getEiTypes()) //
+            .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
+            .flatMap(job -> notifyProducerJobStarted(producer, job, retrySpec), maxNoOfParalellRequests) //
+            .onErrorResume(t -> {
+                logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
+                return Flux.empty();
+            }) //
+            .subscribe();
+    }
+
+    private Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob, Retry retrySpec) {
         ProducerJobInfo request = new ProducerJobInfo(eiJob);
         String body = gson.toJson(request);
 
-        return restClient.post(producer.getJobCallbackUrl(), body)
-            .doOnNext(resp -> logger.debug("Job subscription started OK {}", producer.getId()))
+        return restClient.post(producer.getJobCallbackUrl(), body) //
+            .retryWhen(retrySpec) //
+            .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
             .onErrorResume(throwable -> {
                 logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
                 return Mono.empty();
@@ -106,7 +124,7 @@ public class ProducerCallbacks {
 
     private Collection<EiProducer> getProducers(EiJob eiJob) {
         try {
-            return this.eiTypes.getType(eiJob.typeId()).getProducers();
+            return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
         } catch (Exception e) {
             return new Vector<>();
         }