PM Filter 42/8142/1
authorPatrikBuhr <patrik.buhr@est.tech>
Tue, 3 May 2022 15:28:31 +0000 (17:28 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Tue, 3 May 2022 15:28:31 +0000 (17:28 +0200)
Fix that DMAAP returns an array of strings

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-743
Change-Id: I3b28a27ecfd38423fa1a89cadd727c24441104c2

src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
src/main/java/org/oran/dmaapadapter/repository/Job.java
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java

index 94f9f8d..32ecd73 100644 (file)
@@ -85,7 +85,7 @@ public class ProducerCallbacksController {
             @RequestBody String body) {
         try {
             ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class);
-            logger.debug("Job started callback {}", request.id);
+            logger.debug("Job started callback id: {}, body: {}", request.id, body);
             this.jobs.addJob(request.id, request.targetUri, types.getType(request.typeId), request.owner,
                     request.lastUpdated, toJobParameters(request.jobData));
             return new ResponseEntity<>(HttpStatus.OK);
index 9dd4987..989643c 100644 (file)
@@ -182,6 +182,7 @@ public class Job {
 
     public String filter(String data) {
         if (filter == null) {
+            logger.debug("No filter used");
             return data;
         }
         return filter.filter(data);
index dfa6d07..0631fa1 100644 (file)
@@ -21,6 +21,8 @@
 package org.oran.dmaapadapter.tasks;
 
 import java.time.Duration;
+import java.util.Collection;
+import java.util.LinkedList;
 
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
@@ -47,6 +49,7 @@ public class DmaapTopicConsumer {
     protected final ApplicationConfig applicationConfig;
     protected final InfoType type;
     protected final Jobs jobs;
+    private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
 
     public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
         AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
@@ -81,30 +84,37 @@ public class DmaapTopicConsumer {
                 .flatMap(notUsed -> Mono.empty());
     }
 
-    private Mono<String> getFromMessageRouter(String topicUrl) {
+    private Flux<String> getFromMessageRouter(String topicUrl) {
         logger.trace("getFromMessageRouter {}", topicUrl);
         return dmaapRestClient.get(topicUrl) //
                 .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
+                .flatMapMany(body -> toMessages(body)) //
                 .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
                 .onErrorResume(this::handleDmaapErrorResponse); //
     }
 
+    private Flux<String> toMessages(String body) {
+        Collection<String> messages = gson.fromJson(body, LinkedList.class);
+        return Flux.fromIterable(messages);
+    }
+
     private Mono<String> handleConsumerErrorResponse(Throwable t) {
         logger.warn("error from CONSUMER {}", t.getMessage());
         return Mono.empty();
     }
 
-    protected Flux<String> pushDataToConsumers(String body) {
-        logger.debug("Received data {}", body);
+    protected Flux<String> pushDataToConsumers(String input) {
+        logger.debug("Received data {}", input);
         final int CONCURRENCY = 50;
 
         // Distibute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
-                .map(job -> Tuples.of(job, job.filter(body))) //
+                .map(job -> Tuples.of(job, job.filter(input))) //
                 .filter(t -> !t.getT2().isEmpty()) //
                 .doOnNext(touple -> logger.debug("Sending to consumer {}", touple.getT1().getCallbackUrl())) //
                 .flatMap(touple -> touple.getT1().getConsumerRestClient().post("", touple.getT2(),
                         MediaType.APPLICATION_JSON), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse);
     }
+
 }
index e4675a5..5c8ed5e 100644 (file)
@@ -77,7 +77,7 @@ public class KafkaJobDataConsumer {
     public synchronized void start(Flux<String> input) {
         stop();
         this.errorStats.resetKafkaErrors();
-        this.subscription = getMessagesFromKafka(input, job) //
+        this.subscription = handleMessagesFromKafka(input, job) //
                 .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
@@ -107,7 +107,7 @@ public class KafkaJobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+    private Flux<String> handleMessagesFromKafka(Flux<String> input, Job job) {
         Flux<String> result = input.map(job::filter) //
                 .filter(t -> !t.isEmpty()); //
 
index fc023cf..9b6ea58 100644 (file)
@@ -317,8 +317,8 @@ class ApplicationTest {
 
         // Return two messages from DMAAP and verify that these are sent to the owner of
         // the job (consumer)
-        DmaapSimulatorController.dmaapResponses.add("DmaapResponse1");
-        DmaapSimulatorController.dmaapResponses.add("DmaapResponse2");
+        DmaapSimulatorController.addResponse("DmaapResponse1");
+        DmaapSimulatorController.addResponse("DmaapResponse2");
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2));
         assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
@@ -358,7 +358,7 @@ class ApplicationTest {
         // filtered PM message
         String path = "./src/test/resources/pm_report.json";
         String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
-        DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+        DmaapSimulatorController.addPmResponse(pmReportJson);
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
@@ -388,7 +388,7 @@ class ApplicationTest {
         // filtered PM message
         String path = "./src/test/resources/pm_report.json";
         String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
-        DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+        DmaapSimulatorController.addPmResponse(pmReportJson);
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
@@ -403,7 +403,7 @@ class ApplicationTest {
         // Register producer, Register types
         waitForRegistration();
 
-        // Create a job with a JsonPath
+        // Create a job with atestJsonPathFiltering JsonPath
         ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, this.jsonObjectJsonPath());
 
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
@@ -413,7 +413,7 @@ class ApplicationTest {
         // filtered PM message
         String path = "./src/test/resources/pm_report.json";
         String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
-        DmaapSimulatorController.dmaapPmResponses.add(pmReportJson);
+        DmaapSimulatorController.addPmResponse(pmReportJson);
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
index db0bd22..7d0c313 100644 (file)
@@ -51,9 +51,18 @@ public class DmaapSimulatorController {
     public static final String DMAAP_TOPIC_URL = "/dmaap-topic-1";
     public static final String DMAAP_TOPIC_PM_URL = "/dmaap-topic-2";
 
-    public static List<String> dmaapResponses = Collections.synchronizedList(new LinkedList<String>());
+    private static List<String> dmaapResponses = Collections.synchronizedList(new LinkedList<String>());
 
-    public static List<String> dmaapPmResponses = Collections.synchronizedList(new LinkedList<String>());
+    private static List<String> dmaapPmResponses = Collections.synchronizedList(new LinkedList<String>());
+
+    public static void addPmResponse(String response) {
+        response = response.replace("\"", "\\\"");
+        dmaapPmResponses.add("[\"" + response + "\"]");
+    }
+
+    public static void addResponse(String response) {
+        dmaapResponses.add("[\"" + response + "\"]");
+    }
 
     @GetMapping(path = DMAAP_TOPIC_URL, produces = MediaType.APPLICATION_JSON_VALUE)
     @Operation(summary = "GET from topic",
@@ -87,7 +96,6 @@ public class DmaapSimulatorController {
             String resp = dmaapPmResponses.remove(0);
             return new ResponseEntity<>(resp, HttpStatus.OK);
         }
-
     }
 
 }
index 0abaf59..54a4940 100644 (file)
@@ -27,6 +27,10 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonParser;
 
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -41,6 +45,8 @@ import org.oran.dmaapadapter.r1.ConsumerJobInfo;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -65,6 +71,7 @@ class IntegrationWithIcs {
 
     private static final String DMAAP_JOB_ID = "DMAAP_JOB_ID";
     private static final String DMAAP_TYPE_ID = "DmaapInformationType";
+    private static final Logger logger = LoggerFactory.getLogger(Application.class);
 
     @Autowired
     private ApplicationConfig applicationConfig;
@@ -166,17 +173,21 @@ class IntegrationWithIcs {
     }
 
     private void createInformationJobInIcs(String typeId, String jobId, String filter) {
-        String body = gson.toJson(consumerJobInfo(typeId, filter));
-        try {
-            // Delete the job if it already exists
-            deleteInformationJobInIcs(jobId);
-        } catch (Exception e) {
-        }
+        createInformationJobInIcs(jobId, consumerJobInfo(typeId, filter));
+    }
+
+    private void createInformationJobInIcs(String jobId, ConsumerJobInfo jobInfo) {
+        String body = gson.toJson(jobInfo);
         restClient().putForEntity(jobUrl(jobId), body).block();
+        logger.info("Created job {}, {}", jobId, body);
     }
 
     private void deleteInformationJobInIcs(String jobId) {
-        restClient().delete(jobUrl(jobId)).block();
+        try {
+            restClient().delete(jobUrl(jobId)).block();
+        } catch (Exception e) {
+            logger.warn("Couldnot delete job: {}  reason: {}", jobId, e.getMessage());
+        }
     }
 
     private ConsumerJobInfo consumerJobInfo(String typeId, String filter) {
@@ -195,6 +206,10 @@ class IntegrationWithIcs {
         return "\"" + str + "\"";
     }
 
+    private String reQuote(String str) {
+        return str.replaceAll("'", "\\\"");
+    }
+
     private String consumerUri() {
         return selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL;
     }
@@ -205,6 +220,7 @@ class IntegrationWithIcs {
             String jsonStr = "{ \"filter\" :" + quote(filter) + "}";
             return new ConsumerJobInfo(typeId, jsonObject(jsonStr), "owner", consumerUri(), "");
         } catch (Exception e) {
+            logger.error("Error {}", e.getMessage());
             return null;
         }
     }
@@ -243,7 +259,6 @@ class IntegrationWithIcs {
 
         ApplicationTest.testErrorCode(restClient().put(jobUrl("KAFKA_JOB_ID"), body), HttpStatus.BAD_REQUEST,
                 "Json validation failure");
-
     }
 
     @Test
@@ -254,9 +269,9 @@ class IntegrationWithIcs {
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
-        DmaapSimulatorController.dmaapResponses.add("DmaapResponse1");
-        DmaapSimulatorController.dmaapResponses.add("DmaapResponse2");
-        DmaapSimulatorController.dmaapResponses.add("Junk");
+        DmaapSimulatorController.addResponse("DmaapResponse1");
+        DmaapSimulatorController.addResponse("DmaapResponse2");
+        DmaapSimulatorController.addResponse("Junk");
 
         ConsumerController.TestResults results = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(2));
@@ -267,4 +282,32 @@ class IntegrationWithIcs {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
 
+    @Test
+    void testPmFilter() throws Exception {
+        await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
+        final String TYPE_ID = "PmInformationType";
+
+        String jsonStr =
+                reQuote("{ 'filterType' : 'pmdata', 'filter': { 'measTypes': [ 'succImmediateAssignProcs' ] } }");
+
+        ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(jsonStr), "owner", consumerUri(), "");
+
+        createInformationJobInIcs(DMAAP_JOB_ID, jobInfo);
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        String path = "./src/test/resources/pm_report.json";
+        String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+        DmaapSimulatorController.addPmResponse(pmReportJson);
+
+        ConsumerController.TestResults results = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(1));
+
+        String filtered = results.receivedBodies.get(0);
+        assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("attTCHSeizures");
+
+        logger.info(filtered);
+
+        deleteInformationJobInIcs(DMAAP_JOB_ID);
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+    }
 }