import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.gson.JsonParser;
import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
-import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.r1.ProducerJobInfo;
import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@Autowired
private ApplicationConfig applicationConfig;
- @Autowired
- private ProducerRegstrationTask producerRegstrationTask;
-
@Autowired
private Jobs jobs;
@Autowired
private EcsSimulatorController ecsSimulatorController;
+ private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+
@LocalServerPort
int localServerHttpPort;
this.consumerController.testResults.reset();
this.ecsSimulatorController.testResults.reset();
this.jobs.clear();
- this.types.clear();
}
private AsyncRestClient restClient(boolean useTrustValidation) {
}
private ConsumerJobInfo consumerJobInfo() {
- InfoType type = this.types.getAll().iterator().next();
- return consumerJobInfo(type.getId(), "EI_JOB_ID");
+ return consumerJobInfo("DmaapInformationType", "EI_JOB_ID");
}
private Object jsonObject() {
}
}
+ @Test
+ void testResponseCodes() throws Exception {
+ String supervisionUrl = baseUrl() + ProducerCallbacksController.SUPERVISION_URL;
+ ResponseEntity<String> resp = restClient().getForEntity(supervisionUrl).block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+
+ String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
+ resp = restClient().deleteForEntity(jobUrl + "/junk").block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+
+ ProducerJobInfo info = new ProducerJobInfo(null, "id", "typeId", "targetUri", "owner", "lastUpdated");
+ String body = gson.toJson(info);
+ testErrorCode(restClient().post(jobUrl, body, MediaType.APPLICATION_JSON), HttpStatus.NOT_FOUND,
+ "Could not find type");
+ }
+
@Test
void testWholeChain() throws Exception {
final String JOB_ID = "ID";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create a job
this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(2));
assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
+ String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
+ String jobs = restClient().get(jobUrl).block();
+ assertThat(jobs).contains(JOB_ID);
+
// Delete the job
this.ecsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
+ @Test
+ void testReRegister() throws Exception {
+ // Wait foir register types and producer
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Clear the registration, should trigger a re-register
+ ecsSimulatorController.testResults.reset();
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Just clear the registerred types, should trigger a re-register
+ ecsSimulatorController.testResults.types.clear();
+ await().untilAsserted(
+ () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
+ }
+
+ @Test
+ void testCreateKafkaJob() {
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ final String TYPE_ID = "KafkaInformationType";
+
+ Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+ String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+
+ // Create a job
+ this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ // Delete the job
+ this.ecsSimulatorController.deleteJob("JOB_ID", restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
+ private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
+ testErrorCode(request, expStatus, responseContains, true);
+ }
+
+ private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains,
+ boolean expectApplicationProblemJsonMediaType) {
+ StepVerifier.create(request) //
+ .expectSubscription() //
+ .expectErrorMatches(
+ t -> checkWebClientError(t, expStatus, responseContains, expectApplicationProblemJsonMediaType)) //
+ .verify();
+ }
+
+ private boolean checkWebClientError(Throwable throwable, HttpStatus expStatus, String responseContains,
+ boolean expectApplicationProblemJsonMediaType) {
+ assertTrue(throwable instanceof WebClientResponseException);
+ WebClientResponseException responseException = (WebClientResponseException) throwable;
+ assertThat(responseException.getStatusCode()).isEqualTo(expStatus);
+ assertThat(responseException.getResponseBodyAsString()).contains(responseContains);
+ if (expectApplicationProblemJsonMediaType) {
+ assertThat(responseException.getHeaders().getContentType()).isEqualTo(MediaType.APPLICATION_PROBLEM_JSON);
+ }
+ return true;
+ }
}