NONRTRIC - DMaap adapter added handling of json 03/8503/2
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 8 Jun 2022 08:12:00 +0000 (10:12 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 9 Jun 2022 07:41:44 +0000 (09:41 +0200)
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-764
Change-Id: I688a843cd3d7cd07e7ee82fe8d3f4693959a1092

docs/conf.py
docs/overview.rst
src/main/java/org/oran/dmaapadapter/repository/InfoType.java
src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/resources/test_application_configuration.json

index 4cc4265..c9cfb91 100644 (file)
@@ -4,6 +4,8 @@ from docs_conf.conf import *
 
 branch = 'latest'
 
+language = 'en'
+
 linkcheck_ignore = [
     'http://localhost.*',
     'http://127.0.0.1.*',
index 212e6b6..db78630 100644 (file)
@@ -46,6 +46,13 @@ Each entry will be registered as a subscribe information type in ICS. The follow
 
 * dataType, this can be set to "pmData" which gives a possibility to perform a special filtering of PM data.
 
+* isJson, this indicates that the received is Json objects (from Kafka a stream of objects and from DMaaP an array of quoted json objects).
+  Default value is false.
+  If the received data is Json objects, the data sent to the consumer does not need to be quoted.
+  When buffering is used the output will be an array of json objects '[{},{}]' as opposed to an array of strings '["string1", "string2"]'.
+  When buffering is not used, the output content-type will be 'application/json' as opposed to 'text/plain'. When buffering is used, the
+  output content-type will 'application/json' regardless of this parameter.
+
 These parameters will be used to choose which parameter schemas that defines which parameters that can be used when creating an information job/data subscription.
 
 Below follows an example of a configuration file.
@@ -57,7 +64,8 @@ Below follows an example of a configuration file.
           {
              "id": "DmaapInformationType",
              "dmaapTopicUrl": "/dmaap-topic-1",
-             "useHttpProxy": true
+             "useHttpProxy": true,
+             "isJson" : true
           },
           {
              "id": "KafkaInformationType",
@@ -66,7 +74,8 @@ Below follows an example of a configuration file.
           {
              "id": "PmInformationType",
              "dmaapTopicUrl": "/dmaap-topic-2",
-             "dataType": "PmData"
+             "dataType": "PmData",
+             "isJson" : true
           }
        ]
     }
index c919931..d7f89be 100644 (file)
@@ -42,12 +42,17 @@ public class InfoType {
 
     private final String dataType;
 
-    public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType) {
+    @Getter
+    private boolean isJson = false;
+
+    public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType,
+            boolean isJson) {
         this.id = id;
         this.dmaapTopicUrl = dmaapTopicUrl;
         this.useHttpProxy = useHttpProxy;
         this.kafkaInputTopic = kafkaInputTopic;
         this.dataType = dataType;
+        this.isJson = isJson;
     }
 
     public boolean isKafkaTopicDefined() {
index c13d36c..b361616 100644 (file)
 
 package org.oran.dmaapadapter.repository.filters;
 
-
 import java.util.ArrayList;
 import java.util.Collection;
 
-class PmReport {
+public class PmReport {
 
     Event event = new Event();
 
index 99904df..3a13c76 100644 (file)
@@ -92,7 +92,8 @@ public class JobDataConsumer {
 
     private Mono<String> postToClient(String body) {
         logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
-        MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
+        MediaType contentType =
+                this.job.isBuffered() || this.job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
         return job.getConsumerRestClient().post("", body, contentType);
     }
 
@@ -112,7 +113,7 @@ public class JobDataConsumer {
                 .filter(t -> !t.isEmpty()); //
 
         if (job.isBuffered()) {
-            result = result.map(this::quote) //
+            result = result.map(str -> quoteNonJson(str, job)) //
                     .bufferTimeout( //
                             job.getParameters().getBufferTimeout().getMaxSize(), //
                             job.getParameters().getBufferTimeout().getMaxTime()) //
@@ -121,6 +122,10 @@ public class JobDataConsumer {
         return result;
     }
 
+    private String quoteNonJson(String str, Job job) {
+        return job.getType().isJson() ? str : quote(str);
+    }
+
     private String quote(String str) {
         final String q = "\"";
         return q + str.replace(q, "\\\"") + q;
index ea47373..6c6ceda 100644 (file)
@@ -33,6 +33,7 @@ import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Map;
 
 import org.json.JSONObject;
@@ -52,6 +53,7 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.filters.PmReport;
 import org.oran.dmaapadapter.repository.filters.PmReportFilter;
 import org.oran.dmaapadapter.tasks.JobDataConsumer;
 import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
@@ -297,6 +299,7 @@ class ApplicationTest {
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
         assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
+        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
 
         // Test send an exception
         kafkaConsumer.start(Flux.error(new NullPointerException()));
@@ -308,7 +311,7 @@ class ApplicationTest {
     }
 
     @Test
-    void testReceiveAndPostDataFromDmaap() throws Exception {
+    void testReceiveAndPostDataFromDmaapBuffering() throws Exception {
         final String JOB_ID = "testReceiveAndPostDataFromDmaap";
 
         // Register producer, Register types
@@ -322,12 +325,11 @@ class ApplicationTest {
 
         // Return two messages from DMAAP and verify that these are sent to the owner of
         // the job (consumer)
-        DmaapSimulatorController.addResponse("DmaapResponse1");
-        DmaapSimulatorController.addResponse("DmaapResponse2");
+        DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
-        assertThat(consumer.receivedBodies.get(0)).contains("[\"DmaapResponse1");
-        assertThat(consumer.receivedBodies.get(0)).contains("DmaapResponse2\"]");
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
 
         String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
         String jobs = restClient().get(jobUrl).block();
@@ -338,6 +340,36 @@ class ApplicationTest {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
 
+    @Test
+    void testReceiveAndPostDataFromDmaapNonBuffering() throws Exception {
+        final String JOB_ID = "testReceiveAndPostDataFromDmaapNonBuffering";
+
+        // Register producer, Register types
+        waitForRegistration();
+
+        // Create a job
+        Job.Parameters param = new Job.Parameters(null, null, null, 1);
+        ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param)));
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        // Return two messages from DMAAP and verify that these are sent to the owner of
+        // the job (consumer)
+        DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]");
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
+        assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse2");
+        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
+
+        // Delete the job
+        this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+    }
+
+    static class PmReportArray extends ArrayList<PmReport> {
+    };
+
     @Test
     void testPmFiltering() throws Exception {
         // Create a job
@@ -353,7 +385,8 @@ class ApplicationTest {
         filterData.getMeasObjInstIds().add("UtranCell=Gbg-997");
         filterData.getSourceNames().add("O-DU-1122");
         filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1");
-        Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null);
+        Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
+                new Job.BufferTimeout(123, 456), null);
         String paramJson = gson.toJson(param);
         ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson));
 
@@ -368,8 +401,13 @@ class ApplicationTest {
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
+        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
         String receivedFiltered = consumer.receivedBodies.get(0);
         assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1");
+
+        PmReportArray reportsParsed = gson.fromJson(receivedFiltered, PmReportArray.class);
+        assertThat(reportsParsed).hasSize(1);
+
     }
 
     @Test
@@ -398,6 +436,7 @@ class ApplicationTest {
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
         String receivedFiltered = consumer.receivedBodies.get(0);
+        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
         assertThat(receivedFiltered).contains("event");
     }
 
@@ -422,12 +461,15 @@ class ApplicationTest {
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
-        DmaapSimulatorController.addResponse("Hello");
+        DmaapSimulatorController.addResponse("[\"Hello\"]");
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedHeaders).hasSize(1));
         String received = consumer.receivedBodies.get(0);
         assertThat(received).isEqualTo("Hello");
+        // This is the only time it is verified that mime type is plaintext when isJson
+        // is false and buffering is not used
+        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
 
         // Check that the auth token was received by the consumer
         assertThat(consumer.receivedHeaders).hasSize(1);
@@ -458,6 +500,7 @@ class ApplicationTest {
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
         String receivedFiltered = consumer.receivedBodies.get(0);
+        assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
         assertThat(receivedFiltered).contains("event");
     }
 
index d2da047..a58625c 100644 (file)
@@ -59,7 +59,7 @@ public class DmaapSimulatorController {
     }
 
     public static void addResponse(String response) {
-        dmaapResponses.add("[" + quote(response) + "]");
+        dmaapResponses.add(response);
     }
 
     private static String quote(String str) {
index 6eca59d..603cea7 100644 (file)
@@ -272,13 +272,13 @@ class IntegrationWithIcs {
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
-        DmaapSimulatorController.addResponse("DmaapResponse1");
-        DmaapSimulatorController.addResponse("DmaapResponse2");
-        DmaapSimulatorController.addResponse("Junk");
+        DmaapSimulatorController.addResponse("[\"DmaapResponse1\"]");
+        DmaapSimulatorController.addResponse("[\"DmaapResponse2\"]");
 
         ConsumerController.TestResults results = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(2));
         assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
+        assertThat(results.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8");
 
         deleteInformationJobInIcs(DMAAP_JOB_ID);
 
index 581e164..df1ccd5 100644 (file)
@@ -3,7 +3,8 @@
       {
          "id": "DmaapInformationType",
          "dmaapTopicUrl": "/dmaap-topic-1",
-         "useHttpProxy": false
+         "useHttpProxy": false,
+         "isJson": false
       },
       {
          "id": "KafkaInformationType",
          "id": "PmInformationType",
          "dmaapTopicUrl": "/dmaap-topic-2",
          "useHttpProxy": false,
-         "dataType": "PmData"
+         "dataType": "PmData",
+         "isJson": true
       },
       {
          "id": "PmInformationTypeKafka",
          "kafkaInputTopic": "TutorialTopic",
          "useHttpProxy": false,
-         "dataType": "PmData"
+         "dataType": "PmData",
+         "isJson": true
       }
    ]
-}
+}
\ No newline at end of file