For enabled producers, jobs that has not been successfully started are started.
Added a timkestamp on EiJobs in the producer API
Change-Id: I39973499aefcba84f371c9d53008babf4ff38460
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-378
"description": "Idenitity of the EI job",
"type": "string"
},
+ "last_updated": {
+ "description": "The time when the job was last updated or created (ISO-8601)",
+ "type": "string"
+ },
"ei_job_data": {
"description": "Json for the job data",
"type": "object"
ei_job_identity:
type: string
description: Idenitity of the EI job
+ last_updated:
+ type: string
+ description: The time when the job was last updated or created (ISO-8601)
ei_job_data:
type: object
properties: {}
this.restClient = restClientFactory.createRestClientNoHttpProxy("");
}
+ public Mono<String> healthCheck(EiProducer producer) {
+ return restClient.get(producer.getProducerSupervisionCallbackUrl());
+ }
+
public void stopEiJob(EiJob eiJob, EiProducers eiProducers) {
for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
+ producer.setJobDisabled(eiJob);
restClient.delete(url) //
.subscribe(response -> logger.debug("Producer job deleted OK {}", producer.getId()), //
throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
}
/**
- * Calls all producers for an EiJob activation.
+ * Start a job in all producers that suports the job type
*
* @param eiJob an EI job
* @return the number of producers that returned OK
public Mono<Integer> startEiJob(EiJob eiJob, EiProducers eiProducers) {
Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) //
- .flatMap(eiProducer -> postStartEiJob(eiProducer, eiJob, retrySpec)) //
+ .flatMap(eiProducer -> startEiJob(eiProducer, eiJob, retrySpec)) //
.collectList() //
.flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
}
/**
- * Restart all jobs for one producer
+ * Start all jobs for one producer
*
* @param producer
* @param eiJobs
*/
- public Flux<String> restartEiJobs(EiProducer producer, EiJobs eiJobs) {
+ public Flux<String> startEiJobs(EiProducer producer, EiJobs eiJobs) {
final int maxNoOfParalellRequests = 10;
Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
return Flux.fromIterable(producer.getEiTypes()) //
.flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
- .flatMap(job -> postStartEiJob(producer, job, retrySpec), maxNoOfParalellRequests) //
- .onErrorResume(t -> {
- logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
- return Flux.empty();
- }); //
-
+ .flatMap(job -> startEiJob(producer, job, retrySpec), maxNoOfParalellRequests);
}
- private Mono<String> postStartEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
+ public Mono<String> startEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
ProducerJobInfo request = new ProducerJobInfo(eiJob);
String body = gson.toJson(request);
return restClient.post(producer.getJobCallbackUrl(), body) //
.retryWhen(retrySpec) //
.doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
- .doOnNext(resp -> producer.setJobDisabled(eiJob)) //
.onErrorResume(throwable -> {
+ producer.setJobDisabled(eiJob);
logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
return Mono.empty();
}) //
@JsonProperty("owner")
public String owner;
- public ProducerJobInfo(Object jobData, String id, String typeId, String targetUri, String owner) {
+ @ApiModelProperty(value = "The time when the job was last updated or created (ISO-8601)")
+ @SerializedName("last_updated")
+ @JsonProperty("last_updated")
+ public String lastUpdated;
+
+ public ProducerJobInfo(Object jobData, String id, String typeId, String targetUri, String owner,
+ String lastUpdated) {
this.id = id;
this.jobData = jobData;
this.typeId = typeId;
this.targetUri = targetUri;
this.owner = owner;
+ this.lastUpdated = lastUpdated;
}
public ProducerJobInfo(EiJob job) {
- this(job.getJobData(), job.getId(), job.getTypeId(), job.getTargetUrl(), job.getOwner());
+ this(job.getJobData(), job.getId(), job.getTypeId(), job.getTargetUrl(), job.getOwner(), job.getLastUpdated());
}
public ProducerJobInfo() {
package org.oransc.enrichment.repository;
import java.lang.invoke.MethodHandles;
+import java.time.Instant;
import lombok.Builder;
import lombok.Getter;
@Getter
private final String jobStatusUrl;
+ @Getter
+ @Builder.Default
+ private String lastUpdated = Instant.now().toString();
+
@Getter
@Builder.Default
private boolean isLastStatusReportedEnabled = true;
this.enabledJobs.remove(job.getId());
}
- synchronized boolean isJobEnabled(EiJob job) {
+ /**
+ * Is the job enabled for this producer?
+ */
+ public synchronized boolean isJobEnabled(EiJob job) {
return this.enabledJobs.contains(job.getId());
}
Collection<EiType> previousTypes =
previousDefinition != null ? previousDefinition.getEiTypes() : new ArrayList<>();
- producerCallbacks.restartEiJobs(producer, this.eiJobs) //
+ producerCallbacks.startEiJobs(producer, this.eiJobs) //
.collectList() //
.flatMapMany(list -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
.collectList() //
package org.oransc.enrichment.tasks;
-import org.oransc.enrichment.clients.AsyncRestClient;
-import org.oransc.enrichment.clients.AsyncRestClientFactory;
+import java.time.Duration;
+
import org.oransc.enrichment.configuration.ApplicationConfig;
+import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
+import org.oransc.enrichment.repository.EiJob;
+import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
import org.oransc.enrichment.repository.EiProducers;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
/**
* Regularly checks the availability of the EI Producers
private static final Logger logger = LoggerFactory.getLogger(ProducerSupervision.class);
private final EiProducers eiProducers;
- private final AsyncRestClient restClient;
+ private final EiJobs eiJobs;
+ private final ProducerCallbacks producerCallbacks;
+ private final ConsumerCallbacks consumerCallbacks;
@Autowired
- public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers) {
- AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
- this.restClient = restClientFactory.createRestClientNoHttpProxy("");
+ public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs,
+ ProducerCallbacks producerCallbacks, ConsumerCallbacks consumerCallbacks) {
this.eiProducers = eiProducers;
+ this.eiJobs = eiJobs;
+ this.producerCallbacks = producerCallbacks;
+ this.consumerCallbacks = consumerCallbacks;
}
@Scheduled(fixedRate = 1000 * 60 * 5)
}
private Mono<EiProducer> checkOneProducer(EiProducer producer) {
- return restClient.get(producer.getProducerSupervisionCallbackUrl()) //
+ return this.producerCallbacks.healthCheck(producer) //
.onErrorResume(throwable -> {
handleNonRespondingProducer(throwable, producer);
return Mono.empty();
})//
.doOnNext(response -> handleRespondingProducer(response, producer))
- .flatMap(response -> Mono.just(producer));
+ .flatMap(response -> checkProducerJobs(producer)) //
+ .flatMap(responses -> Mono.just(producer));
+ }
+
+ private Mono<?> checkProducerJobs(EiProducer producer) {
+ return getEiJobs(producer) //
+ .filter(eiJob -> !producer.isJobEnabled(eiJob)) //
+ .flatMap(eiJob -> startEiJob(producer, eiJob), 1) //
+ .collectList() //
+ .flatMapMany(eiJob -> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .collectList();
+ }
+
+ private Mono<String> startEiJob(EiProducer producer, EiJob eiJob) {
+ Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
+ return producerCallbacks.startEiJob(producer, eiJob, retrySpec);
+ }
+
+ private Flux<EiJob> getEiJobs(EiProducer producer) {
+ return Flux.fromIterable(producer.getEiTypes()) //
+ .flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));
}
private void handleNonRespondingProducer(Throwable throwable, EiProducer producer) {
import org.oransc.enrichment.exceptions.ServiceException;
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
+import org.oransc.enrichment.repository.EiProducer;
import org.oransc.enrichment.repository.EiProducers;
import org.oransc.enrichment.repository.EiType;
import org.oransc.enrichment.repository.EiTypes;
@Test
void testProducerSupervision() throws JsonMappingException, JsonProcessingException, ServiceException {
+
+ ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
putEiProducerWithOneTypeRejecting("simulateProducerError", EI_TYPE_ID);
{
putEiJob(EI_TYPE_ID, EI_JOB_ID);
verifyJobStatus(EI_JOB_ID, "ENABLED");
deleteEiProducer(EI_PRODUCER_ID);
+ // A Job disabled status notification shall now be received
+ await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
+ assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
verifyJobStatus(EI_JOB_ID, "DISABLED");
}
- // Job disabled status notification shall be received
- ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
- await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
- assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
-
assertThat(this.eiProducers.size()).isEqualTo(1);
assertThat(this.eiTypes.size()).isEqualTo(1);
assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.ENABLED);
assertThat(this.eiProducers.size()).isEqualTo(1);
assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.DISABLED);
- // After 3 failed checks, the producer and the type shall be deregisterred
+ // After 3 failed checks, the producer shall be deregisterred
this.producerSupervision.createTask().blockLast();
assertThat(this.eiProducers.size()).isEqualTo(0); // The producer is removed
assertThat(this.eiTypes.size()).isEqualTo(1); // The type remains
+ // Now we have one disabled job, and no producer.
+ // PUT a producer, then a Job ENABLED status notification shall be received
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(2));
+ assertThat(consumerResults.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+ verifyJobStatus(EI_JOB_ID, "ENABLED");
+ }
+
+ @Test
+ void testProducerSupervision2() throws JsonMappingException, JsonProcessingException, ServiceException {
+ // Test that supervision enables not enabled jobs and sends a notification when
+ // suceeded
+
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ putEiJob(EI_TYPE_ID, EI_JOB_ID);
+
+ EiProducer producer = this.eiProducers.getProducer(EI_PRODUCER_ID);
+ EiJob job = this.eiJobs.getJob(EI_JOB_ID);
+ // Pretend that the producer did reject the job and the a DISABLED notification
+ // is sent for the job
+ producer.setJobDisabled(job);
+ job.setLastReportedStatus(false);
+ verifyJobStatus(EI_JOB_ID, "DISABLED");
+
+ // Run the supervision and wait for the job to get started in the producer
+ this.producerSupervision.createTask().blockLast();
+ ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
+ assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+ verifyJobStatus(EI_JOB_ID, "ENABLED");
}
@Test
assertThat(this.eiJobs.size()).isEqualTo(2);
{
+ EiJob savedJob = this.eiJobs.getJob("jobId1");
// Restore the jobs
EiJobs jobs = new EiJobs(this.applicationConfig, this.producerCallbacks);
jobs.restoreJobsFromDatabase();
assertThat(jobs.size()).isEqualTo(2);
+ EiJob restoredJob = jobs.getJob("jobId1");
+ assertThat(restoredJob.getId()).isEqualTo("jobId1");
+ assertThat(restoredJob.getLastUpdated()).isEqualTo(savedJob.getLastUpdated());
+
jobs.remove("jobId1", this.eiProducers);
jobs.remove("jobId2", this.eiProducers);
}