+ void testResponseCodes() throws Exception {
+ String supervisionUrl = baseUrl() + ProducerCallbacksController.SUPERVISION_URL;
+ ResponseEntity<String> resp = restClient().getForEntity(supervisionUrl).block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+
+ String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
+ resp = restClient().deleteForEntity(jobUrl + "/junk").block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+
+ ProducerJobInfo info = new ProducerJobInfo(null, "id", "typeId", "targetUri", "owner", "lastUpdated");
+ String body = gson.toJson(info);
+ testErrorCode(restClient().post(jobUrl, body, MediaType.APPLICATION_JSON), HttpStatus.NOT_FOUND,
+ "Could not find type");
+ }
+
+ @Test
+ void testReceiveAndPostDataFromKafka() {
+ final String JOB_ID = "ID";
+ final String TYPE_ID = "KafkaInformationType";
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Create a job
+ Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1);
+ String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ ConsumerJobInfo kafkaJobInfo =
+ new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+
+ this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID);
+
+ // Handle received data from Kafka, check that it has been posted to the
+ // consumer
+ kafkaConsumer.start(Flux.just("data"));
+
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(1));
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
+
+ // Test send an exception
+ kafkaConsumer.start(Flux.error(new NullPointerException()));
+
+ // Test regular restart of stopped
+ kafkaConsumer.stop();
+ this.kafkaTopicConsumers.restartNonRunningTopics();
+ await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
+
+ // Delete the job
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
+ @Test
+ void testReceiveAndPostDataFromDmaap() throws Exception {
+ final String JOB_ID = "ID";