From: PatrikBuhr Date: Tue, 3 May 2022 15:28:31 +0000 (+0200) Subject: PM Filter X-Git-Tag: 1.1.0~7^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F42%2F8142%2F1;p=nonrtric%2Fplt%2Fdmaapadapter.git PM Filter Fix that DMAAP returns an array of strings Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-743 Change-Id: I3b28a27ecfd38423fa1a89cadd727c24441104c2 --- diff --git a/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java index 94f9f8d..32ecd73 100644 --- a/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java +++ b/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java @@ -85,7 +85,7 @@ public class ProducerCallbacksController { @RequestBody String body) { try { ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class); - logger.debug("Job started callback {}", request.id); + logger.debug("Job started callback id: {}, body: {}", request.id, body); this.jobs.addJob(request.id, request.targetUri, types.getType(request.typeId), request.owner, request.lastUpdated, toJobParameters(request.jobData)); return new ResponseEntity<>(HttpStatus.OK); diff --git a/src/main/java/org/oran/dmaapadapter/repository/Job.java b/src/main/java/org/oran/dmaapadapter/repository/Job.java index 9dd4987..989643c 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -182,6 +182,7 @@ public class Job { public String filter(String data) { if (filter == null) { + logger.debug("No filter used"); return data; } return filter.filter(data); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java index dfa6d07..0631fa1 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java @@ -21,6 +21,8 @@ package org.oran.dmaapadapter.tasks; import java.time.Duration; +import java.util.Collection; +import java.util.LinkedList; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; @@ -47,6 +49,7 @@ public class DmaapTopicConsumer { protected final ApplicationConfig applicationConfig; protected final InfoType type; protected final Jobs jobs; + private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) { AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); @@ -81,30 +84,37 @@ public class DmaapTopicConsumer { .flatMap(notUsed -> Mono.empty()); } - private Mono getFromMessageRouter(String topicUrl) { + private Flux getFromMessageRouter(String topicUrl) { logger.trace("getFromMessageRouter {}", topicUrl); return dmaapRestClient.get(topicUrl) // .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. + .flatMapMany(body -> toMessages(body)) // .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) // .onErrorResume(this::handleDmaapErrorResponse); // } + private Flux toMessages(String body) { + Collection messages = gson.fromJson(body, LinkedList.class); + return Flux.fromIterable(messages); + } + private Mono handleConsumerErrorResponse(Throwable t) { logger.warn("error from CONSUMER {}", t.getMessage()); return Mono.empty(); } - protected Flux pushDataToConsumers(String body) { - logger.debug("Received data {}", body); + protected Flux pushDataToConsumers(String input) { + logger.debug("Received data {}", input); final int CONCURRENCY = 50; // Distibute the body to all jobs for this type return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // - .map(job -> Tuples.of(job, job.filter(body))) // + .map(job -> Tuples.of(job, job.filter(input))) // .filter(t -> !t.getT2().isEmpty()) // .doOnNext(touple -> logger.debug("Sending to consumer {}", touple.getT1().getCallbackUrl())) // .flatMap(touple -> touple.getT1().getConsumerRestClient().post("", touple.getT2(), MediaType.APPLICATION_JSON), CONCURRENCY) // .onErrorResume(this::handleConsumerErrorResponse); } + } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java index e4675a5..5c8ed5e 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java @@ -77,7 +77,7 @@ public class KafkaJobDataConsumer { public synchronized void start(Flux input) { stop(); this.errorStats.resetKafkaErrors(); - this.subscription = getMessagesFromKafka(input, job) // + this.subscription = handleMessagesFromKafka(input, job) // .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) // .onErrorResume(this::handleError) // .subscribe(this::handleConsumerSentOk, // @@ -107,7 +107,7 @@ public class KafkaJobDataConsumer { return this.subscription != null; } - private Flux getMessagesFromKafka(Flux input, Job job) { + private Flux handleMessagesFromKafka(Flux input, Job job) { Flux result = input.map(job::filter) // .filter(t -> !t.isEmpty()); // diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index fc023cf..9b6ea58 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -317,8 +317,8 @@ class ApplicationTest { // Return two messages from DMAAP and verify that these are sent to the owner of // the job (consumer) - DmaapSimulatorController.dmaapResponses.add("DmaapResponse1"); - DmaapSimulatorController.dmaapResponses.add("DmaapResponse2"); + DmaapSimulatorController.addResponse("DmaapResponse1"); + DmaapSimulatorController.addResponse("DmaapResponse2"); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(2)); assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); @@ -358,7 +358,7 @@ class ApplicationTest { // filtered PM message String path = "./src/test/resources/pm_report.json"; String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); - DmaapSimulatorController.dmaapPmResponses.add(pmReportJson); + DmaapSimulatorController.addPmResponse(pmReportJson); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); @@ -388,7 +388,7 @@ class ApplicationTest { // filtered PM message String path = "./src/test/resources/pm_report.json"; String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); - DmaapSimulatorController.dmaapPmResponses.add(pmReportJson); + DmaapSimulatorController.addPmResponse(pmReportJson); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); @@ -403,7 +403,7 @@ class ApplicationTest { // Register producer, Register types waitForRegistration(); - // Create a job with a JsonPath + // Create a job with atestJsonPathFiltering JsonPath ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, this.jsonObjectJsonPath()); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); @@ -413,7 +413,7 @@ class ApplicationTest { // filtered PM message String path = "./src/test/resources/pm_report.json"; String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); - DmaapSimulatorController.dmaapPmResponses.add(pmReportJson); + DmaapSimulatorController.addPmResponse(pmReportJson); ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); diff --git a/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java b/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java index db0bd22..7d0c313 100644 --- a/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java +++ b/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java @@ -51,9 +51,18 @@ public class DmaapSimulatorController { public static final String DMAAP_TOPIC_URL = "/dmaap-topic-1"; public static final String DMAAP_TOPIC_PM_URL = "/dmaap-topic-2"; - public static List dmaapResponses = Collections.synchronizedList(new LinkedList()); + private static List dmaapResponses = Collections.synchronizedList(new LinkedList()); - public static List dmaapPmResponses = Collections.synchronizedList(new LinkedList()); + private static List dmaapPmResponses = Collections.synchronizedList(new LinkedList()); + + public static void addPmResponse(String response) { + response = response.replace("\"", "\\\""); + dmaapPmResponses.add("[\"" + response + "\"]"); + } + + public static void addResponse(String response) { + dmaapResponses.add("[\"" + response + "\"]"); + } @GetMapping(path = DMAAP_TOPIC_URL, produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "GET from topic", @@ -87,7 +96,6 @@ public class DmaapSimulatorController { String resp = dmaapPmResponses.remove(0); return new ResponseEntity<>(resp, HttpStatus.OK); } - } } diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index 0abaf59..54a4940 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -27,6 +27,10 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonParser; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -41,6 +45,8 @@ import org.oran.dmaapadapter.r1.ConsumerJobInfo; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -65,6 +71,7 @@ class IntegrationWithIcs { private static final String DMAAP_JOB_ID = "DMAAP_JOB_ID"; private static final String DMAAP_TYPE_ID = "DmaapInformationType"; + private static final Logger logger = LoggerFactory.getLogger(Application.class); @Autowired private ApplicationConfig applicationConfig; @@ -166,17 +173,21 @@ class IntegrationWithIcs { } private void createInformationJobInIcs(String typeId, String jobId, String filter) { - String body = gson.toJson(consumerJobInfo(typeId, filter)); - try { - // Delete the job if it already exists - deleteInformationJobInIcs(jobId); - } catch (Exception e) { - } + createInformationJobInIcs(jobId, consumerJobInfo(typeId, filter)); + } + + private void createInformationJobInIcs(String jobId, ConsumerJobInfo jobInfo) { + String body = gson.toJson(jobInfo); restClient().putForEntity(jobUrl(jobId), body).block(); + logger.info("Created job {}, {}", jobId, body); } private void deleteInformationJobInIcs(String jobId) { - restClient().delete(jobUrl(jobId)).block(); + try { + restClient().delete(jobUrl(jobId)).block(); + } catch (Exception e) { + logger.warn("Couldnot delete job: {} reason: {}", jobId, e.getMessage()); + } } private ConsumerJobInfo consumerJobInfo(String typeId, String filter) { @@ -195,6 +206,10 @@ class IntegrationWithIcs { return "\"" + str + "\""; } + private String reQuote(String str) { + return str.replaceAll("'", "\\\""); + } + private String consumerUri() { return selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL; } @@ -205,6 +220,7 @@ class IntegrationWithIcs { String jsonStr = "{ \"filter\" :" + quote(filter) + "}"; return new ConsumerJobInfo(typeId, jsonObject(jsonStr), "owner", consumerUri(), ""); } catch (Exception e) { + logger.error("Error {}", e.getMessage()); return null; } } @@ -243,7 +259,6 @@ class IntegrationWithIcs { ApplicationTest.testErrorCode(restClient().put(jobUrl("KAFKA_JOB_ID"), body), HttpStatus.BAD_REQUEST, "Json validation failure"); - } @Test @@ -254,9 +269,9 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - DmaapSimulatorController.dmaapResponses.add("DmaapResponse1"); - DmaapSimulatorController.dmaapResponses.add("DmaapResponse2"); - DmaapSimulatorController.dmaapResponses.add("Junk"); + DmaapSimulatorController.addResponse("DmaapResponse1"); + DmaapSimulatorController.addResponse("DmaapResponse2"); + DmaapSimulatorController.addResponse("Junk"); ConsumerController.TestResults results = this.consumerController.testResults; await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(2)); @@ -267,4 +282,32 @@ class IntegrationWithIcs { await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } + @Test + void testPmFilter() throws Exception { + await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); + final String TYPE_ID = "PmInformationType"; + + String jsonStr = + reQuote("{ 'filterType' : 'pmdata', 'filter': { 'measTypes': [ 'succImmediateAssignProcs' ] } }"); + + ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(jsonStr), "owner", consumerUri(), ""); + + createInformationJobInIcs(DMAAP_JOB_ID, jobInfo); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + String path = "./src/test/resources/pm_report.json"; + String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); + DmaapSimulatorController.addPmResponse(pmReportJson); + + ConsumerController.TestResults results = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(results.receivedBodies).hasSize(1)); + + String filtered = results.receivedBodies.get(0); + assertThat(filtered).contains("succImmediateAssignProcs").doesNotContain("attTCHSeizures"); + + logger.info(filtered); + + deleteInformationJobInIcs(DMAAP_JOB_ID); + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + } }