+
+ url = ProducerConsts.API_ROOT + "/eiproducers?ei_type_id=" + EI_TYPE_ID;
+ resp = restClient().getForEntity(url).block();
+ assertThat(resp.getBody()).contains(EI_PRODUCER_ID);
+
+ url = ProducerConsts.API_ROOT + "/eiproducers?ei_type_id=junk";
+ resp = restClient().getForEntity(url).block();
+ assertThat(resp.getBody()).isEqualTo("[]");
+ }
+
+ @Test
+ void testProducerSupervision() throws JsonMappingException, JsonProcessingException, ServiceException {
+ putEiProducerWithOneTypeRejecting("simulateProducerError", EI_TYPE_ID);
+
+ {
+ // Create a job
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ putEiJob(EI_TYPE_ID, EI_JOB_ID);
+ verifyJobStatus(EI_JOB_ID, "ENABLED");
+ deleteEiProducer(EI_PRODUCER_ID);
+ verifyJobStatus(EI_JOB_ID, "DISABLED");
+ }
+
+ // Job disabled status notification shall be received
+ ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
+ assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
+
+ assertThat(this.eiProducers.size()).isEqualTo(1);
+ assertThat(this.eiTypes.size()).isEqualTo(1);
+ assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.ENABLED);
+
+ this.producerSupervision.createTask().blockLast();
+ this.producerSupervision.createTask().blockLast();
+
+ // Now we have one producer that is disabled
+ assertThat(this.eiProducers.size()).isEqualTo(1);
+ assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.DISABLED);
+
+ // After 3 failed checks, the producer and the type shall be deregisterred
+ this.producerSupervision.createTask().blockLast();
+ assertThat(this.eiProducers.size()).isEqualTo(0); // The producer is removed
+ assertThat(this.eiTypes.size()).isEqualTo(1); // The type remains
+
+ }
+
+ @Test
+ void testGetStatus() throws JsonMappingException, JsonProcessingException, ServiceException {
+ putEiProducerWithOneTypeRejecting("simulateProducerError", EI_TYPE_ID);
+ putEiProducerWithOneTypeRejecting("simulateProducerError2", EI_TYPE_ID);
+
+ String url = "/status";
+ ResponseEntity<String> resp = restClient().getForEntity(url).block();
+ assertThat(resp.getBody()).contains("hunky dory");
+ }
+
+ @Test
+ void testEiJobDatabase() throws Exception {
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ putEiJob(EI_TYPE_ID, "jobId1");
+ putEiJob(EI_TYPE_ID, "jobId2");
+
+ assertThat(this.eiJobs.size()).isEqualTo(2);
+
+ {
+ // Restore the jobs
+ EiJobs jobs = new EiJobs(this.applicationConfig, this.producerCallbacks);
+ jobs.restoreJobsFromDatabase();
+ assertThat(jobs.size()).isEqualTo(2);
+ jobs.remove("jobId1", this.eiProducers);
+ jobs.remove("jobId2", this.eiProducers);
+ }
+ {
+ // Restore the jobs, no jobs in database
+ EiJobs jobs = new EiJobs(this.applicationConfig, this.producerCallbacks);
+ jobs.restoreJobsFromDatabase();
+ assertThat(jobs.size()).isEqualTo(0);
+ }
+ logger.warn("Test removing a job when the db file is gone");
+ this.eiJobs.remove("jobId1", this.eiProducers);
+ assertThat(this.eiJobs.size()).isEqualTo(1);
+
+ ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(simulatorResults.jobsStopped.size()).isEqualTo(3));
+ }
+
+ @Test
+ void testEiTypesDatabase() throws Exception {
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+
+ assertThat(this.eiTypes.size()).isEqualTo(1);
+
+ {
+ // Restore the types
+ EiTypes types = new EiTypes(this.applicationConfig);
+ types.restoreTypesFromDatabase();
+ assertThat(types.size()).isEqualTo(1);
+
+ }
+ {
+ // Restore the jobs, no jobs in database
+ EiTypes types = new EiTypes(this.applicationConfig);
+ types.clear();
+ types.restoreTypesFromDatabase();
+ assertThat(types.size()).isEqualTo(0);
+ }
+ logger.warn("Test removing a job when the db file is gone");
+ this.eiTypes.remove(this.eiTypes.getType(EI_TYPE_ID));
+ assertThat(this.eiJobs.size()).isEqualTo(0);
+ }
+
+ private void deleteEiProducer(String eiProducerId) {
+ String url = ProducerConsts.API_ROOT + "/eiproducers/" + eiProducerId;
+ restClient().deleteForEntity(url).block();
+ }
+
+ private void verifyJobStatus(String jobId, String expStatus) {
+ String url = ConsumerConsts.API_ROOT + "/eijobs/" + jobId + "/status";
+ String rsp = restClient().get(url).block();
+ assertThat(rsp).contains(expStatus);
+ }
+
+ private void assertProducerOpState(String producerId,
+ ProducerStatusInfo.OperationalState expectedOperationalState) {
+ String statusUrl = ProducerConsts.API_ROOT + "/eiproducers/" + producerId + "/status";
+ ResponseEntity<String> resp = restClient().getForEntity(statusUrl).block();
+ ProducerStatusInfo statusInfo = gson.fromJson(resp.getBody(), ProducerStatusInfo.class);
+ assertThat(statusInfo.opState).isEqualTo(expectedOperationalState);