Some changes in status notifications 61/5061/5
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 13 Nov 2020 16:00:57 +0000 (17:00 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 19 Nov 2020 07:26:05 +0000 (08:26 +0100)
Change-Id: Iacf20164d22e39d605625d884dc0ed8b18b6ea00
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-173

enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java
enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java

index cded953..9087355 100644 (file)
@@ -63,25 +63,39 @@ public class ConsumerCallbacks {
     public void notifyConsumersProducerDeleted(EiProducer eiProducer) {
         for (EiType type : eiProducer.getEiTypes()) {
             if (this.eiTypes.get(type.getId()) == null) {
+                // The type is removed
                 for (EiJob job : this.eiJobs.getJobsForType(type)) {
-                    noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED));
+                    if (job.isLastStatusReportedEnabled()) {
+                        noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED));
+                        job.setLastReportedStatus(false);
+                    }
                 }
             }
         }
     }
 
+    public void notifyConsumersProducerAdded(EiProducer eiProducer) {
+        for (EiType type : eiProducer.getEiTypes()) {
+            notifyConsumersTypeAdded(type);
+        }
+    }
+
     public void notifyConsumersTypeAdded(EiType eiType) {
         for (EiJob job : this.eiJobs.getJobsForType(eiType)) {
-            noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED));
+            if (!job.isLastStatusReportedEnabled()) {
+                noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED));
+                job.setLastReportedStatus(true);
+            }
         }
     }
 
     private void noifyJobOwner(EiJob job, ConsumerEiJobStatus status) {
-        if (!job.jobStatusUrl().isEmpty()) {
+        if (!job.getJobStatusUrl().isEmpty()) {
             String body = gson.toJson(status);
-            this.restClient.post(job.jobStatusUrl(), body) //
-                .subscribe(notUsed -> logger.debug("Consumer notified OK {}", job.id()), //
-                    throwable -> logger.warn("Consumer notify failed {} {}", job.jobStatusUrl(), throwable.toString()), //
+            this.restClient.post(job.getJobStatusUrl(), body) //
+                .subscribe(notUsed -> logger.debug("Consumer notified OK {}", job.getId()), //
+                    throwable -> logger.warn("Consumer notify failed {} {}", job.getJobStatusUrl(),
+                        throwable.toString()), //
                     null);
         }
     }
index b194dc1..4316915 100644 (file)
@@ -48,7 +48,6 @@ import org.oransc.enrichment.repository.EiJobs;
 import org.oransc.enrichment.repository.EiProducer;
 import org.oransc.enrichment.repository.EiType;
 import org.oransc.enrichment.repository.EiTypes;
-import org.oransc.enrichment.repository.ImmutableEiJob;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -151,14 +150,14 @@ public class ConsumerController {
             List<String> result = new ArrayList<>();
             if (owner != null) {
                 for (EiJob job : this.eiJobs.getJobsForOwner(owner)) {
-                    if (eiTypeId == null || job.typeId().equals(eiTypeId)) {
-                        result.add(job.id());
+                    if (eiTypeId == null || job.getTypeId().equals(eiTypeId)) {
+                        result.add(job.getId());
                     }
                 }
             } else if (eiTypeId != null) {
-                this.eiJobs.getJobsForType(eiTypeId).forEach(job -> result.add(job.id()));
+                this.eiJobs.getJobsForType(eiTypeId).forEach(job -> result.add(job.getId()));
             } else {
-                this.eiJobs.getJobs().forEach(job -> result.add(job.id()));
+                this.eiJobs.getJobs().forEach(job -> result.add(job.getId()));
             }
             return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK);
         } catch (
@@ -208,19 +207,18 @@ public class ConsumerController {
 
     private Collection<EiProducer> getProducers(EiJob eiJob) {
         try {
-            return this.eiTypes.getType(eiJob.typeId()).getProducers();
+            return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
         } catch (Exception e) {
             return new Vector<>();
         }
     }
 
     private ConsumerEiJobStatus toEiJobStatus(EiJob job) {
-        for (EiProducer producer : getProducers(job)) {
-            if (producer.isAvailable()) {
-                return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
-            }
+        if (getProducers(job).isEmpty()) {
+            return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
+        } else {
+            return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
         }
-        return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
     }
 
     @DeleteMapping(path = "/eijobs/{eiJobId}", produces = MediaType.APPLICATION_JSON_VALUE)
@@ -288,7 +286,7 @@ public class ConsumerController {
             validateJsonObjectAgainstSchema(eiType.getJobDataSchema(), eiJobInfo.jobData);
             EiJob existingEiJob = this.eiJobs.get(eiJobId);
 
-            if (existingEiJob != null && !existingEiJob.typeId().equals(eiJobInfo.eiTypeId)) {
+            if (existingEiJob != null && !existingEiJob.getTypeId().equals(eiJobInfo.eiTypeId)) {
                 throw new ServiceException("Not allowed to change type for existing EI job", HttpStatus.CONFLICT);
             }
             return Mono.just(toEiJob(eiJobInfo, eiJobId, eiType));
@@ -316,14 +314,12 @@ public class ConsumerController {
     }
 
     private EiJob toEiJob(ConsumerEiJobInfo info, String id, EiType type) {
-        return ImmutableEiJob.builder() //
-            .id(id) //
-            .typeId(type.getId()) //
-            .owner(info.owner) //
-            .jobData(info.jobData) //
-            .targetUrl(info.targetUri) //
-            .jobStatusUrl(info.statusNotificationUri == null ? "" : info.statusNotificationUri) //
-            .build();
+        return new EiJob(id, //
+            type.getId(), //
+            info.owner, //
+            info.jobData, //
+            info.targetUri, //
+            info.statusNotificationUri == null ? "" : info.statusNotificationUri);
     }
 
     private ConsumerEiTypeInfo toEiTypeInfo() {
@@ -331,6 +327,7 @@ public class ConsumerController {
     }
 
     private ConsumerEiJobInfo toEiJobInfo(EiJob s) {
-        return new ConsumerEiJobInfo(s.typeId(), s.jobData(), s.owner(), s.targetUrl(), s.jobStatusUrl());
+        return new ConsumerEiJobInfo(s.getTypeId(), s.getJobData(), s.getOwner(), s.getTargetUrl(),
+            s.getJobStatusUrl());
     }
 }
index 5a47b58..dc732e1 100644 (file)
@@ -24,6 +24,7 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Vector;
 
@@ -31,6 +32,7 @@ import org.oransc.enrichment.clients.AsyncRestClient;
 import org.oransc.enrichment.clients.AsyncRestClientFactory;
 import org.oransc.enrichment.configuration.ApplicationConfig;
 import org.oransc.enrichment.repository.EiJob;
+import org.oransc.enrichment.repository.EiJobs;
 import org.oransc.enrichment.repository.EiProducer;
 import org.oransc.enrichment.repository.EiTypes;
 import org.slf4j.Logger;
@@ -40,6 +42,7 @@ import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 /**
  * Callbacks to the EiProducer
@@ -63,7 +66,7 @@ public class ProducerCallbacks {
 
     public void notifyProducersJobDeleted(EiJob eiJob) {
         for (EiProducer producer : getProducers(eiJob)) {
-            String url = producer.getJobCallbackUrl() + "/" + eiJob.id();
+            String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
             restClient.delete(url) //
                 .subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), //
                     throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
@@ -79,25 +82,40 @@ public class ProducerCallbacks {
      * @return the number of producers that returned OK
      */
     public Mono<Integer> notifyProducersJobStarted(EiJob eiJob) {
+        Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
         return Flux.fromIterable(getProducers(eiJob)) //
-            .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob)) //
+            .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob, retrySpec)) //
             .collectList() //
             .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
     }
 
     /**
-     * Calls one producer for an EiJob activation.
+     * Restart all jobs for one producer
      * 
-     * @param producer a producer
-     * @param eiJob an EI job
-     * @return the body of the response from the REST call
+     * @param producer
+     * @param eiJobs
      */
-    public Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob) {
+    public void restartJobs(EiProducer producer, EiJobs eiJobs) {
+        final int maxNoOfParalellRequests = 10;
+        Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
+
+        Flux.fromIterable(producer.getEiTypes()) //
+            .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
+            .flatMap(job -> notifyProducerJobStarted(producer, job, retrySpec), maxNoOfParalellRequests) //
+            .onErrorResume(t -> {
+                logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
+                return Flux.empty();
+            }) //
+            .subscribe();
+    }
+
+    private Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob, Retry retrySpec) {
         ProducerJobInfo request = new ProducerJobInfo(eiJob);
         String body = gson.toJson(request);
 
-        return restClient.post(producer.getJobCallbackUrl(), body)
-            .doOnNext(resp -> logger.debug("Job subscription started OK {}", producer.getId()))
+        return restClient.post(producer.getJobCallbackUrl(), body) //
+            .retryWhen(retrySpec) //
+            .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
             .onErrorResume(throwable -> {
                 logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
                 return Mono.empty();
@@ -106,7 +124,7 @@ public class ProducerCallbacks {
 
     private Collection<EiProducer> getProducers(EiJob eiJob) {
         try {
-            return this.eiTypes.getType(eiJob.typeId()).getProducers();
+            return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
         } catch (Exception e) {
             return new Vector<>();
         }
index c670ea4..e517b3a 100644 (file)
@@ -216,7 +216,6 @@ public class ProducerController {
         ProducerStatusInfo.OperationalState opState =
             producer.isAvailable() ? ProducerStatusInfo.OperationalState.ENABLED
                 : ProducerStatusInfo.OperationalState.DISABLED;
-        this.logger.debug("opState {}", opState);
         return new ProducerStatusInfo(opState);
     }
 
@@ -240,10 +239,12 @@ public class ProducerController {
                 }
             }
 
-            registerProducer(eiProducerId, registrationInfo);
+            EiProducer producer = registerProducer(eiProducerId, registrationInfo);
             if (previousDefinition != null) {
                 purgeTypes(previousDefinition.getEiTypes());
+                this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition);
             }
+            this.consumerCallbacks.notifyConsumersProducerAdded(producer);
 
             return new ResponseEntity<>(previousDefinition == null ? HttpStatus.CREATED : HttpStatus.OK);
         } catch (Exception e) {
@@ -295,20 +296,17 @@ public class ProducerController {
     }
 
     private EiProducer registerProducer(String producerId, ProducerRegistrationInfo registrationInfo) {
-        ArrayList<EiType> types = new ArrayList<>();
+        ArrayList<EiType> typesForProducer = new ArrayList<>();
+        EiProducer producer = createProducer(typesForProducer, producerId, registrationInfo);
         for (ProducerEiTypeRegistrationInfo typeInfo : registrationInfo.types) {
-            types.add(registerType(typeInfo));
+            EiType type = registerType(typeInfo);
+            typesForProducer.add(type);
+            type.addProducer(producer); //
         }
-        EiProducer producer = createProducer(types, producerId, registrationInfo);
         this.eiProducers.put(producer);
 
-        for (EiType type : types) {
-            for (EiJob job : this.eiJobs.getJobsForType(type)) {
-                this.producerCallbacks.notifyProducerJobStarted(producer, job) //
-                    .subscribe();
-            }
-            type.addProducer(producer);
-        }
+        producerCallbacks.restartJobs(producer, this.eiJobs);
+
         return producer;
     }
 
index a2f5b89..1380cca 100644 (file)
@@ -63,7 +63,7 @@ public class ProducerJobInfo {
     }
 
     public ProducerJobInfo(EiJob job) {
-        this(job.jobData(), job.id(), job.typeId(), job.targetUrl());
+        this(job.getJobData(), job.getId(), job.getTypeId(), job.getTargetUrl());
     }
 
     public ProducerJobInfo() {
index 95bbc03..47ed53c 100644 (file)
 
 package org.oransc.enrichment.repository;
 
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
+import java.lang.invoke.MethodHandles;
+
+import lombok.Getter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Represents the dynamic information about a EI job
  */
-@Value.Immutable
-@Gson.TypeAdapters
-public interface EiJob {
 
-    String id();
+public class EiJob {
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    @Getter
+    private final String id;
+
+    @Getter
+    private final String typeId;
+
+    @Getter
+    private final String owner;
+
+    @Getter
+    private final Object jobData;
+
+    @Getter
+    private final String targetUrl;
 
-    String typeId();
+    @Getter
+    private final String jobStatusUrl;
 
-    String owner();
+    @Getter
+    private boolean isLastStatusReportedEnabled = true;
 
-    Object jobData();
+    public EiJob(String id, String typeId, String owner, Object jobData, String targetUrl, String jobStatusUrl) {
+        this.id = id;
+        this.typeId = typeId;
+        this.owner = owner;
+        this.jobData = jobData;
+        this.targetUrl = targetUrl;
+        this.jobStatusUrl = jobStatusUrl;
+    }
 
-    String targetUrl();
+    public void setLastReportedStatus(boolean isEnabled) {
+        this.isLastStatusReportedEnabled = isEnabled;
+        logger.debug("Job status id: {}, enabled: {}", this.isLastStatusReportedEnabled, isEnabled);
+    }
 
-    String jobStatusUrl();
 }
index 1532c53..bff5be2 100644 (file)
@@ -65,12 +65,15 @@ public class EiJobs {
     }
 
     public synchronized void restoreJobsFromDatabase() throws IOException {
+        Files.createDirectories(Paths.get(getDatabaseDirectory()));
         File dbDir = new File(getDatabaseDirectory());
+
         for (File file : dbDir.listFiles()) {
             String json = Files.readString(file.toPath());
             EiJob job = gson.fromJson(json, EiJob.class);
             this.put(job, false);
         }
+
     }
 
     public synchronized void put(EiJob job) {
@@ -114,9 +117,9 @@ public class EiJobs {
     }
 
     public synchronized void remove(EiJob job) {
-        this.allEiJobs.remove(job.id());
-        jobsByType.remove(job.typeId(), job.id());
-        jobsByOwner.remove(job.owner(), job.id());
+        this.allEiJobs.remove(job.getId());
+        jobsByType.remove(job.getTypeId(), job.getId());
+        jobsByOwner.remove(job.getOwner(), job.getId());
 
         try {
             Files.delete(getPath(job));
@@ -136,15 +139,16 @@ public class EiJobs {
         jobsByOwner.clear();
         try {
             FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
+            Files.createDirectories(Paths.get(getDatabaseDirectory()));
         } catch (IOException e) {
             logger.warn("Could not delete database : {}", e.getMessage());
         }
     }
 
     private void put(EiJob job, boolean storePersistently) {
-        allEiJobs.put(job.id(), job);
-        jobsByType.put(job.typeId(), job.id(), job);
-        jobsByOwner.put(job.owner(), job.id(), job);
+        allEiJobs.put(job.getId(), job);
+        jobsByType.put(job.getTypeId(), job.getId(), job);
+        jobsByOwner.put(job.getOwner(), job.getId(), job);
         if (storePersistently) {
             storeJobInFile(job);
         }
@@ -152,12 +156,11 @@ public class EiJobs {
 
     private void storeJobInFile(EiJob job) {
         try {
-            Files.createDirectories(Paths.get(getDatabaseDirectory()));
             try (PrintStream out = new PrintStream(new FileOutputStream(getFile(job)))) {
                 out.print(gson.toJson(job));
             }
         } catch (Exception e) {
-            logger.warn("Could not save job: {} {}", job.id(), e.getMessage());
+            logger.warn("Could not save job: {} {}", job.getId(), e.getMessage());
         }
     }
 
@@ -166,7 +169,7 @@ public class EiJobs {
     }
 
     private Path getPath(EiJob job) {
-        return Path.of(getDatabaseDirectory(), job.id());
+        return Path.of(getDatabaseDirectory(), job.getId());
     }
 
     private String getDatabaseDirectory() {
index 9ef2590..68271f8 100644 (file)
@@ -33,6 +33,7 @@ import com.google.gson.JsonParser;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 
@@ -64,6 +65,8 @@ import org.oransc.enrichment.repository.EiProducers;
 import org.oransc.enrichment.repository.EiType;
 import org.oransc.enrichment.repository.EiTypes;
 import org.oransc.enrichment.tasks.ProducerSupervision;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -91,6 +94,8 @@ import reactor.test.StepVerifier;
         "app.webclient.trust-store=./config/truststore.jks", //
         "app.vardata-directory=./target"})
 class ApplicationTest {
+    private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
     private final String EI_TYPE_ID = "typeId";
     private final String EI_PRODUCER_ID = "producerId";
     private final String EI_JOB_PROPERTY = "\"property1\"";
@@ -286,12 +291,14 @@ class ApplicationTest {
         ProducerJobInfo request = simulatorResults.jobsStarted.get(0);
         assertThat(request.id).isEqualTo("jobId");
 
-        assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(1);
+        // One retry --> two calls
+        await().untilAsserted(() -> assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2));
+        assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2);
 
         resp = restClient().putForEntity(url, body).block();
         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
         EiJob job = this.eiJobs.getJob("jobId");
-        assertThat(job.owner()).isEqualTo("owner");
+        assertThat(job.getOwner()).isEqualTo("owner");
     }
 
     @Test
@@ -302,7 +309,9 @@ class ApplicationTest {
         testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Job not accepted by any producers");
 
         ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
-        assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(1);
+        // There is one retry -> 2 calls
+        await().untilAsserted(() -> assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2));
+        assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2);
     }
 
     @Test
@@ -458,19 +467,46 @@ class ApplicationTest {
 
     @Test
     void testJobStatusNotifications() throws JsonMappingException, JsonProcessingException, ServiceException {
+        ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
+        ProducerSimulatorController.TestResults producerCalls = this.producerSimulator.getTestResults();
+
         putEiProducerWithOneType("eiProducerId", EI_TYPE_ID);
         putEiJob(EI_TYPE_ID, "jobId");
+        putEiProducerWithOneType("eiProducerId2", EI_TYPE_ID);
+        await().untilAsserted(() -> assertThat(producerCalls.jobsStarted.size()).isEqualTo(2));
 
+        deleteEiProducer("eiProducerId2");
+        assertThat(this.eiTypes.size()).isEqualTo(1); // The type remains, one producer left
         deleteEiProducer("eiProducerId");
         assertThat(this.eiTypes.size()).isZero(); // The type is gone
         assertThat(this.eiJobs.size()).isEqualTo(1); // The job remains
-        ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
-        await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
-        assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
+        await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1));
+        assertThat(consumerCalls.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
 
         putEiProducerWithOneType("eiProducerId", EI_TYPE_ID);
-        await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(2));
-        assertThat(consumerResults.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+        await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2));
+        assertThat(consumerCalls.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+    }
+
+    @Test
+    void testJobStatusNotifications2() throws JsonMappingException, JsonProcessingException, ServiceException {
+        // Test replacing a producer with new and removed types
+
+        // Create a job
+        putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+        putEiJob(EI_TYPE_ID, EI_JOB_ID);
+
+        // change the type for the producer, the EI_TYPE_ID is deleted
+        putEiProducerWithOneType(EI_PRODUCER_ID, "junk");
+        verifyJobStatus(EI_JOB_ID, "DISABLED");
+        ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
+        await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1));
+        assertThat(consumerCalls.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
+
+        putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+        verifyJobStatus(EI_JOB_ID, "ENABLED");
+        await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2));
+        assertThat(consumerCalls.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
     }
 
     @Test
@@ -496,7 +532,7 @@ class ApplicationTest {
         {
             // Create a job
             putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
-            putEiJob(EI_TYPE_ID, "jobId");
+            putEiJob(EI_TYPE_ID, EI_JOB_ID);
             deleteEiProducer(EI_PRODUCER_ID);
         }
 
@@ -506,13 +542,18 @@ class ApplicationTest {
 
         this.producerSupervision.createTask().blockLast();
         this.producerSupervision.createTask().blockLast();
+
+        // Now we have one producer that is disabled, but the job will be enabled until
+        // the producer/type is removed
         assertThat(this.eiProducers.size()).isEqualTo(1);
         assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.DISABLED);
+        verifyJobStatus(EI_JOB_ID, "ENABLED");
 
         // After 3 failed checks, the producer and the type shall be deregisterred
         this.producerSupervision.createTask().blockLast();
         assertThat(this.eiProducers.size()).isEqualTo(0);
         assertThat(this.eiTypes.size()).isEqualTo(0);
+        verifyJobStatus(EI_JOB_ID, "DISABLED");
 
         // Job disabled status notification shall be received
         ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
@@ -552,8 +593,8 @@ class ApplicationTest {
             jobs.restoreJobsFromDatabase();
             assertThat(jobs.size()).isEqualTo(0);
         }
-
-        this.eiJobs.remove("jobId1"); // removing a job when the db file is gone
+        logger.warn("Test removing a job when the db file is gone");
+        this.eiJobs.remove("jobId1");
         assertThat(this.eiJobs.size()).isEqualTo(1);
     }