NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / test / java / org / oran / dmaapadapter / IntegrationWithEcs.java
index 376d23e..c8fcb83 100644 (file)
@@ -38,8 +38,8 @@ import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.dmaapadapter.r1.ConsumerJobInfo;
-import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -63,7 +63,8 @@ import org.springframework.test.context.junit.jupiter.SpringExtension;
 })
 class IntegrationWithEcs {
 
-    private static final String EI_JOB_ID = "EI_JOB_ID";
+    private static final String DMAAP_JOB_ID = "DMAAP_JOB_ID";
+    private static final String DMAAP_TYPE_ID = "DmaapInformationType";
 
     @Autowired
     private ApplicationConfig applicationConfig;
@@ -128,8 +129,7 @@ class IntegrationWithEcs {
     @AfterEach
     void reset() {
         this.consumerController.testResults.reset();
-        this.jobs.clear();
-        this.types.clear();
+        assertThat(this.jobs.size()).isZero();
     }
 
     private AsyncRestClient restClient(boolean useTrustValidation) {
@@ -165,11 +165,11 @@ class IntegrationWithEcs {
     }
 
     private String jobUrl(String jobId) {
-        return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId;
+        return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId + "?typeCheck=true";
     }
 
-    private void createInformationJobInEcs(String jobId) {
-        String body = gson.toJson(consumerJobInfo());
+    private void createInformationJobInEcs(String typeId, String jobId, String filter) {
+        String body = gson.toJson(consumerJobInfo(typeId, filter));
         try {
             // Delete the job if it already exists
             deleteInformationJobInEcs(jobId);
@@ -182,13 +182,8 @@ class IntegrationWithEcs {
         restClient().delete(jobUrl(jobId)).block();
     }
 
-    private ConsumerJobInfo consumerJobInfo() {
-        InfoType type = this.types.getAll().iterator().next();
-        return consumerJobInfo(type.getId(), EI_JOB_ID);
-    }
-
-    private Object jsonObject() {
-        return jsonObject("{}");
+    private ConsumerJobInfo consumerJobInfo(String typeId, String filter) {
+        return consumerJobInfo(typeId, DMAAP_JOB_ID, filter);
     }
 
     private Object jsonObject(String json) {
@@ -199,31 +194,60 @@ class IntegrationWithEcs {
         }
     }
 
-    private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) {
+    private String quote(String str) {
+        return "\"" + str + "\"";
+    }
+
+    private String consumerUri() {
+        return selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+    }
+
+    private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId, String filter) {
         try {
-            String targetUri = selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL;
-            return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, "");
+
+            String jsonStr = "{ \"filter\" :" + quote(filter) + "}";
+            return new ConsumerJobInfo(typeId, jsonObject(jsonStr), "owner", consumerUri(), "");
         } catch (Exception e) {
             return null;
         }
     }
 
+    @Test
+    void testCreateKafkaJob() {
+        await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+        final String TYPE_ID = "KafkaInformationType";
+
+        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+
+        ConsumerJobInfo jobInfo =
+                new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
+        String body = gson.toJson(jobInfo);
+
+        restClient().putForEntity(jobUrl("KAFKA_JOB_ID"), body).block();
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        deleteInformationJobInEcs("KAFKA_JOB_ID");
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+    }
+
     @Test
     void testWholeChain() throws Exception {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
 
-        createInformationJobInEcs(EI_JOB_ID);
+        createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*");
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
         DmaapSimulatorController.dmaapResponses.add("DmaapResponse1");
         DmaapSimulatorController.dmaapResponses.add("DmaapResponse2");
+        DmaapSimulatorController.dmaapResponses.add("Junk");
 
         ConsumerController.TestResults results = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(results.receivedBodies.size()).isEqualTo(2));
         assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
 
-        deleteInformationJobInEcs(EI_JOB_ID);
+        deleteInformationJobInEcs(DMAAP_JOB_ID);
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());