From 2dbde318f013212c81c4a1f477d7638ec3367aa5 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 15 Jan 2021 13:12:39 +0100 Subject: [PATCH] Restarting jobs in producer supervision For enabled producers, jobs that has not been successfully started are started. Added a timkestamp on EiJobs in the producer API Change-Id: I39973499aefcba84f371c9d53008babf4ff38460 Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-378 --- enrichment-coordinator-service/api/ecs-api.json | 4 ++ enrichment-coordinator-service/api/ecs-api.yaml | 3 ++ .../controllers/producer/ProducerCallbacks.java | 24 +++++------ .../controllers/producer/ProducerJobInfo.java | 11 ++++- .../org/oransc/enrichment/repository/EiJob.java | 5 +++ .../oransc/enrichment/repository/EiProducer.java | 5 ++- .../oransc/enrichment/repository/EiProducers.java | 2 +- .../enrichment/tasks/ProducerSupervision.java | 45 ++++++++++++++++---- .../org/oransc/enrichment/ApplicationTest.java | 48 +++++++++++++++++++--- 9 files changed, 117 insertions(+), 30 deletions(-) diff --git a/enrichment-coordinator-service/api/ecs-api.json b/enrichment-coordinator-service/api/ecs-api.json index 6d2f0fa8..f1517df7 100644 --- a/enrichment-coordinator-service/api/ecs-api.json +++ b/enrichment-coordinator-service/api/ecs-api.json @@ -595,6 +595,10 @@ "description": "Idenitity of the EI job", "type": "string" }, + "last_updated": { + "description": "The time when the job was last updated or created (ISO-8601)", + "type": "string" + }, "ei_job_data": { "description": "Json for the job data", "type": "object" diff --git a/enrichment-coordinator-service/api/ecs-api.yaml b/enrichment-coordinator-service/api/ecs-api.yaml index 3dd53b6e..7b3d011e 100644 --- a/enrichment-coordinator-service/api/ecs-api.yaml +++ b/enrichment-coordinator-service/api/ecs-api.yaml @@ -745,6 +745,9 @@ components: ei_job_identity: type: string description: Idenitity of the EI job + last_updated: + type: string + description: The time when the job was last updated or created (ISO-8601) ei_job_data: type: object properties: {} diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java index 6d74b497..9b489cdf 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java @@ -57,9 +57,14 @@ public class ProducerCallbacks { this.restClient = restClientFactory.createRestClientNoHttpProxy(""); } + public Mono healthCheck(EiProducer producer) { + return restClient.get(producer.getProducerSupervisionCallbackUrl()); + } + public void stopEiJob(EiJob eiJob, EiProducers eiProducers) { for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) { String url = producer.getJobCallbackUrl() + "/" + eiJob.getId(); + producer.setJobDisabled(eiJob); restClient.delete(url) // .subscribe(response -> logger.debug("Producer job deleted OK {}", producer.getId()), // throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(), @@ -69,7 +74,7 @@ public class ProducerCallbacks { } /** - * Calls all producers for an EiJob activation. + * Start a job in all producers that suports the job type * * @param eiJob an EI job * @return the number of producers that returned OK @@ -77,40 +82,35 @@ public class ProducerCallbacks { public Mono startEiJob(EiJob eiJob, EiProducers eiProducers) { Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1)); return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) // - .flatMap(eiProducer -> postStartEiJob(eiProducer, eiJob, retrySpec)) // + .flatMap(eiProducer -> startEiJob(eiProducer, eiJob, retrySpec)) // .collectList() // .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); // } /** - * Restart all jobs for one producer + * Start all jobs for one producer * * @param producer * @param eiJobs */ - public Flux restartEiJobs(EiProducer producer, EiJobs eiJobs) { + public Flux startEiJobs(EiProducer producer, EiJobs eiJobs) { final int maxNoOfParalellRequests = 10; Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1)); return Flux.fromIterable(producer.getEiTypes()) // .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) // - .flatMap(job -> postStartEiJob(producer, job, retrySpec), maxNoOfParalellRequests) // - .onErrorResume(t -> { - logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage()); - return Flux.empty(); - }); // - + .flatMap(job -> startEiJob(producer, job, retrySpec), maxNoOfParalellRequests); } - private Mono postStartEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) { + public Mono startEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) { ProducerJobInfo request = new ProducerJobInfo(eiJob); String body = gson.toJson(request); return restClient.post(producer.getJobCallbackUrl(), body) // .retryWhen(retrySpec) // .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) // - .doOnNext(resp -> producer.setJobDisabled(eiJob)) // .onErrorResume(throwable -> { + producer.setJobDisabled(eiJob); logger.warn("Job subscription failed {}", producer.getId(), throwable.toString()); return Mono.empty(); }) // diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java index c02c280e..bc3dba2e 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java @@ -60,16 +60,23 @@ public class ProducerJobInfo { @JsonProperty("owner") public String owner; - public ProducerJobInfo(Object jobData, String id, String typeId, String targetUri, String owner) { + @ApiModelProperty(value = "The time when the job was last updated or created (ISO-8601)") + @SerializedName("last_updated") + @JsonProperty("last_updated") + public String lastUpdated; + + public ProducerJobInfo(Object jobData, String id, String typeId, String targetUri, String owner, + String lastUpdated) { this.id = id; this.jobData = jobData; this.typeId = typeId; this.targetUri = targetUri; this.owner = owner; + this.lastUpdated = lastUpdated; } public ProducerJobInfo(EiJob job) { - this(job.getJobData(), job.getId(), job.getTypeId(), job.getTargetUrl(), job.getOwner()); + this(job.getJobData(), job.getId(), job.getTypeId(), job.getTargetUrl(), job.getOwner(), job.getLastUpdated()); } public ProducerJobInfo() { diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java index 9825ab7c..46602f33 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java @@ -21,6 +21,7 @@ package org.oransc.enrichment.repository; import java.lang.invoke.MethodHandles; +import java.time.Instant; import lombok.Builder; import lombok.Getter; @@ -53,6 +54,10 @@ public class EiJob { @Getter private final String jobStatusUrl; + @Getter + @Builder.Default + private String lastUpdated = Instant.now().toString(); + @Getter @Builder.Default private boolean isLastStatusReportedEnabled = true; diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducer.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducer.java index d5423ae1..d8b20158 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducer.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducer.java @@ -75,7 +75,10 @@ public class EiProducer { this.enabledJobs.remove(job.getId()); } - synchronized boolean isJobEnabled(EiJob job) { + /** + * Is the job enabled for this producer? + */ + public synchronized boolean isJobEnabled(EiJob job) { return this.enabledJobs.contains(job.getId()); } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java index a0d30e8b..f0fc49ff 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java @@ -85,7 +85,7 @@ public class EiProducers { Collection previousTypes = previousDefinition != null ? previousDefinition.getEiTypes() : new ArrayList<>(); - producerCallbacks.restartEiJobs(producer, this.eiJobs) // + producerCallbacks.startEiJobs(producer, this.eiJobs) // .collectList() // .flatMapMany(list -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) // .collectList() // diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java index 17c77b36..c2e4b975 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java @@ -20,9 +20,13 @@ package org.oransc.enrichment.tasks; -import org.oransc.enrichment.clients.AsyncRestClient; -import org.oransc.enrichment.clients.AsyncRestClientFactory; +import java.time.Duration; + import org.oransc.enrichment.configuration.ApplicationConfig; +import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks; +import org.oransc.enrichment.controllers.producer.ProducerCallbacks; +import org.oransc.enrichment.repository.EiJob; +import org.oransc.enrichment.repository.EiJobs; import org.oransc.enrichment.repository.EiProducer; import org.oransc.enrichment.repository.EiProducers; import org.slf4j.Logger; @@ -34,6 +38,7 @@ import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; /** * Regularly checks the availability of the EI Producers @@ -45,13 +50,17 @@ public class ProducerSupervision { private static final Logger logger = LoggerFactory.getLogger(ProducerSupervision.class); private final EiProducers eiProducers; - private final AsyncRestClient restClient; + private final EiJobs eiJobs; + private final ProducerCallbacks producerCallbacks; + private final ConsumerCallbacks consumerCallbacks; @Autowired - public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers) { - AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); - this.restClient = restClientFactory.createRestClientNoHttpProxy(""); + public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs, + ProducerCallbacks producerCallbacks, ConsumerCallbacks consumerCallbacks) { this.eiProducers = eiProducers; + this.eiJobs = eiJobs; + this.producerCallbacks = producerCallbacks; + this.consumerCallbacks = consumerCallbacks; } @Scheduled(fixedRate = 1000 * 60 * 5) @@ -66,13 +75,33 @@ public class ProducerSupervision { } private Mono checkOneProducer(EiProducer producer) { - return restClient.get(producer.getProducerSupervisionCallbackUrl()) // + return this.producerCallbacks.healthCheck(producer) // .onErrorResume(throwable -> { handleNonRespondingProducer(throwable, producer); return Mono.empty(); })// .doOnNext(response -> handleRespondingProducer(response, producer)) - .flatMap(response -> Mono.just(producer)); + .flatMap(response -> checkProducerJobs(producer)) // + .flatMap(responses -> Mono.just(producer)); + } + + private Mono checkProducerJobs(EiProducer producer) { + return getEiJobs(producer) // + .filter(eiJob -> !producer.isJobEnabled(eiJob)) // + .flatMap(eiJob -> startEiJob(producer, eiJob), 1) // + .collectList() // + .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) // + .collectList(); + } + + private Mono startEiJob(EiProducer producer, EiJob eiJob) { + Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1)); + return producerCallbacks.startEiJob(producer, eiJob, retrySpec); + } + + private Flux getEiJobs(EiProducer producer) { + return Flux.fromIterable(producer.getEiTypes()) // + .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType))); } private void handleNonRespondingProducer(Throwable throwable, EiProducer producer) { diff --git a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java index d0828357..3a52fbc2 100644 --- a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java +++ b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java @@ -64,6 +64,7 @@ import org.oransc.enrichment.controllers.producer.ProducerStatusInfo; import org.oransc.enrichment.exceptions.ServiceException; import org.oransc.enrichment.repository.EiJob; import org.oransc.enrichment.repository.EiJobs; +import org.oransc.enrichment.repository.EiProducer; import org.oransc.enrichment.repository.EiProducers; import org.oransc.enrichment.repository.EiType; import org.oransc.enrichment.repository.EiTypes; @@ -561,6 +562,8 @@ class ApplicationTest { @Test void testProducerSupervision() throws JsonMappingException, JsonProcessingException, ServiceException { + + ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults(); putEiProducerWithOneTypeRejecting("simulateProducerError", EI_TYPE_ID); { @@ -569,14 +572,12 @@ class ApplicationTest { putEiJob(EI_TYPE_ID, EI_JOB_ID); verifyJobStatus(EI_JOB_ID, "ENABLED"); deleteEiProducer(EI_PRODUCER_ID); + // A Job disabled status notification shall now be received + await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1)); + assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED); verifyJobStatus(EI_JOB_ID, "DISABLED"); } - // Job disabled status notification shall be received - ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults(); - await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1)); - assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED); - assertThat(this.eiProducers.size()).isEqualTo(1); assertThat(this.eiTypes.size()).isEqualTo(1); assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.ENABLED); @@ -588,11 +589,41 @@ class ApplicationTest { assertThat(this.eiProducers.size()).isEqualTo(1); assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.DISABLED); - // After 3 failed checks, the producer and the type shall be deregisterred + // After 3 failed checks, the producer shall be deregisterred this.producerSupervision.createTask().blockLast(); assertThat(this.eiProducers.size()).isEqualTo(0); // The producer is removed assertThat(this.eiTypes.size()).isEqualTo(1); // The type remains + // Now we have one disabled job, and no producer. + // PUT a producer, then a Job ENABLED status notification shall be received + putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID); + await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(2)); + assertThat(consumerResults.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED); + verifyJobStatus(EI_JOB_ID, "ENABLED"); + } + + @Test + void testProducerSupervision2() throws JsonMappingException, JsonProcessingException, ServiceException { + // Test that supervision enables not enabled jobs and sends a notification when + // suceeded + + putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID); + putEiJob(EI_TYPE_ID, EI_JOB_ID); + + EiProducer producer = this.eiProducers.getProducer(EI_PRODUCER_ID); + EiJob job = this.eiJobs.getJob(EI_JOB_ID); + // Pretend that the producer did reject the job and the a DISABLED notification + // is sent for the job + producer.setJobDisabled(job); + job.setLastReportedStatus(false); + verifyJobStatus(EI_JOB_ID, "DISABLED"); + + // Run the supervision and wait for the job to get started in the producer + this.producerSupervision.createTask().blockLast(); + ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults(); + await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1)); + assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED); + verifyJobStatus(EI_JOB_ID, "ENABLED"); } @Test @@ -614,10 +645,15 @@ class ApplicationTest { assertThat(this.eiJobs.size()).isEqualTo(2); { + EiJob savedJob = this.eiJobs.getJob("jobId1"); // Restore the jobs EiJobs jobs = new EiJobs(this.applicationConfig, this.producerCallbacks); jobs.restoreJobsFromDatabase(); assertThat(jobs.size()).isEqualTo(2); + EiJob restoredJob = jobs.getJob("jobId1"); + assertThat(restoredJob.getId()).isEqualTo("jobId1"); + assertThat(restoredJob.getLastUpdated()).isEqualTo(savedJob.getLastUpdated()); + jobs.remove("jobId1", this.eiProducers); jobs.remove("jobId2", this.eiProducers); } -- 2.16.6