+ assertThat(this.infoProducers.size()).isZero(); // The producer is removed
+ assertThat(this.infoTypes.size()).isEqualTo(1); // The type remains
+
+ // Now we have one disabled job, and no producer.
+ // PUT a producer, then a Job ENABLED status notification shall be received
+ putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
+ await().untilAsserted(() -> assertThat(consumerResults.eiJobStatusCallbacks.size()).isEqualTo(2));
+ assertThat(consumerResults.eiJobStatusCallbacks.get(1).state)
+ .isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
+ verifyJobStatus(EI_JOB_ID, "ENABLED");
+ }
+
+ @Test
+ void producerSupervision2() throws JsonMappingException, JsonProcessingException, ServiceException {
+ // Test that supervision enables not enabled jobs and sends a notification when
+ // suceeded
+
+ putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
+ putInfoJob(TYPE_ID, EI_JOB_ID);
+
+ InfoProducer producer = this.infoProducers.getProducer(PRODUCER_ID);
+ InfoJob job = this.infoJobs.getJob(EI_JOB_ID);
+ // Pretend that the producer did reject the job and the a DISABLED notification
+ // is sent for the job
+ producer.setJobDisabled(job);
+ job.setLastReportedStatus(false);
+ verifyJobStatus(EI_JOB_ID, "DISABLED");
+
+ // Run the supervision and wait for the job to get started in the producer
+ this.producerSupervision.createTask().blockLast();
+ ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(consumerResults.eiJobStatusCallbacks.size()).isEqualTo(1));
+ assertThat(consumerResults.eiJobStatusCallbacks.get(0).state)
+ .isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
+ verifyJobStatus(EI_JOB_ID, "ENABLED");
+ }
+
+ @Test
+ void testGetStatus() throws JsonMappingException, JsonProcessingException, ServiceException {
+ putInfoProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
+ putInfoProducerWithOneTypeRejecting("simulateProducerError2", TYPE_ID);
+
+ String url = "/status";
+ ResponseEntity<String> resp = restClient().getForEntity(url).block();
+ assertThat(resp.getBody()).contains("hunky dory");
+ }
+
+ @Test
+ void testEiJobDatabase() throws Exception {
+ putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
+ putInfoJob(TYPE_ID, "jobId1");
+ putInfoJob(TYPE_ID, "jobId2");
+
+ assertThat(this.infoJobs.size()).isEqualTo(2);
+
+ {
+ InfoJob savedJob = this.infoJobs.getJob("jobId1");
+ // Restore the jobs
+ InfoJobs jobs = new InfoJobs(this.applicationConfig, this.producerCallbacks);
+ jobs.restoreJobsFromDatabase();
+ assertThat(jobs.size()).isEqualTo(2);
+ InfoJob restoredJob = jobs.getJob("jobId1");
+ assertThat(restoredJob.getId()).isEqualTo("jobId1");
+ assertThat(restoredJob.getLastUpdated()).isEqualTo(savedJob.getLastUpdated());
+
+ jobs.remove("jobId1", this.infoProducers);
+ jobs.remove("jobId2", this.infoProducers);
+ }
+ {
+ // Restore the jobs, no jobs in database
+ InfoJobs jobs = new InfoJobs(this.applicationConfig, this.producerCallbacks);
+ jobs.restoreJobsFromDatabase();
+ assertThat(jobs.size()).isZero();
+ }
+ logger.warn("Test removing a job when the db file is gone");
+ this.infoJobs.remove("jobId1", this.infoProducers);
+ assertThat(this.infoJobs.size()).isEqualTo(1);
+
+ ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(simulatorResults.jobsStopped.size()).isEqualTo(3));
+ }
+
+ @Test
+ void testEiTypesDatabase() throws Exception {
+ putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
+
+ assertThat(this.infoTypes.size()).isEqualTo(1);
+
+ {
+ // Restore the types
+ InfoTypes types = new InfoTypes(this.applicationConfig);
+ types.restoreTypesFromDatabase();
+ assertThat(types.size()).isEqualTo(1);
+ }
+ {
+ // Restore the jobs, no jobs in database
+ InfoTypes types = new InfoTypes(this.applicationConfig);
+ types.clear();
+ types.restoreTypesFromDatabase();
+ assertThat(types.size()).isZero();
+ }
+ logger.warn("Test removing a job when the db file is gone");
+ this.infoTypes.remove(this.infoTypes.getType(TYPE_ID));
+ assertThat(this.infoJobs.size()).isZero();
+ }
+
+ @Test
+ void testConsumerTypeSubscriptionDatabase() {
+ final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
+ final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
+
+ // PUT a subscription
+ String body = gson.toJson(info);
+ restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+ assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+
+ InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+ assertThat(restoredSubscriptions.size()).isEqualTo(1);
+ assertThat(restoredSubscriptions.getSubscriptionsForOwner("owner")).hasSize(1);
+
+ // Delete the subscription
+ restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
+ restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+ assertThat(restoredSubscriptions.size()).isZero();
+ }
+
+ @Test
+ void testConsumerTypeSubscription() throws Exception {
+
+ final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
+ final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
+
+ testErrorCode(restClient().get(typeSubscriptionUrl() + "/junk"), HttpStatus.NOT_FOUND,
+ "Could not find Information subscription: junk");
+
+ testErrorCode(restClient().delete(typeSubscriptionUrl() + "/junk"), HttpStatus.NOT_FOUND,
+ "Could not find Information subscription: junk");
+
+ {
+ // PUT a subscription
+ String body = gson.toJson(info);
+ ResponseEntity<String> resp =
+ restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+ assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
+ resp = restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+ }
+ {
+ // GET IDs
+ ResponseEntity<String> resp = restClient().getForEntity(typeSubscriptionUrl()).block();
+ assertThat(resp.getBody()).isEqualTo("[\"subscriptionId\"]");
+ resp = restClient().getForEntity(typeSubscriptionUrl() + "?owner=owner").block();
+ assertThat(resp.getBody()).isEqualTo("[\"subscriptionId\"]");
+ resp = restClient().getForEntity(typeSubscriptionUrl() + "?owner=junk").block();
+ assertThat(resp.getBody()).isEqualTo("[]");
+ }
+
+ {
+ // GET the individual subscription
+ ResponseEntity<String> resp = restClient().getForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
+ ConsumerTypeSubscriptionInfo respInfo = gson.fromJson(resp.getBody(), ConsumerTypeSubscriptionInfo.class);
+ assertThat(respInfo).isEqualTo(info);
+ }
+
+ {
+ // Test the callbacks
+ final ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
+
+ // Test callback for PUT type
+ this.putInfoType(TYPE_ID);
+ await().untilAsserted(() -> assertThat(consumerCalls.typeRegistrationInfoCallbacks.size()).isEqualTo(1));
+ assertThat(consumerCalls.typeRegistrationInfoCallbacks.get(0).state)
+ .isEqualTo(ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.REGISTERED);
+
+ // Test callback for DELETE type
+ this.deleteInfoType(TYPE_ID);
+ await().untilAsserted(() -> assertThat(consumerCalls.typeRegistrationInfoCallbacks.size()).isEqualTo(2));
+ assertThat(consumerCalls.typeRegistrationInfoCallbacks.get(1).state)
+ .isEqualTo(ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.DEREGISTERED);
+ }
+
+ {
+ // DELETE the subscription
+ ResponseEntity<String> resp =
+ restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.NO_CONTENT);
+ assertThat(this.infoTypeSubscriptions.size()).isZero();
+ resp = restClient().getForEntity(typeSubscriptionUrl()).block();
+ assertThat(resp.getBody()).isEqualTo("[]");
+ }