+ void producerDeleteEiProducer() throws Exception {
+ putInfoProducerWithOneType("infoProducerId", TYPE_ID);
+ putInfoProducerWithOneType("infoProducerId2", TYPE_ID);
+
+ assertThat(this.infoProducers.size()).isEqualTo(2);
+ InfoType type = this.infoTypes.getType(TYPE_ID);
+ assertThat(this.infoProducers.getProducerIdsForType(type.getId())).contains("infoProducerId");
+ assertThat(this.infoProducers.getProducerIdsForType(type.getId())).contains("infoProducerId2");
+ putEiJob(TYPE_ID, "jobId");
+ assertThat(this.infoJobs.size()).isEqualTo(1);
+
+ deleteEiProducer("infoProducerId");
+ assertThat(this.infoProducers.size()).isEqualTo(1);
+ assertThat(this.infoProducers.getProducerIdsForType(TYPE_ID)).doesNotContain("infoProducerId");
+ verifyJobStatus("jobId", "ENABLED");
+
+ deleteEiProducer("infoProducerId2");
+ assertThat(this.infoProducers.size()).isZero();
+ assertThat(this.infoTypes.size()).isEqualTo(1);
+ verifyJobStatus("jobId", "DISABLED");
+ }
+
+ @Test
+ void a1eJobStatusNotifications() throws JsonMappingException, JsonProcessingException, ServiceException {
+ ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
+ ProducerSimulatorController.TestResults producerCalls = this.producerSimulator.getTestResults();
+
+ putInfoProducerWithOneType("infoProducerId", TYPE_ID);
+ putEiJob(TYPE_ID, "jobId");
+ putInfoProducerWithOneType("infoProducerId2", TYPE_ID);
+ await().untilAsserted(() -> assertThat(producerCalls.jobsStarted.size()).isEqualTo(2));
+
+ deleteEiProducer("infoProducerId2");
+ assertThat(this.infoTypes.size()).isEqualTo(1); // The type remains, one producer left
+ deleteEiProducer("infoProducerId");
+ assertThat(this.infoTypes.size()).isEqualTo(1); // The type remains
+ assertThat(this.infoJobs.size()).isEqualTo(1); // The job remains
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1));
+ assertThat(consumerCalls.status.get(0).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
+
+ putInfoProducerWithOneType("infoProducerId", TYPE_ID);
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2));
+ assertThat(consumerCalls.status.get(1).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
+ }
+
+ @Test
+ void a1eJobStatusNotifications2() throws JsonMappingException, JsonProcessingException, ServiceException {
+ // Test replacing a producer with new and removed types
+
+ // Create a job
+ putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
+ putEiJob(TYPE_ID, EI_JOB_ID);
+
+ // change the type for the producer, the job shall be disabled
+ putInfoProducerWithOneType(PRODUCER_ID, "junk");
+ verifyJobStatus(EI_JOB_ID, "DISABLED");
+ ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1));
+ assertThat(consumerCalls.status.get(0).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
+
+ putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
+ verifyJobStatus(EI_JOB_ID, "ENABLED");
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2));
+ assertThat(consumerCalls.status.get(1).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
+ }
+
+ @Test
+ void producerGetProducerEiType() throws JsonMappingException, JsonProcessingException, ServiceException {
+ putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
+ String url = ProducerConsts.API_ROOT + "/info-types/" + TYPE_ID;
+ ResponseEntity<String> resp = restClient().getForEntity(url).block();
+ ProducerInfoTypeInfo info = gson.fromJson(resp.getBody(), ProducerInfoTypeInfo.class);
+ assertThat(info.jobDataSchema).isNotNull();
+ }
+
+ @Test
+ void producerGetProducerIdentifiers() throws JsonMappingException, JsonProcessingException, ServiceException {
+ putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
+ String url = ProducerConsts.API_ROOT + "/info-producers";
+ ResponseEntity<String> resp = restClient().getForEntity(url).block();
+ assertThat(resp.getBody()).contains(PRODUCER_ID);
+
+ url = ProducerConsts.API_ROOT + "/info-producers?info_type_id=" + TYPE_ID;
+ resp = restClient().getForEntity(url).block();
+ assertThat(resp.getBody()).contains(PRODUCER_ID);
+
+ url = ProducerConsts.API_ROOT + "/info-producers?info_type_id=junk";
+ resp = restClient().getForEntity(url).block();
+ assertThat(resp.getBody()).isEqualTo("[]");
+ }
+
+ @Test
+ void producerSupervision() throws JsonMappingException, JsonProcessingException, ServiceException {
+
+ ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
+ putEiProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
+
+ {
+ // Create a job
+ putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
+ putEiJob(TYPE_ID, EI_JOB_ID);
+ verifyJobStatus(EI_JOB_ID, "ENABLED");
+ deleteEiProducer(PRODUCER_ID);
+ // A Job disabled status notification shall now be received
+ await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
+ assertThat(consumerResults.status.get(0).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
+ verifyJobStatus(EI_JOB_ID, "DISABLED");
+ }
+
+ assertThat(this.infoProducers.size()).isEqualTo(1);
+ assertThat(this.infoTypes.size()).isEqualTo(1);
+ assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.ENABLED);
+
+ this.producerSupervision.createTask().blockLast();
+ this.producerSupervision.createTask().blockLast();