+ @Test
+ void testReRegister() throws Exception {
+ // Wait foir register types and producer
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Clear the registration, should trigger a re-register
+ ecsSimulatorController.testResults.reset();
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Just clear the registerred types, should trigger a re-register
+ ecsSimulatorController.testResults.types.clear();
+ await().untilAsserted(
+ () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
+ }
+
+ @Test
+ void testCreateKafkaJob() {
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ final String TYPE_ID = "KafkaInformationType";
+
+ Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+ String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+
+ // Create a job
+ this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ // Delete the job
+ this.ecsSimulatorController.deleteJob("JOB_ID", restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
+ private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
+ testErrorCode(request, expStatus, responseContains, true);
+ }
+
+ private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains,
+ boolean expectApplicationProblemJsonMediaType) {
+ StepVerifier.create(request) //
+ .expectSubscription() //
+ .expectErrorMatches(
+ t -> checkWebClientError(t, expStatus, responseContains, expectApplicationProblemJsonMediaType)) //
+ .verify();
+ }
+
+ private boolean checkWebClientError(Throwable throwable, HttpStatus expStatus, String responseContains,
+ boolean expectApplicationProblemJsonMediaType) {
+ assertTrue(throwable instanceof WebClientResponseException);
+ WebClientResponseException responseException = (WebClientResponseException) throwable;
+ assertThat(responseException.getStatusCode()).isEqualTo(expStatus);
+ assertThat(responseException.getResponseBodyAsString()).contains(responseContains);
+ if (expectApplicationProblemJsonMediaType) {
+ assertThat(responseException.getHeaders().getContentType()).isEqualTo(MediaType.APPLICATION_PROBLEM_JSON);
+ }
+ return true;
+ }