Restarting jobs in producer supervision 47/5447/1
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 15 Jan 2021 12:12:39 +0000 (13:12 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Fri, 15 Jan 2021 12:14:54 +0000 (13:14 +0100)
For enabled producers, jobs that has not been successfully started are started.
Added a timkestamp on EiJobs in the producer API

Change-Id: I39973499aefcba84f371c9d53008babf4ff38460
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-378

enrichment-coordinator-service/api/ecs-api.json
enrichment-coordinator-service/api/ecs-api.yaml
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducer.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java

index 6d2f0fa..f1517df 100644 (file)
                     "description": "Idenitity of the EI job",
                     "type": "string"
                 },
+                "last_updated": {
+                    "description": "The time when the job was last updated or created (ISO-8601)",
+                    "type": "string"
+                },
                 "ei_job_data": {
                     "description": "Json for the job data",
                     "type": "object"
index 3dd53b6..7b3d011 100644 (file)
@@ -745,6 +745,9 @@ components:
         ei_job_identity:
           type: string
           description: Idenitity of the EI job
+        last_updated:
+          type: string
+          description: The time when the job was last updated or created (ISO-8601)
         ei_job_data:
           type: object
           properties: {}
index 6d74b49..9b489cd 100644 (file)
@@ -57,9 +57,14 @@ public class ProducerCallbacks {
         this.restClient = restClientFactory.createRestClientNoHttpProxy("");
     }
 
+    public Mono<String> healthCheck(EiProducer producer) {
+        return restClient.get(producer.getProducerSupervisionCallbackUrl());
+    }
+
     public void stopEiJob(EiJob eiJob, EiProducers eiProducers) {
         for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
             String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
+            producer.setJobDisabled(eiJob);
             restClient.delete(url) //
                 .subscribe(response -> logger.debug("Producer job deleted OK {}", producer.getId()), //
                     throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
@@ -69,7 +74,7 @@ public class ProducerCallbacks {
     }
 
     /**
-     * Calls all producers for an EiJob activation.
+     * Start a job in all producers that suports the job type
      * 
      * @param eiJob an EI job
      * @return the number of producers that returned OK
@@ -77,40 +82,35 @@ public class ProducerCallbacks {
     public Mono<Integer> startEiJob(EiJob eiJob, EiProducers eiProducers) {
         Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
         return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) //
-            .flatMap(eiProducer -> postStartEiJob(eiProducer, eiJob, retrySpec)) //
+            .flatMap(eiProducer -> startEiJob(eiProducer, eiJob, retrySpec)) //
             .collectList() //
             .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
     }
 
     /**
-     * Restart all jobs for one producer
+     * Start all jobs for one producer
      * 
      * @param producer
      * @param eiJobs
      */
-    public Flux<String> restartEiJobs(EiProducer producer, EiJobs eiJobs) {
+    public Flux<String> startEiJobs(EiProducer producer, EiJobs eiJobs) {
         final int maxNoOfParalellRequests = 10;
         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
 
         return Flux.fromIterable(producer.getEiTypes()) //
             .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
-            .flatMap(job -> postStartEiJob(producer, job, retrySpec), maxNoOfParalellRequests) //
-            .onErrorResume(t -> {
-                logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
-                return Flux.empty();
-            }); //
-
+            .flatMap(job -> startEiJob(producer, job, retrySpec), maxNoOfParalellRequests);
     }
 
-    private Mono<String> postStartEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
+    public Mono<String> startEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
         ProducerJobInfo request = new ProducerJobInfo(eiJob);
         String body = gson.toJson(request);
 
         return restClient.post(producer.getJobCallbackUrl(), body) //
             .retryWhen(retrySpec) //
             .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
-            .doOnNext(resp -> producer.setJobDisabled(eiJob)) //
             .onErrorResume(throwable -> {
+                producer.setJobDisabled(eiJob);
                 logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
                 return Mono.empty();
             }) //
index c02c280..bc3dba2 100644 (file)
@@ -60,16 +60,23 @@ public class ProducerJobInfo {
     @JsonProperty("owner")
     public String owner;
 
-    public ProducerJobInfo(Object jobData, String id, String typeId, String targetUri, String owner) {
+    @ApiModelProperty(value = "The time when the job was last updated or created (ISO-8601)")
+    @SerializedName("last_updated")
+    @JsonProperty("last_updated")
+    public String lastUpdated;
+
+    public ProducerJobInfo(Object jobData, String id, String typeId, String targetUri, String owner,
+        String lastUpdated) {
         this.id = id;
         this.jobData = jobData;
         this.typeId = typeId;
         this.targetUri = targetUri;
         this.owner = owner;
+        this.lastUpdated = lastUpdated;
     }
 
     public ProducerJobInfo(EiJob job) {
-        this(job.getJobData(), job.getId(), job.getTypeId(), job.getTargetUrl(), job.getOwner());
+        this(job.getJobData(), job.getId(), job.getTypeId(), job.getTargetUrl(), job.getOwner(), job.getLastUpdated());
     }
 
     public ProducerJobInfo() {
index 9825ab7..46602f3 100644 (file)
@@ -21,6 +21,7 @@
 package org.oransc.enrichment.repository;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 
 import lombok.Builder;
 import lombok.Getter;
@@ -53,6 +54,10 @@ public class EiJob {
     @Getter
     private final String jobStatusUrl;
 
+    @Getter
+    @Builder.Default
+    private String lastUpdated = Instant.now().toString();
+
     @Getter
     @Builder.Default
     private boolean isLastStatusReportedEnabled = true;
index d5423ae..d8b2015 100644 (file)
@@ -75,7 +75,10 @@ public class EiProducer {
         this.enabledJobs.remove(job.getId());
     }
 
-    synchronized boolean isJobEnabled(EiJob job) {
+    /**
+     * Is the job enabled for this producer?
+     */
+    public synchronized boolean isJobEnabled(EiJob job) {
         return this.enabledJobs.contains(job.getId());
     }
 
index a0d30e8..f0fc49f 100644 (file)
@@ -85,7 +85,7 @@ public class EiProducers {
         Collection<EiType> previousTypes =
             previousDefinition != null ? previousDefinition.getEiTypes() : new ArrayList<>();
 
-        producerCallbacks.restartEiJobs(producer, this.eiJobs) //
+        producerCallbacks.startEiJobs(producer, this.eiJobs) //
             .collectList() //
             .flatMapMany(list -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
             .collectList() //
index 17c77b3..c2e4b97 100644 (file)
 
 package org.oransc.enrichment.tasks;
 
-import org.oransc.enrichment.clients.AsyncRestClient;
-import org.oransc.enrichment.clients.AsyncRestClientFactory;
+import java.time.Duration;
+
 import org.oransc.enrichment.configuration.ApplicationConfig;
+import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
+import org.oransc.enrichment.repository.EiJob;
+import org.oransc.enrichment.repository.EiJobs;
 import org.oransc.enrichment.repository.EiProducer;
 import org.oransc.enrichment.repository.EiProducers;
 import org.slf4j.Logger;
@@ -34,6 +38,7 @@ import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 /**
  * Regularly checks the availability of the EI Producers
@@ -45,13 +50,17 @@ public class ProducerSupervision {
     private static final Logger logger = LoggerFactory.getLogger(ProducerSupervision.class);
 
     private final EiProducers eiProducers;
-    private final AsyncRestClient restClient;
+    private final EiJobs eiJobs;
+    private final ProducerCallbacks producerCallbacks;
+    private final ConsumerCallbacks consumerCallbacks;
 
     @Autowired
-    public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers) {
-        AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
-        this.restClient = restClientFactory.createRestClientNoHttpProxy("");
+    public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs,
+        ProducerCallbacks producerCallbacks, ConsumerCallbacks consumerCallbacks) {
         this.eiProducers = eiProducers;
+        this.eiJobs = eiJobs;
+        this.producerCallbacks = producerCallbacks;
+        this.consumerCallbacks = consumerCallbacks;
     }
 
     @Scheduled(fixedRate = 1000 * 60 * 5)
@@ -66,13 +75,33 @@ public class ProducerSupervision {
     }
 
     private Mono<EiProducer> checkOneProducer(EiProducer producer) {
-        return restClient.get(producer.getProducerSupervisionCallbackUrl()) //
+        return this.producerCallbacks.healthCheck(producer) //
             .onErrorResume(throwable -> {
                 handleNonRespondingProducer(throwable, producer);
                 return Mono.empty();
             })//
             .doOnNext(response -> handleRespondingProducer(response, producer))
-            .flatMap(response -> Mono.just(producer));
+            .flatMap(response -> checkProducerJobs(producer)) //
+            .flatMap(responses -> Mono.just(producer));
+    }
+
+    private Mono<?> checkProducerJobs(EiProducer producer) {
+        return getEiJobs(producer) //
+            .filter(eiJob -> !producer.isJobEnabled(eiJob)) //
+            .flatMap(eiJob -> startEiJob(producer, eiJob), 1) //
+            .collectList() //
+            .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+            .collectList();
+    }
+
+    private Mono<String> startEiJob(EiProducer producer, EiJob eiJob) {
+        Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
+        return producerCallbacks.startEiJob(producer, eiJob, retrySpec);
+    }
+
+    private Flux<EiJob> getEiJobs(EiProducer producer) {
+        return Flux.fromIterable(producer.getEiTypes()) //
+            .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));
     }
 
     private void handleNonRespondingProducer(Throwable throwable, EiProducer producer) {
index d082835..3a52fbc 100644 (file)
@@ -64,6 +64,7 @@ import org.oransc.enrichment.controllers.producer.ProducerStatusInfo;
 import org.oransc.enrichment.exceptions.ServiceException;
 import org.oransc.enrichment.repository.EiJob;
 import org.oransc.enrichment.repository.EiJobs;
+import org.oransc.enrichment.repository.EiProducer;
 import org.oransc.enrichment.repository.EiProducers;
 import org.oransc.enrichment.repository.EiType;
 import org.oransc.enrichment.repository.EiTypes;
@@ -561,6 +562,8 @@ class ApplicationTest {
 
     @Test
     void testProducerSupervision() throws JsonMappingException, JsonProcessingException, ServiceException {
+
+        ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
         putEiProducerWithOneTypeRejecting("simulateProducerError", EI_TYPE_ID);
 
         {
@@ -569,14 +572,12 @@ class ApplicationTest {
             putEiJob(EI_TYPE_ID, EI_JOB_ID);
             verifyJobStatus(EI_JOB_ID, "ENABLED");
             deleteEiProducer(EI_PRODUCER_ID);
+            // A Job disabled status notification shall now be received
+            await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
+            assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
             verifyJobStatus(EI_JOB_ID, "DISABLED");
         }
 
-        // Job disabled status notification shall be received
-        ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
-        await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
-        assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
-
         assertThat(this.eiProducers.size()).isEqualTo(1);
         assertThat(this.eiTypes.size()).isEqualTo(1);
         assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.ENABLED);
@@ -588,11 +589,41 @@ class ApplicationTest {
         assertThat(this.eiProducers.size()).isEqualTo(1);
         assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.DISABLED);
 
-        // After 3 failed checks, the producer and the type shall be deregisterred
+        // After 3 failed checks, the producer shall be deregisterred
         this.producerSupervision.createTask().blockLast();
         assertThat(this.eiProducers.size()).isEqualTo(0); // The producer is removed
         assertThat(this.eiTypes.size()).isEqualTo(1); // The type remains
 
+        // Now we have one disabled job, and no producer.
+        // PUT a producer, then a Job ENABLED status notification shall be received
+        putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+        await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(2));
+        assertThat(consumerResults.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+        verifyJobStatus(EI_JOB_ID, "ENABLED");
+    }
+
+    @Test
+    void testProducerSupervision2() throws JsonMappingException, JsonProcessingException, ServiceException {
+        // Test that supervision enables not enabled jobs and sends a notification when
+        // suceeded
+
+        putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+        putEiJob(EI_TYPE_ID, EI_JOB_ID);
+
+        EiProducer producer = this.eiProducers.getProducer(EI_PRODUCER_ID);
+        EiJob job = this.eiJobs.getJob(EI_JOB_ID);
+        // Pretend that the producer did reject the job and the a DISABLED notification
+        // is sent for the job
+        producer.setJobDisabled(job);
+        job.setLastReportedStatus(false);
+        verifyJobStatus(EI_JOB_ID, "DISABLED");
+
+        // Run the supervision and wait for the job to get started in the producer
+        this.producerSupervision.createTask().blockLast();
+        ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
+        await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
+        assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+        verifyJobStatus(EI_JOB_ID, "ENABLED");
     }
 
     @Test
@@ -614,10 +645,15 @@ class ApplicationTest {
         assertThat(this.eiJobs.size()).isEqualTo(2);
 
         {
+            EiJob savedJob = this.eiJobs.getJob("jobId1");
             // Restore the jobs
             EiJobs jobs = new EiJobs(this.applicationConfig, this.producerCallbacks);
             jobs.restoreJobsFromDatabase();
             assertThat(jobs.size()).isEqualTo(2);
+            EiJob restoredJob = jobs.getJob("jobId1");
+            assertThat(restoredJob.getId()).isEqualTo("jobId1");
+            assertThat(restoredJob.getLastUpdated()).isEqualTo(savedJob.getLastUpdated());
+
             jobs.remove("jobId1", this.eiProducers);
             jobs.remove("jobId2", this.eiProducers);
         }