+ private void assertProducerOpState(String producerId,
+ ProducerStatusInfo.OperationalState expectedOperationalState) {
+ String statusUrl = ProducerConsts.API_ROOT + "/eiproducers/" + producerId + "/status";
+ ResponseEntity<String> resp = restClient().getForEntity(statusUrl).block();
+ ProducerStatusInfo statusInfo = gson.fromJson(resp.getBody(), ProducerStatusInfo.class);
+ assertThat(statusInfo.opState).isEqualTo(expectedOperationalState);
+ }
+
+ @Test
+ void testProducerSupervision() throws JsonMappingException, JsonProcessingException, ServiceException {
+ putEiProducerWithOneTypeRejecting("simulateProducerError", EI_TYPE_ID);
+
+ assertThat(this.eiProducers.size()).isEqualTo(1);
+ assertThat(this.eiTypes.size()).isEqualTo(1);
+ assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.ENABLED);
+
+ this.producerSupervision.createTask().blockLast();
+ this.producerSupervision.createTask().blockLast();
+ assertThat(this.eiProducers.size()).isEqualTo(1);
+ assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.DISABLED);
+
+ // After 3 failed checks, the producer shall be deregisterred
+ this.producerSupervision.createTask().blockLast();
+ assertThat(this.eiProducers.size()).isEqualTo(0);
+ assertThat(this.eiTypes.size()).isEqualTo(0);
+ }
+
+ @Test
+ void testGetStatus() throws JsonMappingException, JsonProcessingException, ServiceException {
+ putEiProducerWithOneTypeRejecting("simulateProducerError", EI_TYPE_ID);
+ putEiProducerWithOneTypeRejecting("simulateProducerError2", EI_TYPE_ID);
+
+ String url = "/status";
+ ResponseEntity<String> resp = restClient().getForEntity(url).block();
+ assertThat(resp.getBody()).contains("hunky dory");
+ }
+