NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / enrichment-coordinator-service / src / test / java / org / oransc / enrichment / ApplicationTest.java
index 5ea62f0..8c8ce5f 100644 (file)
@@ -57,6 +57,9 @@ import org.oransc.enrichment.controllers.a1e.A1eEiTypeInfo;
 import org.oransc.enrichment.controllers.r1consumer.ConsumerConsts;
 import org.oransc.enrichment.controllers.r1consumer.ConsumerInfoTypeInfo;
 import org.oransc.enrichment.controllers.r1consumer.ConsumerJobInfo;
+import org.oransc.enrichment.controllers.r1consumer.ConsumerJobStatus;
+import org.oransc.enrichment.controllers.r1consumer.ConsumerTypeRegistrationInfo;
+import org.oransc.enrichment.controllers.r1consumer.ConsumerTypeSubscriptionInfo;
 import org.oransc.enrichment.controllers.r1producer.ProducerCallbacks;
 import org.oransc.enrichment.controllers.r1producer.ProducerConsts;
 import org.oransc.enrichment.controllers.r1producer.ProducerInfoTypeInfo;
@@ -69,6 +72,7 @@ import org.oransc.enrichment.repository.InfoJobs;
 import org.oransc.enrichment.repository.InfoProducer;
 import org.oransc.enrichment.repository.InfoProducers;
 import org.oransc.enrichment.repository.InfoType;
+import org.oransc.enrichment.repository.InfoTypeSubscriptions;
 import org.oransc.enrichment.repository.InfoTypes;
 import org.oransc.enrichment.tasks.ProducerSupervision;
 import org.slf4j.Logger;
@@ -134,6 +138,9 @@ class ApplicationTest {
     @Autowired
     ProducerCallbacks producerCallbacks;
 
+    @Autowired
+    InfoTypeSubscriptions infoTypeSubscriptions;
+
     private static Gson gson = new GsonBuilder().create();
 
     /**
@@ -155,6 +162,7 @@ class ApplicationTest {
         this.infoJobs.clear();
         this.infoTypes.clear();
         this.infoProducers.clear();
+        this.infoTypeSubscriptions.clear();
         this.producerSimulator.getTestResults().reset();
         this.consumerSimulator.getTestResults().reset();
     }
@@ -226,6 +234,8 @@ class ApplicationTest {
         ConsumerInfoTypeInfo info = gson.fromJson(rsp, ConsumerInfoTypeInfo.class);
         assertThat(info).isNotNull();
         assertThat(info.jobDataSchema).isNotNull();
+        assertThat(info.state).isEqualTo(ConsumerInfoTypeInfo.ConsumerTypeStatusValues.ENABLED);
+        assertThat(info.noOfProducers).isEqualTo(1);
     }
 
     @Test
@@ -243,7 +253,7 @@ class ApplicationTest {
     @Test
     void a1eGetEiJobsIds() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
         final String JOB_ID_JSON = "[\"jobId\"]";
         String url = A1eConsts.API_ROOT + "/eijobs?infoTypeId=typeId";
         String rsp = restClient().get(url).block();
@@ -273,7 +283,7 @@ class ApplicationTest {
     @Test
     void consumerGetInformationJobsIds() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
         final String JOB_ID_JSON = "[\"jobId\"]";
         String url = ConsumerConsts.API_ROOT + "/info-jobs?infoTypeId=typeId";
         String rsp = restClient().get(url).block();
@@ -303,7 +313,7 @@ class ApplicationTest {
     @Test
     void a1eGetEiJob() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
         String url = A1eConsts.API_ROOT + "/eijobs/jobId";
         String rsp = restClient().get(url).block();
         A1eEiJobInfo info = gson.fromJson(rsp, A1eEiJobInfo.class);
@@ -314,7 +324,7 @@ class ApplicationTest {
     @Test
     void consumerGetEiJob() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId";
         String rsp = restClient().get(url).block();
         ConsumerJobInfo info = gson.fromJson(rsp, ConsumerJobInfo.class);
@@ -339,25 +349,30 @@ class ApplicationTest {
     @Test
     void a1eGetEiJobStatus() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
 
         verifyJobStatus("jobId", "ENABLED");
     }
 
     @Test
-    void consumerGetEiJobStatus() throws Exception {
+    void consumerGetInfoJobStatus() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
 
         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId/status";
         String rsp = restClient().get(url).block();
-        assertThat(rsp).contains("ENABLED");
+        assertThat(rsp) //
+            .contains("ENABLED") //
+            .contains(PRODUCER_ID);
+
+        ConsumerJobStatus status = gson.fromJson(rsp, ConsumerJobStatus.class);
+        assertThat(status.producers).contains(PRODUCER_ID);
     }
 
     @Test
     void a1eDeleteEiJob() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
         assertThat(this.infoJobs.size()).isEqualTo(1);
         String url = A1eConsts.API_ROOT + "/eijobs/jobId";
         restClient().delete(url).block();
@@ -371,7 +386,7 @@ class ApplicationTest {
     @Test
     void consumerDeleteEiJob() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
         assertThat(this.infoJobs.size()).isEqualTo(1);
         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId";
         restClient().delete(url).block();
@@ -380,6 +395,8 @@ class ApplicationTest {
         ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
         await().untilAsserted(() -> assertThat(simulatorResults.jobsStopped.size()).isEqualTo(1));
         assertThat(simulatorResults.jobsStopped.get(0)).isEqualTo("jobId");
+
+        testErrorCode(restClient().delete(url), HttpStatus.NOT_FOUND, "Could not find Information job: jobId");
     }
 
     @Test
@@ -400,7 +417,7 @@ class ApplicationTest {
     void a1ePutEiJob() throws Exception {
         // Test that one producer accepting a job is enough
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
+        putInfoProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
 
         String url = A1eConsts.API_ROOT + "/eijobs/jobId";
         String body = gson.toJson(infoJobInfo());
@@ -461,7 +478,6 @@ class ApplicationTest {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
 
         verifyJobStatus(EI_JOB_ID, "ENABLED");
-
     }
 
     @Test
@@ -474,7 +490,10 @@ class ApplicationTest {
             "targetUri", "jobStatusUrl");
         String body = gson.toJson(jobInfo);
 
-        testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Json validation failure");
+        testErrorCode(restClient().put(url, body), HttpStatus.BAD_REQUEST, "Json validation failure");
+
+        testErrorCode(restClient().put(url, "{jojo}"), HttpStatus.BAD_REQUEST, "", false);
+
     }
 
     @Test
@@ -487,7 +506,7 @@ class ApplicationTest {
             new ConsumerJobInfo("typeId", jsonObject("{ \"XXstring\" : \"value\" }"), "owner", "targetUri", null);
         String body = gson.toJson(jobInfo);
 
-        testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Json validation failure");
+        testErrorCode(restClient().put(url, body), HttpStatus.BAD_REQUEST, "Json validation failure");
     }
 
     @Test
@@ -499,14 +518,14 @@ class ApplicationTest {
         ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(), "owner", "junk", null);
         String body = gson.toJson(jobInfo);
 
-        testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "URI: junk is not absolute");
+        testErrorCode(restClient().put(url, body), HttpStatus.BAD_REQUEST, "URI: junk is not absolute");
     }
 
     @Test
     void a1eChangingEiTypeGetRejected() throws Exception {
         putInfoProducerWithOneType("producer1", "typeId1");
         putInfoProducerWithOneType("producer2", "typeId2");
-        putEiJob("typeId1", "jobId");
+        putInfoJob("typeId1", "jobId");
 
         String url = A1eConsts.API_ROOT + "/eijobs/jobId";
         String body = gson.toJson(infoJobInfo("typeId2", "jobId"));
@@ -518,7 +537,7 @@ class ApplicationTest {
     void consumerChangingInfoTypeGetRejected() throws Exception {
         putInfoProducerWithOneType("producer1", "typeId1");
         putInfoProducerWithOneType("producer2", "typeId2");
-        putEiJob("typeId1", "jobId");
+        putInfoJob("typeId1", "jobId");
 
         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId";
         String body = gson.toJson(consumerJobInfo("typeId2", "jobId"));
@@ -536,31 +555,36 @@ class ApplicationTest {
         String url = ProducerConsts.API_ROOT + "/info-types/" + TYPE_ID;
         String body = "{}";
         testErrorCode(restClient().put(url, body), HttpStatus.BAD_REQUEST, "No schema provided");
+
+        testErrorCode(restClient().post(url, body), HttpStatus.METHOD_NOT_ALLOWED, "", false);
     }
 
     @Test
     void producerDeleteEiType() throws Exception {
         putInfoType(TYPE_ID);
-        String url = ProducerConsts.API_ROOT + "/info-types/" + TYPE_ID;
-        restClient().delete(url).block();
-        assertThat(this.infoTypes.size()).isEqualTo(0);
+        this.putInfoJob(TYPE_ID, "job1");
+        this.putInfoJob(TYPE_ID, "job2");
+        deleteInfoType(TYPE_ID);
 
-        testErrorCode(restClient().delete(url), HttpStatus.NOT_FOUND, "Information type not found");
+        assertThat(this.infoTypes.size()).isZero();
+        assertThat(this.infoJobs.size()).isZero(); // Test that also the job is deleted
+
+        testErrorCode(restClient().delete(deleteInfoTypeUrl(TYPE_ID)), HttpStatus.NOT_FOUND,
+            "Information type not found");
     }
 
     @Test
     void producerDeleteEiTypeExistingProducer() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
         String url = ProducerConsts.API_ROOT + "/info-types/" + TYPE_ID;
-        testErrorCode(restClient().delete(url), HttpStatus.NOT_ACCEPTABLE,
-            "The type has active producers: " + PRODUCER_ID);
+        testErrorCode(restClient().delete(url), HttpStatus.CONFLICT, "The type has active producers: " + PRODUCER_ID);
         assertThat(this.infoTypes.size()).isEqualTo(1);
     }
 
     @Test
     void producerPutProducerWithOneType_rejecting()
         throws JsonMappingException, JsonProcessingException, ServiceException {
-        putEiProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
+        putInfoProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
         String url = A1eConsts.API_ROOT + "/eijobs/" + EI_JOB_ID;
         String body = gson.toJson(infoJobInfo());
         restClient().put(url, body).block();
@@ -574,12 +598,12 @@ class ApplicationTest {
     }
 
     @Test
-    void producerGetEiProducerTypes() throws Exception {
+    void producerGetInfoProducerTypes() throws Exception {
         final String EI_TYPE_ID_2 = TYPE_ID + "_2";
         putInfoProducerWithOneType("producer1", TYPE_ID);
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
         putInfoProducerWithOneType("producer2", EI_TYPE_ID_2);
-        putEiJob(EI_TYPE_ID_2, "jobId2");
+        putInfoJob(EI_TYPE_ID_2, "jobId2");
         String url = ProducerConsts.API_ROOT + "/info-types";
 
         ResponseEntity<String> resp = restClient().getForEntity(url).block();
@@ -589,7 +613,7 @@ class ApplicationTest {
     }
 
     @Test
-    void producerPutEiProducer() throws Exception {
+    void producerPutInfoProducer() throws Exception {
         this.putInfoType(TYPE_ID);
         String url = ProducerConsts.API_ROOT + "/info-producers/infoProducerId";
         String body = gson.toJson(producerInfoRegistratioInfo(TYPE_ID));
@@ -598,7 +622,7 @@ class ApplicationTest {
         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
 
         assertThat(this.infoTypes.size()).isEqualTo(1);
-        assertThat(this.infoProducers.getProducersForType(TYPE_ID).size()).isEqualTo(1);
+        assertThat(this.infoProducers.getProducersForType(TYPE_ID)).hasSize(1);
         assertThat(this.infoProducers.size()).isEqualTo(1);
         assertThat(this.infoProducers.get("infoProducerId").getInfoTypes().iterator().next().getId())
             .isEqualTo(TYPE_ID);
@@ -606,15 +630,18 @@ class ApplicationTest {
         resp = restClient().putForEntity(url, body).block();
         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
 
+        // GET info producer
         resp = restClient().getForEntity(url).block();
         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
         assertThat(resp.getBody()).isEqualTo(body);
+
+        testErrorCode(restClient().get(url + "junk"), HttpStatus.NOT_FOUND, "Could not find Information Producer");
     }
 
     @Test
-    void producerPutEiProducerExistingJob() throws Exception {
+    void producerPutInfoProducerExistingJob() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
         String url = ProducerConsts.API_ROOT + "/info-producers/infoProducerId";
         String body = gson.toJson(producerInfoRegistratioInfo(TYPE_ID));
         restClient().putForEntity(url, body).block();
@@ -626,14 +653,14 @@ class ApplicationTest {
     }
 
     @Test
-    void testPutEiProducer_noType() throws Exception {
+    void testPutInfoProducer_noType() throws Exception {
         String url = ProducerConsts.API_ROOT + "/info-producers/infoProducerId";
         String body = gson.toJson(producerInfoRegistratioInfo(TYPE_ID));
         testErrorCode(restClient().put(url, body), HttpStatus.NOT_FOUND, "Information type not found");
     }
 
     @Test
-    void producerPutProducerAndEiJob() throws Exception {
+    void producerPutProducerAndInfoJob() throws Exception {
         this.putInfoType(TYPE_ID);
         String url = ProducerConsts.API_ROOT + "/info-producers/infoProducerId";
         String body = gson.toJson(producerInfoRegistratioInfo(TYPE_ID));
@@ -652,10 +679,10 @@ class ApplicationTest {
     }
 
     @Test
-    void producerGetEiJobsForProducer() throws JsonMappingException, JsonProcessingException, ServiceException {
+    void producerGetInfoJobsForProducer() throws JsonMappingException, JsonProcessingException, ServiceException {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, "jobId1");
-        putEiJob(TYPE_ID, "jobId2");
+        putInfoJob(TYPE_ID, "jobId1");
+        putInfoJob(TYPE_ID, "jobId2");
 
         // PUT a consumerRestApiTestBase.java
         String url = ProducerConsts.API_ROOT + "/info-producers/infoProducerId";
@@ -672,7 +699,7 @@ class ApplicationTest {
     }
 
     @Test
-    void producerDeleteEiProducer() throws Exception {
+    void producerDeleteInfoProducer() throws Exception {
         putInfoProducerWithOneType("infoProducerId", TYPE_ID);
         putInfoProducerWithOneType("infoProducerId2", TYPE_ID);
 
@@ -680,18 +707,21 @@ class ApplicationTest {
         InfoType type = this.infoTypes.getType(TYPE_ID);
         assertThat(this.infoProducers.getProducerIdsForType(type.getId())).contains("infoProducerId");
         assertThat(this.infoProducers.getProducerIdsForType(type.getId())).contains("infoProducerId2");
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
         assertThat(this.infoJobs.size()).isEqualTo(1);
 
-        deleteEiProducer("infoProducerId");
+        deleteInfoProducer("infoProducerId");
         assertThat(this.infoProducers.size()).isEqualTo(1);
         assertThat(this.infoProducers.getProducerIdsForType(TYPE_ID)).doesNotContain("infoProducerId");
         verifyJobStatus("jobId", "ENABLED");
 
-        deleteEiProducer("infoProducerId2");
+        deleteInfoProducer("infoProducerId2");
         assertThat(this.infoProducers.size()).isZero();
         assertThat(this.infoTypes.size()).isEqualTo(1);
         verifyJobStatus("jobId", "DISABLED");
+
+        String url = ProducerConsts.API_ROOT + "/info-producers/" + "junk";
+        testErrorCode(restClient().delete(url), HttpStatus.NOT_FOUND, "Could not find Information Producer");
     }
 
     @Test
@@ -700,21 +730,22 @@ class ApplicationTest {
         ProducerSimulatorController.TestResults producerCalls = this.producerSimulator.getTestResults();
 
         putInfoProducerWithOneType("infoProducerId", TYPE_ID);
-        putEiJob(TYPE_ID, "jobId");
+        putInfoJob(TYPE_ID, "jobId");
         putInfoProducerWithOneType("infoProducerId2", TYPE_ID);
         await().untilAsserted(() -> assertThat(producerCalls.jobsStarted.size()).isEqualTo(2));
 
-        deleteEiProducer("infoProducerId2");
+        deleteInfoProducer("infoProducerId2");
         assertThat(this.infoTypes.size()).isEqualTo(1); // The type remains, one producer left
-        deleteEiProducer("infoProducerId");
+        deleteInfoProducer("infoProducerId");
         assertThat(this.infoTypes.size()).isEqualTo(1); // The type remains
         assertThat(this.infoJobs.size()).isEqualTo(1); // The job remains
-        await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1));
-        assertThat(consumerCalls.status.get(0).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
+        await().untilAsserted(() -> assertThat(consumerCalls.eiJobStatusCallbacks.size()).isEqualTo(1));
+        assertThat(consumerCalls.eiJobStatusCallbacks.get(0).state)
+            .isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
 
         putInfoProducerWithOneType("infoProducerId", TYPE_ID);
-        await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2));
-        assertThat(consumerCalls.status.get(1).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
+        await().untilAsserted(() -> assertThat(consumerCalls.eiJobStatusCallbacks.size()).isEqualTo(2));
+        assertThat(consumerCalls.eiJobStatusCallbacks.get(1).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
     }
 
     @Test
@@ -723,28 +754,32 @@ class ApplicationTest {
 
         // Create a job
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, EI_JOB_ID);
+        putInfoJob(TYPE_ID, EI_JOB_ID);
 
         // change the type for the producer, the job shall be disabled
         putInfoProducerWithOneType(PRODUCER_ID, "junk");
         verifyJobStatus(EI_JOB_ID, "DISABLED");
         ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
-        await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1));
-        assertThat(consumerCalls.status.get(0).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
+        await().untilAsserted(() -> assertThat(consumerCalls.eiJobStatusCallbacks.size()).isEqualTo(1));
+        assertThat(consumerCalls.eiJobStatusCallbacks.get(0).state)
+            .isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
 
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
         verifyJobStatus(EI_JOB_ID, "ENABLED");
-        await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2));
-        assertThat(consumerCalls.status.get(1).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
+        await().untilAsserted(() -> assertThat(consumerCalls.eiJobStatusCallbacks.size()).isEqualTo(2));
+        assertThat(consumerCalls.eiJobStatusCallbacks.get(1).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
     }
 
     @Test
-    void producerGetProducerEiType() throws JsonMappingException, JsonProcessingException, ServiceException {
+    void producerGetProducerInfoType() throws JsonMappingException, JsonProcessingException, ServiceException {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
         String url = ProducerConsts.API_ROOT + "/info-types/" + TYPE_ID;
         ResponseEntity<String> resp = restClient().getForEntity(url).block();
         ProducerInfoTypeInfo info = gson.fromJson(resp.getBody(), ProducerInfoTypeInfo.class);
         assertThat(info.jobDataSchema).isNotNull();
+        assertThat(info.typeSpecificInformation).isNotNull();
+
+        testErrorCode(restClient().get(url + "junk"), HttpStatus.NOT_FOUND, "Information type not found");
     }
 
     @Test
@@ -767,17 +802,18 @@ class ApplicationTest {
     void producerSupervision() throws JsonMappingException, JsonProcessingException, ServiceException {
 
         ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
-        putEiProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
+        putInfoProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
 
         {
             // Create a job
             putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-            putEiJob(TYPE_ID, EI_JOB_ID);
+            putInfoJob(TYPE_ID, EI_JOB_ID);
             verifyJobStatus(EI_JOB_ID, "ENABLED");
-            deleteEiProducer(PRODUCER_ID);
+            deleteInfoProducer(PRODUCER_ID);
             // A Job disabled status notification shall now be received
-            await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
-            assertThat(consumerResults.status.get(0).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
+            await().untilAsserted(() -> assertThat(consumerResults.eiJobStatusCallbacks.size()).isEqualTo(1));
+            assertThat(consumerResults.eiJobStatusCallbacks.get(0).state)
+                .isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
             verifyJobStatus(EI_JOB_ID, "DISABLED");
         }
 
@@ -792,16 +828,17 @@ class ApplicationTest {
         assertThat(this.infoProducers.size()).isEqualTo(1);
         assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.DISABLED);
 
-        // After 3 failed checks, the producer shall be deregisterred
+        // After 3 failed checks, the producer shall be deregistered
         this.producerSupervision.createTask().blockLast();
-        assertThat(this.infoProducers.size()).isEqualTo(0); // The producer is removed
+        assertThat(this.infoProducers.size()).isZero(); // The producer is removed
         assertThat(this.infoTypes.size()).isEqualTo(1); // The type remains
 
         // Now we have one disabled job, and no producer.
         // PUT a producer, then a Job ENABLED status notification shall be received
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(2));
-        assertThat(consumerResults.status.get(1).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
+        await().untilAsserted(() -> assertThat(consumerResults.eiJobStatusCallbacks.size()).isEqualTo(2));
+        assertThat(consumerResults.eiJobStatusCallbacks.get(1).state)
+            .isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
         verifyJobStatus(EI_JOB_ID, "ENABLED");
     }
 
@@ -811,7 +848,7 @@ class ApplicationTest {
         // suceeded
 
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, EI_JOB_ID);
+        putInfoJob(TYPE_ID, EI_JOB_ID);
 
         InfoProducer producer = this.infoProducers.getProducer(PRODUCER_ID);
         InfoJob job = this.infoJobs.getJob(EI_JOB_ID);
@@ -824,15 +861,16 @@ class ApplicationTest {
         // Run the supervision and wait for the job to get started in the producer
         this.producerSupervision.createTask().blockLast();
         ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
-        await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
-        assertThat(consumerResults.status.get(0).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
+        await().untilAsserted(() -> assertThat(consumerResults.eiJobStatusCallbacks.size()).isEqualTo(1));
+        assertThat(consumerResults.eiJobStatusCallbacks.get(0).state)
+            .isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
         verifyJobStatus(EI_JOB_ID, "ENABLED");
     }
 
     @Test
     void testGetStatus() throws JsonMappingException, JsonProcessingException, ServiceException {
-        putEiProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
-        putEiProducerWithOneTypeRejecting("simulateProducerError2", TYPE_ID);
+        putInfoProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
+        putInfoProducerWithOneTypeRejecting("simulateProducerError2", TYPE_ID);
 
         String url = "/status";
         ResponseEntity<String> resp = restClient().getForEntity(url).block();
@@ -842,8 +880,8 @@ class ApplicationTest {
     @Test
     void testEiJobDatabase() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
-        putEiJob(TYPE_ID, "jobId1");
-        putEiJob(TYPE_ID, "jobId2");
+        putInfoJob(TYPE_ID, "jobId1");
+        putInfoJob(TYPE_ID, "jobId2");
 
         assertThat(this.infoJobs.size()).isEqualTo(2);
 
@@ -864,7 +902,7 @@ class ApplicationTest {
             // Restore the jobs, no jobs in database
             InfoJobs jobs = new InfoJobs(this.applicationConfig, this.producerCallbacks);
             jobs.restoreJobsFromDatabase();
-            assertThat(jobs.size()).isEqualTo(0);
+            assertThat(jobs.size()).isZero();
         }
         logger.warn("Test removing a job when the db file is gone");
         this.infoJobs.remove("jobId1", this.infoProducers);
@@ -885,21 +923,136 @@ class ApplicationTest {
             InfoTypes types = new InfoTypes(this.applicationConfig);
             types.restoreTypesFromDatabase();
             assertThat(types.size()).isEqualTo(1);
-
         }
         {
             // Restore the jobs, no jobs in database
             InfoTypes types = new InfoTypes(this.applicationConfig);
             types.clear();
             types.restoreTypesFromDatabase();
-            assertThat(types.size()).isEqualTo(0);
+            assertThat(types.size()).isZero();
         }
         logger.warn("Test removing a job when the db file is gone");
         this.infoTypes.remove(this.infoTypes.getType(TYPE_ID));
-        assertThat(this.infoJobs.size()).isEqualTo(0);
+        assertThat(this.infoJobs.size()).isZero();
+    }
+
+    @Test
+    void testConsumerTypeSubscriptionDatabase() {
+        final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
+        final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
+
+        // PUT a subscription
+        String body = gson.toJson(info);
+        restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+        assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+
+        InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+        assertThat(restoredSubscriptions.size()).isEqualTo(1);
+        assertThat(restoredSubscriptions.getSubscriptionsForOwner("owner")).hasSize(1);
+
+        // Delete the subscription
+        restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
+        restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+        assertThat(restoredSubscriptions.size()).isZero();
+    }
+
+    @Test
+    void testConsumerTypeSubscription() throws Exception {
+
+        final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
+        final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
+
+        testErrorCode(restClient().get(typeSubscriptionUrl() + "/junk"), HttpStatus.NOT_FOUND,
+            "Could not find Information subscription: junk");
+
+        testErrorCode(restClient().delete(typeSubscriptionUrl() + "/junk"), HttpStatus.NOT_FOUND,
+            "Could not find Information subscription: junk");
+
+        {
+            // PUT a subscription
+            String body = gson.toJson(info);
+            ResponseEntity<String> resp =
+                restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+            assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+            assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
+            resp = restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+            assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+        }
+        {
+            // GET IDs
+            ResponseEntity<String> resp = restClient().getForEntity(typeSubscriptionUrl()).block();
+            assertThat(resp.getBody()).isEqualTo("[\"subscriptionId\"]");
+            resp = restClient().getForEntity(typeSubscriptionUrl() + "?owner=owner").block();
+            assertThat(resp.getBody()).isEqualTo("[\"subscriptionId\"]");
+            resp = restClient().getForEntity(typeSubscriptionUrl() + "?owner=junk").block();
+            assertThat(resp.getBody()).isEqualTo("[]");
+        }
+
+        {
+            // GET the individual subscription
+            ResponseEntity<String> resp = restClient().getForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
+            ConsumerTypeSubscriptionInfo respInfo = gson.fromJson(resp.getBody(), ConsumerTypeSubscriptionInfo.class);
+            assertThat(respInfo).isEqualTo(info);
+        }
+
+        {
+            // Test the callbacks
+            final ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
+
+            // Test callback for PUT type
+            this.putInfoType(TYPE_ID);
+            await().untilAsserted(() -> assertThat(consumerCalls.typeRegistrationInfoCallbacks.size()).isEqualTo(1));
+            assertThat(consumerCalls.typeRegistrationInfoCallbacks.get(0).state)
+                .isEqualTo(ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.REGISTERED);
+
+            // Test callback for DELETE type
+            this.deleteInfoType(TYPE_ID);
+            await().untilAsserted(() -> assertThat(consumerCalls.typeRegistrationInfoCallbacks.size()).isEqualTo(2));
+            assertThat(consumerCalls.typeRegistrationInfoCallbacks.get(1).state)
+                .isEqualTo(ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.DEREGISTERED);
+        }
+
+        {
+            // DELETE the subscription
+            ResponseEntity<String> resp =
+                restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
+            assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.NO_CONTENT);
+            assertThat(this.infoTypeSubscriptions.size()).isZero();
+            resp = restClient().getForEntity(typeSubscriptionUrl()).block();
+            assertThat(resp.getBody()).isEqualTo("[]");
+        }
+    }
+
+    @Test
+    void testRemovingNonWorkingSubscription() throws Exception {
+        // Test that subscriptions are removed for a unresponsive consumer
+
+        // PUT a subscription with a junk callback
+        final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "/JUNK", "owner");
+        String body = gson.toJson(info);
+        restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+        assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+
+        this.putInfoType(TYPE_ID);
+        // The callback will fail and the subscription will be removed
+        await().untilAsserted(() -> assertThat(this.infoTypeSubscriptions.size()).isZero());
+    }
+
+    @Test
+    void testTypeSubscriptionErrorCodes() throws Exception {
+
+        testErrorCode(restClient().get(typeSubscriptionUrl() + "/junk"), HttpStatus.NOT_FOUND,
+            "Could not find Information subscription: junk");
+
+        testErrorCode(restClient().delete(typeSubscriptionUrl() + "/junk"), HttpStatus.NOT_FOUND,
+            "Could not find Information subscription: junk");
     }
 
-    private void deleteEiProducer(String infoProducerId) {
+    private String typeSubscriptionUrl() {
+        return ConsumerConsts.API_ROOT + "/info-type-subscription";
+    }
+
+    private void deleteInfoProducer(String infoProducerId) {
         String url = ProducerConsts.API_ROOT + "/info-producers/" + infoProducerId;
         restClient().deleteForEntity(url).block();
     }
@@ -918,9 +1071,9 @@ class ApplicationTest {
         assertThat(statusInfo.opState).isEqualTo(expectedOperationalState);
     }
 
-    ProducerInfoTypeInfo producerEiTypeRegistrationInfo(String typeId)
+    ProducerInfoTypeInfo ProducerInfoTypeRegistrationInfo(String typeId)
         throws JsonMappingException, JsonProcessingException {
-        return new ProducerInfoTypeInfo(jsonSchemaObject());
+        return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
     }
 
     ProducerRegistrationInfo producerEiRegistratioInfoRejecting(String typeId)
@@ -963,6 +1116,10 @@ class ApplicationTest {
         }
     }
 
+    private Object typeSpecifcInfoObject() {
+        return jsonObject("{ \"propertyName\" : \"value\" }");
+    }
+
     private Object jsonSchemaObject() {
         // a json schema with one mandatory property named "string"
         String schemaStr = "{" //
@@ -984,7 +1141,7 @@ class ApplicationTest {
         return jsonObject("{ " + EI_JOB_PROPERTY + " : \"value\" }");
     }
 
-    private InfoJob putEiJob(String infoTypeId, String jobId)
+    private InfoJob putInfoJob(String infoTypeId, String jobId)
         throws JsonMappingException, JsonProcessingException, ServiceException {
 
         String url = A1eConsts.API_ROOT + "/eijobs/" + jobId;
@@ -997,15 +1154,22 @@ class ApplicationTest {
     private HttpStatus putInfoType(String infoTypeId)
         throws JsonMappingException, JsonProcessingException, ServiceException {
         String url = ProducerConsts.API_ROOT + "/info-types/" + infoTypeId;
-        String body = gson.toJson(producerEiTypeRegistrationInfo(infoTypeId));
+        String body = gson.toJson(ProducerInfoTypeRegistrationInfo(infoTypeId));
 
         ResponseEntity<String> resp = restClient().putForEntity(url, body).block();
         this.infoTypes.getType(infoTypeId);
         return resp.getStatusCode();
+    }
+
+    private String deleteInfoTypeUrl(String typeId) {
+        return ProducerConsts.API_ROOT + "/info-types/" + typeId;
+    }
 
+    private void deleteInfoType(String typeId) {
+        restClient().delete(deleteInfoTypeUrl(typeId)).block();
     }
 
-    private InfoType putEiProducerWithOneTypeRejecting(String producerId, String infoTypeId)
+    private InfoType putInfoProducerWithOneTypeRejecting(String producerId, String infoTypeId)
         throws JsonMappingException, JsonProcessingException, ServiceException {
         this.putInfoType(infoTypeId);
         String url = ProducerConsts.API_ROOT + "/info-producers/" + producerId;