Restarting jobs in producer supervision
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / controllers / producer / ProducerCallbacks.java
index dc732e1..9b489cd 100644 (file)
@@ -26,7 +26,6 @@ import com.google.gson.GsonBuilder;
 import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Vector;
 
 import org.oransc.enrichment.clients.AsyncRestClient;
 import org.oransc.enrichment.clients.AsyncRestClientFactory;
@@ -34,11 +33,9 @@ import org.oransc.enrichment.configuration.ApplicationConfig;
 import org.oransc.enrichment.repository.EiJob;
 import org.oransc.enrichment.repository.EiJobs;
 import org.oransc.enrichment.repository.EiProducer;
-import org.oransc.enrichment.repository.EiTypes;
+import org.oransc.enrichment.repository.EiProducers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -47,7 +44,6 @@ import reactor.util.retry.Retry;
 /**
  * Callbacks to the EiProducer
  */
-@Component
 @SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
 public class ProducerCallbacks {
 
@@ -55,20 +51,22 @@ public class ProducerCallbacks {
     private static Gson gson = new GsonBuilder().create();
 
     private final AsyncRestClient restClient;
-    private final EiTypes eiTypes;
 
-    @Autowired
-    public ProducerCallbacks(ApplicationConfig config, EiTypes eiTypes) {
+    public ProducerCallbacks(ApplicationConfig config) {
         AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig());
-        this.restClient = restClientFactory.createRestClient("");
-        this.eiTypes = eiTypes;
+        this.restClient = restClientFactory.createRestClientNoHttpProxy("");
     }
 
-    public void notifyProducersJobDeleted(EiJob eiJob) {
-        for (EiProducer producer : getProducers(eiJob)) {
+    public Mono<String> healthCheck(EiProducer producer) {
+        return restClient.get(producer.getProducerSupervisionCallbackUrl());
+    }
+
+    public void stopEiJob(EiJob eiJob, EiProducers eiProducers) {
+        for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
             String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
+            producer.setJobDisabled(eiJob);
             restClient.delete(url) //
-                .subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), //
+                .subscribe(response -> logger.debug("Producer job deleted OK {}", producer.getId()), //
                     throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
                         throwable.getMessage()),
                     null);
@@ -76,40 +74,35 @@ public class ProducerCallbacks {
     }
 
     /**
-     * Calls all producers for an EiJob activation.
+     * Start a job in all producers that suports the job type
      * 
      * @param eiJob an EI job
      * @return the number of producers that returned OK
      */
-    public Mono<Integer> notifyProducersJobStarted(EiJob eiJob) {
+    public Mono<Integer> startEiJob(EiJob eiJob, EiProducers eiProducers) {
         Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
-        return Flux.fromIterable(getProducers(eiJob)) //
-            .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob, retrySpec)) //
+        return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) //
+            .flatMap(eiProducer -> startEiJob(eiProducer, eiJob, retrySpec)) //
             .collectList() //
             .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
     }
 
     /**
-     * Restart all jobs for one producer
+     * Start all jobs for one producer
      * 
      * @param producer
      * @param eiJobs
      */
-    public void restartJobs(EiProducer producer, EiJobs eiJobs) {
+    public Flux<String> startEiJobs(EiProducer producer, EiJobs eiJobs) {
         final int maxNoOfParalellRequests = 10;
         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
 
-        Flux.fromIterable(producer.getEiTypes()) //
+        return Flux.fromIterable(producer.getEiTypes()) //
             .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
-            .flatMap(job -> notifyProducerJobStarted(producer, job, retrySpec), maxNoOfParalellRequests) //
-            .onErrorResume(t -> {
-                logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
-                return Flux.empty();
-            }) //
-            .subscribe();
+            .flatMap(job -> startEiJob(producer, job, retrySpec), maxNoOfParalellRequests);
     }
 
-    private Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob, Retry retrySpec) {
+    public Mono<String> startEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
         ProducerJobInfo request = new ProducerJobInfo(eiJob);
         String body = gson.toJson(request);
 
@@ -117,17 +110,15 @@ public class ProducerCallbacks {
             .retryWhen(retrySpec) //
             .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
             .onErrorResume(throwable -> {
+                producer.setJobDisabled(eiJob);
                 logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
                 return Mono.empty();
-            });
+            }) //
+            .doOnNext(resp -> producer.setJobEnabled(eiJob));
     }
 
-    private Collection<EiProducer> getProducers(EiJob eiJob) {
-        try {
-            return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
-        } catch (Exception e) {
-            return new Vector<>();
-        }
+    private Collection<EiProducer> getProducersForJob(EiJob eiJob, EiProducers eiProducers) {
+        return eiProducers.getProducersForType(eiJob.getTypeId());
     }
 
 }