X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Ftest%2Fjava%2Forg%2Foran%2Fdmaapadapter%2FIntegrationWithEcs.java;h=c8fcb8320b7f713c02ff898774fac7b8fbd62ad7;hb=242299199382ec3fd7d514dde2eb607086a6a46e;hp=376d23e532cb1c6e8a5ba81172332afe86902ee0;hpb=0f6367023720ecc7d7b4b38cbbc4282792172a89;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java index 376d23e5..c8fcb832 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java @@ -38,8 +38,8 @@ import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; import org.oran.dmaapadapter.r1.ConsumerJobInfo; -import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; import org.springframework.beans.factory.annotation.Autowired; @@ -63,7 +63,8 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; }) class IntegrationWithEcs { - private static final String EI_JOB_ID = "EI_JOB_ID"; + private static final String DMAAP_JOB_ID = "DMAAP_JOB_ID"; + private static final String DMAAP_TYPE_ID = "DmaapInformationType"; @Autowired private ApplicationConfig applicationConfig; @@ -128,8 +129,7 @@ class IntegrationWithEcs { @AfterEach void reset() { this.consumerController.testResults.reset(); - this.jobs.clear(); - this.types.clear(); + assertThat(this.jobs.size()).isZero(); } private AsyncRestClient restClient(boolean useTrustValidation) { @@ -165,11 +165,11 @@ class IntegrationWithEcs { } private String jobUrl(String jobId) { - return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId; + return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId + "?typeCheck=true"; } - private void createInformationJobInEcs(String jobId) { - String body = gson.toJson(consumerJobInfo()); + private void createInformationJobInEcs(String typeId, String jobId, String filter) { + String body = gson.toJson(consumerJobInfo(typeId, filter)); try { // Delete the job if it already exists deleteInformationJobInEcs(jobId); @@ -182,13 +182,8 @@ class IntegrationWithEcs { restClient().delete(jobUrl(jobId)).block(); } - private ConsumerJobInfo consumerJobInfo() { - InfoType type = this.types.getAll().iterator().next(); - return consumerJobInfo(type.getId(), EI_JOB_ID); - } - - private Object jsonObject() { - return jsonObject("{}"); + private ConsumerJobInfo consumerJobInfo(String typeId, String filter) { + return consumerJobInfo(typeId, DMAAP_JOB_ID, filter); } private Object jsonObject(String json) { @@ -199,31 +194,60 @@ class IntegrationWithEcs { } } - private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) { + private String quote(String str) { + return "\"" + str + "\""; + } + + private String consumerUri() { + return selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL; + } + + private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId, String filter) { try { - String targetUri = selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL; - return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, ""); + + String jsonStr = "{ \"filter\" :" + quote(filter) + "}"; + return new ConsumerJobInfo(typeId, jsonObject(jsonStr), "owner", consumerUri(), ""); } catch (Exception e) { return null; } } + @Test + void testCreateKafkaJob() { + await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); + final String TYPE_ID = "KafkaInformationType"; + + Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1); + + ConsumerJobInfo jobInfo = + new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); + String body = gson.toJson(jobInfo); + + restClient().putForEntity(jobUrl("KAFKA_JOB_ID"), body).block(); + + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + deleteInformationJobInEcs("KAFKA_JOB_ID"); + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + } + @Test void testWholeChain() throws Exception { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); - createInformationJobInEcs(EI_JOB_ID); + createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*"); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); DmaapSimulatorController.dmaapResponses.add("DmaapResponse1"); DmaapSimulatorController.dmaapResponses.add("DmaapResponse2"); + DmaapSimulatorController.dmaapResponses.add("Junk"); ConsumerController.TestResults results = this.consumerController.testResults; await().untilAsserted(() -> assertThat(results.receivedBodies.size()).isEqualTo(2)); assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); - deleteInformationJobInEcs(EI_JOB_ID); + deleteInformationJobInEcs(DMAAP_JOB_ID); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());