NONRTRIC - ECS updates of the NBI
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / controllers / r1producer / ProducerCallbacks.java
index 26dd1a1..61cd519 100644 (file)
@@ -30,10 +30,10 @@ import java.util.Collection;
 import org.oransc.enrichment.clients.AsyncRestClient;
 import org.oransc.enrichment.clients.AsyncRestClientFactory;
 import org.oransc.enrichment.configuration.ApplicationConfig;
-import org.oransc.enrichment.repository.EiJob;
-import org.oransc.enrichment.repository.EiJobs;
-import org.oransc.enrichment.repository.EiProducer;
-import org.oransc.enrichment.repository.EiProducers;
+import org.oransc.enrichment.repository.InfoJob;
+import org.oransc.enrichment.repository.InfoJobs;
+import org.oransc.enrichment.repository.InfoProducer;
+import org.oransc.enrichment.repository.InfoProducers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +42,7 @@ import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
 /**
- * Callbacks to the EiProducer
+ * Callbacks to the Producer
  */
 @SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
 public class ProducerCallbacks {
@@ -57,14 +57,14 @@ public class ProducerCallbacks {
         this.restClient = restClientFactory.createRestClientNoHttpProxy("");
     }
 
-    public Mono<String> healthCheck(EiProducer producer) {
+    public Mono<String> healthCheck(InfoProducer producer) {
         return restClient.get(producer.getProducerSupervisionCallbackUrl());
     }
 
-    public void stopEiJob(EiJob eiJob, EiProducers eiProducers) {
-        for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
-            String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
-            producer.setJobDisabled(eiJob);
+    public void stopInfoJob(InfoJob infoJob, InfoProducers infoProducers) {
+        for (InfoProducer producer : getProducersForJob(infoJob, infoProducers)) {
+            String url = producer.getJobCallbackUrl() + "/" + infoJob.getId();
+            producer.setJobDisabled(infoJob);
             restClient.delete(url) //
                 .subscribe(response -> logger.debug("Producer job deleted OK {}", producer.getId()), //
                     throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
@@ -76,13 +76,13 @@ public class ProducerCallbacks {
     /**
      * Start a job in all producers that suports the job type
      * 
-     * @param eiJob an EI job
+     * @param infoJob an Information Job
      * @return the number of producers that returned OK
      */
-    public Mono<Integer> startInfoSubscriptionJob(EiJob eiJob, EiProducers eiProducers) {
+    public Mono<Integer> startInfoSubscriptionJob(InfoJob infoJob, InfoProducers infoProducers) {
         Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
-        return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) //
-            .flatMap(eiProducer -> startEiJob(eiProducer, eiJob, retrySpec)) //
+        return Flux.fromIterable(getProducersForJob(infoJob, infoProducers)) //
+            .flatMap(infoProducer -> startInfoJob(infoProducer, infoJob, retrySpec)) //
             .collectList() //
             .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
     }
@@ -91,34 +91,34 @@ public class ProducerCallbacks {
      * Start all jobs for one producer
      * 
      * @param producer
-     * @param eiJobs
+     * @param infoJobs
      */
-    public Flux<String> startEiJobs(EiProducer producer, EiJobs eiJobs) {
+    public Flux<String> startInfoJobs(InfoProducer producer, InfoJobs infoJobs) {
         final int maxNoOfParalellRequests = 10;
         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
 
-        return Flux.fromIterable(producer.getEiTypes()) //
-            .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
-            .flatMap(job -> startEiJob(producer, job, retrySpec), maxNoOfParalellRequests);
+        return Flux.fromIterable(producer.getInfoTypes()) //
+            .flatMap(type -> Flux.fromIterable(infoJobs.getJobsForType(type))) //
+            .flatMap(job -> startInfoJob(producer, job, retrySpec), maxNoOfParalellRequests);
     }
 
-    public Mono<String> startEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
-        ProducerJobInfo request = new ProducerJobInfo(eiJob);
+    public Mono<String> startInfoJob(InfoProducer producer, InfoJob infoJob, Retry retrySpec) {
+        ProducerJobInfo request = new ProducerJobInfo(infoJob);
         String body = gson.toJson(request);
 
         return restClient.post(producer.getJobCallbackUrl(), body) //
             .retryWhen(retrySpec) //
-            .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
+            .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", infoJob.getId(), producer.getId())) //
             .onErrorResume(throwable -> {
-                producer.setJobDisabled(eiJob);
+                producer.setJobDisabled(infoJob);
                 logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
                 return Mono.empty();
             }) //
-            .doOnNext(resp -> producer.setJobEnabled(eiJob));
+            .doOnNext(resp -> producer.setJobEnabled(infoJob));
     }
 
-    private Collection<EiProducer> getProducersForJob(EiJob eiJob, EiProducers eiProducers) {
-        return eiProducers.getProducersForType(eiJob.getTypeId());
+    private Collection<InfoProducer> getProducersForJob(InfoJob infoJob, InfoProducers infoProducers) {
+        return infoProducers.getProducersForType(infoJob.getTypeId());
     }
 
 }