From: PatrikBuhr Date: Wed, 8 Jun 2022 08:12:00 +0000 (+0200) Subject: NONRTRIC - DMaap adapter added handling of json X-Git-Tag: 1.1.0~1 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F03%2F8503%2F2;p=nonrtric%2Fplt%2Fdmaapadapter.git NONRTRIC - DMaap adapter added handling of json Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-764 Change-Id: I688a843cd3d7cd07e7ee82fe8d3f4693959a1092 --- diff --git a/docs/conf.py b/docs/conf.py index 4cc4265..c9cfb91 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -4,6 +4,8 @@ from docs_conf.conf import * branch = 'latest' +language = 'en' + linkcheck_ignore = [ 'http://localhost.*', 'http://127.0.0.1.*', diff --git a/docs/overview.rst b/docs/overview.rst index 212e6b6..db78630 100644 --- a/docs/overview.rst +++ b/docs/overview.rst @@ -46,6 +46,13 @@ Each entry will be registered as a subscribe information type in ICS. The follow * dataType, this can be set to "pmData" which gives a possibility to perform a special filtering of PM data. +* isJson, this indicates that the received is Json objects (from Kafka a stream of objects and from DMaaP an array of quoted json objects). + Default value is false. + If the received data is Json objects, the data sent to the consumer does not need to be quoted. + When buffering is used the output will be an array of json objects '[{},{}]' as opposed to an array of strings '["string1", "string2"]'. + When buffering is not used, the output content-type will be 'application/json' as opposed to 'text/plain'. When buffering is used, the + output content-type will 'application/json' regardless of this parameter. + These parameters will be used to choose which parameter schemas that defines which parameters that can be used when creating an information job/data subscription. Below follows an example of a configuration file. @@ -57,7 +64,8 @@ Below follows an example of a configuration file. { "id": "DmaapInformationType", "dmaapTopicUrl": "/dmaap-topic-1", - "useHttpProxy": true + "useHttpProxy": true, + "isJson" : true }, { "id": "KafkaInformationType", @@ -66,7 +74,8 @@ Below follows an example of a configuration file. { "id": "PmInformationType", "dmaapTopicUrl": "/dmaap-topic-2", - "dataType": "PmData" + "dataType": "PmData", + "isJson" : true } ] } diff --git a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java index c919931..d7f89be 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java +++ b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -42,12 +42,17 @@ public class InfoType { private final String dataType; - public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType) { + @Getter + private boolean isJson = false; + + public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic, String dataType, + boolean isJson) { this.id = id; this.dmaapTopicUrl = dmaapTopicUrl; this.useHttpProxy = useHttpProxy; this.kafkaInputTopic = kafkaInputTopic; this.dataType = dataType; + this.isJson = isJson; } public boolean isKafkaTopicDefined() { diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java index c13d36c..b361616 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java +++ b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReport.java @@ -20,11 +20,10 @@ package org.oran.dmaapadapter.repository.filters; - import java.util.ArrayList; import java.util.Collection; -class PmReport { +public class PmReport { Event event = new Event(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java index 99904df..3a13c76 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java @@ -92,7 +92,8 @@ public class JobDataConsumer { private Mono postToClient(String body) { logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body); - MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null; + MediaType contentType = + this.job.isBuffered() || this.job.getType().isJson() ? MediaType.APPLICATION_JSON : null; return job.getConsumerRestClient().post("", body, contentType); } @@ -112,7 +113,7 @@ public class JobDataConsumer { .filter(t -> !t.isEmpty()); // if (job.isBuffered()) { - result = result.map(this::quote) // + result = result.map(str -> quoteNonJson(str, job)) // .bufferTimeout( // job.getParameters().getBufferTimeout().getMaxSize(), // job.getParameters().getBufferTimeout().getMaxTime()) // @@ -121,6 +122,10 @@ public class JobDataConsumer { return result; } + private String quoteNonJson(String str, Job job) { + return job.getType().isJson() ? str : quote(str); + } + private String quote(String str) { final String q = "\""; return q + str.replace(q, "\\\"") + q; diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index ea47373..6c6ceda 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -33,6 +33,7 @@ import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Map; import org.json.JSONObject; @@ -52,6 +53,7 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.repository.filters.PmReport; import org.oran.dmaapadapter.repository.filters.PmReportFilter; import org.oran.dmaapadapter.tasks.JobDataConsumer; import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; @@ -297,6 +299,7 @@ class ApplicationTest { ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]"); + assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json"); // Test send an exception kafkaConsumer.start(Flux.error(new NullPointerException())); @@ -308,7 +311,7 @@ class ApplicationTest { } @Test - void testReceiveAndPostDataFromDmaap() throws Exception { + void testReceiveAndPostDataFromDmaapBuffering() throws Exception { final String JOB_ID = "testReceiveAndPostDataFromDmaap"; // Register producer, Register types @@ -322,12 +325,11 @@ class ApplicationTest { // Return two messages from DMAAP and verify that these are sent to the owner of // the job (consumer) - DmaapSimulatorController.addResponse("DmaapResponse1"); - DmaapSimulatorController.addResponse("DmaapResponse2"); + DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]"); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); - assertThat(consumer.receivedBodies.get(0)).contains("[\"DmaapResponse1"); - assertThat(consumer.receivedBodies.get(0)).contains("DmaapResponse2\"]"); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"DmaapResponse1\", \"DmaapResponse2\"]"); + assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json"); String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL; String jobs = restClient().get(jobUrl).block(); @@ -338,6 +340,36 @@ class ApplicationTest { await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } + @Test + void testReceiveAndPostDataFromDmaapNonBuffering() throws Exception { + final String JOB_ID = "testReceiveAndPostDataFromDmaapNonBuffering"; + + // Register producer, Register types + waitForRegistration(); + + // Create a job + Job.Parameters param = new Job.Parameters(null, null, null, 1); + ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, toJson(gson.toJson(param))); + this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + // Return two messages from DMAAP and verify that these are sent to the owner of + // the job (consumer) + DmaapSimulatorController.addResponse("[\"DmaapResponse1\", \"DmaapResponse2\"]"); + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2)); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); + assertThat(consumer.receivedBodies.get(1)).isEqualTo("DmaapResponse2"); + assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8"); + + // Delete the job + this.icsSimulatorController.deleteJob(JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + } + + static class PmReportArray extends ArrayList { + }; + @Test void testPmFiltering() throws Exception { // Create a job @@ -353,7 +385,8 @@ class ApplicationTest { filterData.getMeasObjInstIds().add("UtranCell=Gbg-997"); filterData.getSourceNames().add("O-DU-1122"); filterData.getMeasuredEntityDns().add("ManagedElement=RNC-Gbg-1"); - Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null); + Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, + new Job.BufferTimeout(123, 456), null); String paramJson = gson.toJson(param); ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson)); @@ -368,8 +401,13 @@ class ApplicationTest { ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); + assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json"); String receivedFiltered = consumer.receivedBodies.get(0); assertThat(receivedFiltered).contains("succImmediateAssignProcs").doesNotContain("\"p\":2").contains("\"p\":1"); + + PmReportArray reportsParsed = gson.fromJson(receivedFiltered, PmReportArray.class); + assertThat(reportsParsed).hasSize(1); + } @Test @@ -398,6 +436,7 @@ class ApplicationTest { ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); String receivedFiltered = consumer.receivedBodies.get(0); + assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json"); assertThat(receivedFiltered).contains("event"); } @@ -422,12 +461,15 @@ class ApplicationTest { this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - DmaapSimulatorController.addResponse("Hello"); + DmaapSimulatorController.addResponse("[\"Hello\"]"); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedHeaders).hasSize(1)); String received = consumer.receivedBodies.get(0); assertThat(received).isEqualTo("Hello"); + // This is the only time it is verified that mime type is plaintext when isJson + // is false and buffering is not used + assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8"); // Check that the auth token was received by the consumer assertThat(consumer.receivedHeaders).hasSize(1); @@ -458,6 +500,7 @@ class ApplicationTest { ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); String receivedFiltered = consumer.receivedBodies.get(0); + assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json"); assertThat(receivedFiltered).contains("event"); } diff --git a/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java b/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java index d2da047..a58625c 100644 --- a/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java +++ b/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java @@ -59,7 +59,7 @@ public class DmaapSimulatorController { } public static void addResponse(String response) { - dmaapResponses.add("[" + quote(response) + "]"); + dmaapResponses.add(response); } private static String quote(String str) { diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index 6eca59d..603cea7 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -272,13 +272,13 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - DmaapSimulatorController.addResponse("DmaapResponse1"); - DmaapSimulatorController.addResponse("DmaapResponse2"); - DmaapSimulatorController.addResponse("Junk"); + DmaapSimulatorController.addResponse("[\"DmaapResponse1\"]"); + DmaapSimulatorController.addResponse("[\"DmaapResponse2\"]"); ConsumerController.TestResults results = this.consumerController.testResults; await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(2)); assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); + assertThat(results.receivedHeaders.get(0)).containsEntry("content-type", "text/plain;charset=UTF-8"); deleteInformationJobInIcs(DMAAP_JOB_ID); diff --git a/src/test/resources/test_application_configuration.json b/src/test/resources/test_application_configuration.json index 581e164..df1ccd5 100644 --- a/src/test/resources/test_application_configuration.json +++ b/src/test/resources/test_application_configuration.json @@ -3,7 +3,8 @@ { "id": "DmaapInformationType", "dmaapTopicUrl": "/dmaap-topic-1", - "useHttpProxy": false + "useHttpProxy": false, + "isJson": false }, { "id": "KafkaInformationType", @@ -14,13 +15,15 @@ "id": "PmInformationType", "dmaapTopicUrl": "/dmaap-topic-2", "useHttpProxy": false, - "dataType": "PmData" + "dataType": "PmData", + "isJson": true }, { "id": "PmInformationTypeKafka", "kafkaInputTopic": "TutorialTopic", "useHttpProxy": false, - "dataType": "PmData" + "dataType": "PmData", + "isJson": true } ] -} +} \ No newline at end of file