From: PatrikBuhr Date: Fri, 13 Nov 2020 16:00:57 +0000 (+0100) Subject: Some changes in status notifications X-Git-Tag: 2.1.0~12 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=3ac4de0524650cea3d17f9ad5ff7e9cf5dffbe83;p=nonrtric.git Some changes in status notifications Change-Id: Iacf20164d22e39d605625d884dc0ed8b18b6ea00 Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-173 --- diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java index cded9535..9087355e 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java @@ -63,25 +63,39 @@ public class ConsumerCallbacks { public void notifyConsumersProducerDeleted(EiProducer eiProducer) { for (EiType type : eiProducer.getEiTypes()) { if (this.eiTypes.get(type.getId()) == null) { + // The type is removed for (EiJob job : this.eiJobs.getJobsForType(type)) { - noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED)); + if (job.isLastStatusReportedEnabled()) { + noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED)); + job.setLastReportedStatus(false); + } } } } } + public void notifyConsumersProducerAdded(EiProducer eiProducer) { + for (EiType type : eiProducer.getEiTypes()) { + notifyConsumersTypeAdded(type); + } + } + public void notifyConsumersTypeAdded(EiType eiType) { for (EiJob job : this.eiJobs.getJobsForType(eiType)) { - noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED)); + if (!job.isLastStatusReportedEnabled()) { + noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED)); + job.setLastReportedStatus(true); + } } } private void noifyJobOwner(EiJob job, ConsumerEiJobStatus status) { - if (!job.jobStatusUrl().isEmpty()) { + if (!job.getJobStatusUrl().isEmpty()) { String body = gson.toJson(status); - this.restClient.post(job.jobStatusUrl(), body) // - .subscribe(notUsed -> logger.debug("Consumer notified OK {}", job.id()), // - throwable -> logger.warn("Consumer notify failed {} {}", job.jobStatusUrl(), throwable.toString()), // + this.restClient.post(job.getJobStatusUrl(), body) // + .subscribe(notUsed -> logger.debug("Consumer notified OK {}", job.getId()), // + throwable -> logger.warn("Consumer notify failed {} {}", job.getJobStatusUrl(), + throwable.toString()), // null); } } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java index b194dc1f..43169151 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java @@ -48,7 +48,6 @@ import org.oransc.enrichment.repository.EiJobs; import org.oransc.enrichment.repository.EiProducer; import org.oransc.enrichment.repository.EiType; import org.oransc.enrichment.repository.EiTypes; -import org.oransc.enrichment.repository.ImmutableEiJob; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -151,14 +150,14 @@ public class ConsumerController { List result = new ArrayList<>(); if (owner != null) { for (EiJob job : this.eiJobs.getJobsForOwner(owner)) { - if (eiTypeId == null || job.typeId().equals(eiTypeId)) { - result.add(job.id()); + if (eiTypeId == null || job.getTypeId().equals(eiTypeId)) { + result.add(job.getId()); } } } else if (eiTypeId != null) { - this.eiJobs.getJobsForType(eiTypeId).forEach(job -> result.add(job.id())); + this.eiJobs.getJobsForType(eiTypeId).forEach(job -> result.add(job.getId())); } else { - this.eiJobs.getJobs().forEach(job -> result.add(job.id())); + this.eiJobs.getJobs().forEach(job -> result.add(job.getId())); } return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK); } catch ( @@ -208,19 +207,18 @@ public class ConsumerController { private Collection getProducers(EiJob eiJob) { try { - return this.eiTypes.getType(eiJob.typeId()).getProducers(); + return this.eiTypes.getType(eiJob.getTypeId()).getProducers(); } catch (Exception e) { return new Vector<>(); } } private ConsumerEiJobStatus toEiJobStatus(EiJob job) { - for (EiProducer producer : getProducers(job)) { - if (producer.isAvailable()) { - return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED); - } + if (getProducers(job).isEmpty()) { + return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED); + } else { + return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED); } - return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED); } @DeleteMapping(path = "/eijobs/{eiJobId}", produces = MediaType.APPLICATION_JSON_VALUE) @@ -288,7 +286,7 @@ public class ConsumerController { validateJsonObjectAgainstSchema(eiType.getJobDataSchema(), eiJobInfo.jobData); EiJob existingEiJob = this.eiJobs.get(eiJobId); - if (existingEiJob != null && !existingEiJob.typeId().equals(eiJobInfo.eiTypeId)) { + if (existingEiJob != null && !existingEiJob.getTypeId().equals(eiJobInfo.eiTypeId)) { throw new ServiceException("Not allowed to change type for existing EI job", HttpStatus.CONFLICT); } return Mono.just(toEiJob(eiJobInfo, eiJobId, eiType)); @@ -316,14 +314,12 @@ public class ConsumerController { } private EiJob toEiJob(ConsumerEiJobInfo info, String id, EiType type) { - return ImmutableEiJob.builder() // - .id(id) // - .typeId(type.getId()) // - .owner(info.owner) // - .jobData(info.jobData) // - .targetUrl(info.targetUri) // - .jobStatusUrl(info.statusNotificationUri == null ? "" : info.statusNotificationUri) // - .build(); + return new EiJob(id, // + type.getId(), // + info.owner, // + info.jobData, // + info.targetUri, // + info.statusNotificationUri == null ? "" : info.statusNotificationUri); } private ConsumerEiTypeInfo toEiTypeInfo() { @@ -331,6 +327,7 @@ public class ConsumerController { } private ConsumerEiJobInfo toEiJobInfo(EiJob s) { - return new ConsumerEiJobInfo(s.typeId(), s.jobData(), s.owner(), s.targetUrl(), s.jobStatusUrl()); + return new ConsumerEiJobInfo(s.getTypeId(), s.getJobData(), s.getOwner(), s.getTargetUrl(), + s.getJobStatusUrl()); } } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java index 5a47b58f..dc732e12 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java @@ -24,6 +24,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import java.lang.invoke.MethodHandles; +import java.time.Duration; import java.util.Collection; import java.util.Vector; @@ -31,6 +32,7 @@ import org.oransc.enrichment.clients.AsyncRestClient; import org.oransc.enrichment.clients.AsyncRestClientFactory; import org.oransc.enrichment.configuration.ApplicationConfig; import org.oransc.enrichment.repository.EiJob; +import org.oransc.enrichment.repository.EiJobs; import org.oransc.enrichment.repository.EiProducer; import org.oransc.enrichment.repository.EiTypes; import org.slf4j.Logger; @@ -40,6 +42,7 @@ import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; /** * Callbacks to the EiProducer @@ -63,7 +66,7 @@ public class ProducerCallbacks { public void notifyProducersJobDeleted(EiJob eiJob) { for (EiProducer producer : getProducers(eiJob)) { - String url = producer.getJobCallbackUrl() + "/" + eiJob.id(); + String url = producer.getJobCallbackUrl() + "/" + eiJob.getId(); restClient.delete(url) // .subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), // throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(), @@ -79,25 +82,40 @@ public class ProducerCallbacks { * @return the number of producers that returned OK */ public Mono notifyProducersJobStarted(EiJob eiJob) { + Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1)); return Flux.fromIterable(getProducers(eiJob)) // - .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob)) // + .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob, retrySpec)) // .collectList() // .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); // } /** - * Calls one producer for an EiJob activation. + * Restart all jobs for one producer * - * @param producer a producer - * @param eiJob an EI job - * @return the body of the response from the REST call + * @param producer + * @param eiJobs */ - public Mono notifyProducerJobStarted(EiProducer producer, EiJob eiJob) { + public void restartJobs(EiProducer producer, EiJobs eiJobs) { + final int maxNoOfParalellRequests = 10; + Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1)); + + Flux.fromIterable(producer.getEiTypes()) // + .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) // + .flatMap(job -> notifyProducerJobStarted(producer, job, retrySpec), maxNoOfParalellRequests) // + .onErrorResume(t -> { + logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage()); + return Flux.empty(); + }) // + .subscribe(); + } + + private Mono notifyProducerJobStarted(EiProducer producer, EiJob eiJob, Retry retrySpec) { ProducerJobInfo request = new ProducerJobInfo(eiJob); String body = gson.toJson(request); - return restClient.post(producer.getJobCallbackUrl(), body) - .doOnNext(resp -> logger.debug("Job subscription started OK {}", producer.getId())) + return restClient.post(producer.getJobCallbackUrl(), body) // + .retryWhen(retrySpec) // + .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) // .onErrorResume(throwable -> { logger.warn("Job subscription failed {}", producer.getId(), throwable.toString()); return Mono.empty(); @@ -106,7 +124,7 @@ public class ProducerCallbacks { private Collection getProducers(EiJob eiJob) { try { - return this.eiTypes.getType(eiJob.typeId()).getProducers(); + return this.eiTypes.getType(eiJob.getTypeId()).getProducers(); } catch (Exception e) { return new Vector<>(); } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java index c670ea4a..e517b3a9 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java @@ -216,7 +216,6 @@ public class ProducerController { ProducerStatusInfo.OperationalState opState = producer.isAvailable() ? ProducerStatusInfo.OperationalState.ENABLED : ProducerStatusInfo.OperationalState.DISABLED; - this.logger.debug("opState {}", opState); return new ProducerStatusInfo(opState); } @@ -240,10 +239,12 @@ public class ProducerController { } } - registerProducer(eiProducerId, registrationInfo); + EiProducer producer = registerProducer(eiProducerId, registrationInfo); if (previousDefinition != null) { purgeTypes(previousDefinition.getEiTypes()); + this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition); } + this.consumerCallbacks.notifyConsumersProducerAdded(producer); return new ResponseEntity<>(previousDefinition == null ? HttpStatus.CREATED : HttpStatus.OK); } catch (Exception e) { @@ -295,20 +296,17 @@ public class ProducerController { } private EiProducer registerProducer(String producerId, ProducerRegistrationInfo registrationInfo) { - ArrayList types = new ArrayList<>(); + ArrayList typesForProducer = new ArrayList<>(); + EiProducer producer = createProducer(typesForProducer, producerId, registrationInfo); for (ProducerEiTypeRegistrationInfo typeInfo : registrationInfo.types) { - types.add(registerType(typeInfo)); + EiType type = registerType(typeInfo); + typesForProducer.add(type); + type.addProducer(producer); // } - EiProducer producer = createProducer(types, producerId, registrationInfo); this.eiProducers.put(producer); - for (EiType type : types) { - for (EiJob job : this.eiJobs.getJobsForType(type)) { - this.producerCallbacks.notifyProducerJobStarted(producer, job) // - .subscribe(); - } - type.addProducer(producer); - } + producerCallbacks.restartJobs(producer, this.eiJobs); + return producer; } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java index a2f5b89c..1380cca2 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java @@ -63,7 +63,7 @@ public class ProducerJobInfo { } public ProducerJobInfo(EiJob job) { - this(job.jobData(), job.id(), job.typeId(), job.targetUrl()); + this(job.getJobData(), job.getId(), job.getTypeId(), job.getTargetUrl()); } public ProducerJobInfo() { diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java index 95bbc036..47ed53c1 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java @@ -20,25 +20,53 @@ package org.oransc.enrichment.repository; -import org.immutables.gson.Gson; -import org.immutables.value.Value; +import java.lang.invoke.MethodHandles; + +import lombok.Getter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Represents the dynamic information about a EI job */ -@Value.Immutable -@Gson.TypeAdapters -public interface EiJob { - String id(); +public class EiJob { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Getter + private final String id; + + @Getter + private final String typeId; + + @Getter + private final String owner; + + @Getter + private final Object jobData; + + @Getter + private final String targetUrl; - String typeId(); + @Getter + private final String jobStatusUrl; - String owner(); + @Getter + private boolean isLastStatusReportedEnabled = true; - Object jobData(); + public EiJob(String id, String typeId, String owner, Object jobData, String targetUrl, String jobStatusUrl) { + this.id = id; + this.typeId = typeId; + this.owner = owner; + this.jobData = jobData; + this.targetUrl = targetUrl; + this.jobStatusUrl = jobStatusUrl; + } - String targetUrl(); + public void setLastReportedStatus(boolean isEnabled) { + this.isLastStatusReportedEnabled = isEnabled; + logger.debug("Job status id: {}, enabled: {}", this.isLastStatusReportedEnabled, isEnabled); + } - String jobStatusUrl(); } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java index 1532c535..bff5be2c 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java @@ -65,12 +65,15 @@ public class EiJobs { } public synchronized void restoreJobsFromDatabase() throws IOException { + Files.createDirectories(Paths.get(getDatabaseDirectory())); File dbDir = new File(getDatabaseDirectory()); + for (File file : dbDir.listFiles()) { String json = Files.readString(file.toPath()); EiJob job = gson.fromJson(json, EiJob.class); this.put(job, false); } + } public synchronized void put(EiJob job) { @@ -114,9 +117,9 @@ public class EiJobs { } public synchronized void remove(EiJob job) { - this.allEiJobs.remove(job.id()); - jobsByType.remove(job.typeId(), job.id()); - jobsByOwner.remove(job.owner(), job.id()); + this.allEiJobs.remove(job.getId()); + jobsByType.remove(job.getTypeId(), job.getId()); + jobsByOwner.remove(job.getOwner(), job.getId()); try { Files.delete(getPath(job)); @@ -136,15 +139,16 @@ public class EiJobs { jobsByOwner.clear(); try { FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory())); + Files.createDirectories(Paths.get(getDatabaseDirectory())); } catch (IOException e) { logger.warn("Could not delete database : {}", e.getMessage()); } } private void put(EiJob job, boolean storePersistently) { - allEiJobs.put(job.id(), job); - jobsByType.put(job.typeId(), job.id(), job); - jobsByOwner.put(job.owner(), job.id(), job); + allEiJobs.put(job.getId(), job); + jobsByType.put(job.getTypeId(), job.getId(), job); + jobsByOwner.put(job.getOwner(), job.getId(), job); if (storePersistently) { storeJobInFile(job); } @@ -152,12 +156,11 @@ public class EiJobs { private void storeJobInFile(EiJob job) { try { - Files.createDirectories(Paths.get(getDatabaseDirectory())); try (PrintStream out = new PrintStream(new FileOutputStream(getFile(job)))) { out.print(gson.toJson(job)); } } catch (Exception e) { - logger.warn("Could not save job: {} {}", job.id(), e.getMessage()); + logger.warn("Could not save job: {} {}", job.getId(), e.getMessage()); } } @@ -166,7 +169,7 @@ public class EiJobs { } private Path getPath(EiJob job) { - return Path.of(getDatabaseDirectory(), job.id()); + return Path.of(getDatabaseDirectory(), job.getId()); } private String getDatabaseDirectory() { diff --git a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java index 9ef2590f..68271f8f 100644 --- a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java +++ b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java @@ -33,6 +33,7 @@ import com.google.gson.JsonParser; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.PrintStream; +import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; @@ -64,6 +65,8 @@ import org.oransc.enrichment.repository.EiProducers; import org.oransc.enrichment.repository.EiType; import org.oransc.enrichment.repository.EiTypes; import org.oransc.enrichment.tasks.ProducerSupervision; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -91,6 +94,8 @@ import reactor.test.StepVerifier; "app.webclient.trust-store=./config/truststore.jks", // "app.vardata-directory=./target"}) class ApplicationTest { + private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final String EI_TYPE_ID = "typeId"; private final String EI_PRODUCER_ID = "producerId"; private final String EI_JOB_PROPERTY = "\"property1\""; @@ -286,12 +291,14 @@ class ApplicationTest { ProducerJobInfo request = simulatorResults.jobsStarted.get(0); assertThat(request.id).isEqualTo("jobId"); - assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(1); + // One retry --> two calls + await().untilAsserted(() -> assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2)); + assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2); resp = restClient().putForEntity(url, body).block(); assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK); EiJob job = this.eiJobs.getJob("jobId"); - assertThat(job.owner()).isEqualTo("owner"); + assertThat(job.getOwner()).isEqualTo("owner"); } @Test @@ -302,7 +309,9 @@ class ApplicationTest { testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Job not accepted by any producers"); ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults(); - assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(1); + // There is one retry -> 2 calls + await().untilAsserted(() -> assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2)); + assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2); } @Test @@ -458,19 +467,46 @@ class ApplicationTest { @Test void testJobStatusNotifications() throws JsonMappingException, JsonProcessingException, ServiceException { + ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults(); + ProducerSimulatorController.TestResults producerCalls = this.producerSimulator.getTestResults(); + putEiProducerWithOneType("eiProducerId", EI_TYPE_ID); putEiJob(EI_TYPE_ID, "jobId"); + putEiProducerWithOneType("eiProducerId2", EI_TYPE_ID); + await().untilAsserted(() -> assertThat(producerCalls.jobsStarted.size()).isEqualTo(2)); + deleteEiProducer("eiProducerId2"); + assertThat(this.eiTypes.size()).isEqualTo(1); // The type remains, one producer left deleteEiProducer("eiProducerId"); assertThat(this.eiTypes.size()).isZero(); // The type is gone assertThat(this.eiJobs.size()).isEqualTo(1); // The job remains - ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults(); - await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1)); - assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED); + await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1)); + assertThat(consumerCalls.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED); putEiProducerWithOneType("eiProducerId", EI_TYPE_ID); - await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(2)); - assertThat(consumerResults.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED); + await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2)); + assertThat(consumerCalls.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED); + } + + @Test + void testJobStatusNotifications2() throws JsonMappingException, JsonProcessingException, ServiceException { + // Test replacing a producer with new and removed types + + // Create a job + putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID); + putEiJob(EI_TYPE_ID, EI_JOB_ID); + + // change the type for the producer, the EI_TYPE_ID is deleted + putEiProducerWithOneType(EI_PRODUCER_ID, "junk"); + verifyJobStatus(EI_JOB_ID, "DISABLED"); + ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults(); + await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1)); + assertThat(consumerCalls.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED); + + putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID); + verifyJobStatus(EI_JOB_ID, "ENABLED"); + await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2)); + assertThat(consumerCalls.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED); } @Test @@ -496,7 +532,7 @@ class ApplicationTest { { // Create a job putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID); - putEiJob(EI_TYPE_ID, "jobId"); + putEiJob(EI_TYPE_ID, EI_JOB_ID); deleteEiProducer(EI_PRODUCER_ID); } @@ -506,13 +542,18 @@ class ApplicationTest { this.producerSupervision.createTask().blockLast(); this.producerSupervision.createTask().blockLast(); + + // Now we have one producer that is disabled, but the job will be enabled until + // the producer/type is removed assertThat(this.eiProducers.size()).isEqualTo(1); assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.DISABLED); + verifyJobStatus(EI_JOB_ID, "ENABLED"); // After 3 failed checks, the producer and the type shall be deregisterred this.producerSupervision.createTask().blockLast(); assertThat(this.eiProducers.size()).isEqualTo(0); assertThat(this.eiTypes.size()).isEqualTo(0); + verifyJobStatus(EI_JOB_ID, "DISABLED"); // Job disabled status notification shall be received ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults(); @@ -552,8 +593,8 @@ class ApplicationTest { jobs.restoreJobsFromDatabase(); assertThat(jobs.size()).isEqualTo(0); } - - this.eiJobs.remove("jobId1"); // removing a job when the db file is gone + logger.warn("Test removing a job when the db file is gone"); + this.eiJobs.remove("jobId1"); assertThat(this.eiJobs.size()).isEqualTo(1); }