From: PatrikBuhr Date: Tue, 12 Jul 2022 11:40:36 +0000 (+0200) Subject: NONRTRIC - dmaap adapter characteristic improvement X-Git-Tag: 1.2.0~30 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F69%2F8769%2F2;p=nonrtric%2Fplt%2Fdmaapadapter.git NONRTRIC - dmaap adapter characteristic improvement Minor changes. Added a testcase. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: Ib45f40763f3124c5e8c32d66e23a7b4a1252e428 --- diff --git a/pom.xml b/pom.xml index 1bfe947..8dc6998 100644 --- a/pom.xml +++ b/pom.xml @@ -61,12 +61,6 @@ true - - org.springdoc - springdoc-openapi-ui - 1.6.3 - test - org.springframework.boot spring-boot-starter-web @@ -137,6 +131,12 @@ + + org.springdoc + springdoc-openapi-ui + 1.6.3 + test + com.github.erosb everit-json-schema diff --git a/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java index b6d65ad..39c7a06 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java +++ b/src/main/java/org/oran/dmaapadapter/repository/filters/PmReportFilter.java @@ -20,10 +20,10 @@ package org.oran.dmaapadapter.repository.filters; - import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import lombok.Getter; @@ -37,10 +37,10 @@ public class PmReportFilter implements Filter { @Getter public static class FilterData { - Collection sourceNames = new ArrayList<>(); - Collection measObjInstIds = new ArrayList<>(); - Collection measTypes = new ArrayList<>(); - Collection measuredEntityDns = new ArrayList<>(); + final Collection sourceNames = new HashSet<>(); + final Collection measObjInstIds = new ArrayList<>(); + final Collection measTypes = new HashSet<>(); + final Collection measuredEntityDns = new ArrayList<>(); } private static class MeasTypesIndexed extends PmReport.MeasTypes { @@ -119,7 +119,7 @@ public class PmReportFilter implements Filter { PmReport.MeasValuesList newMeasValuesList = oldMeasValues.shallowClone(); - if (isContainedInAny(oldMeasValues.measObjInstId, filter.measObjInstIds) || filter.measObjInstIds.isEmpty()) { + if (filter.measObjInstIds.isEmpty() || isContainedInAny(oldMeasValues.measObjInstId, filter.measObjInstIds)) { newMeasValuesList.measResults = createMeasResults(oldMeasValues.measResults, measTypes, filter); } return newMeasValuesList; diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index a345670..0fa7a8e 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -26,6 +26,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.gson.JsonParser; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; import java.time.Instant; import java.util.HashMap; @@ -50,6 +53,7 @@ import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.repository.filters.PmReportFilter; import org.oran.dmaapadapter.tasks.KafkaTopicListener; import org.oran.dmaapadapter.tasks.TopicListener; import org.oran.dmaapadapter.tasks.TopicListeners; @@ -80,6 +84,7 @@ import reactor.kafka.sender.SenderRecord; class IntegrationWithKafka { final String TYPE_ID = "KafkaInformationType"; + final String PM_TYPE_ID = "PmInformationTypeKafka"; @Autowired private ApplicationConfig applicationConfig; @@ -152,11 +157,14 @@ class IntegrationWithKafka { } private static class KafkaReceiver { - public final static String OUTPUT_TOPIC = "outputTopic"; - private TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", ""); + public final String OUTPUT_TOPIC; + private TopicListener.Output receivedKafkaOutput; private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class); - public KafkaReceiver(ApplicationConfig applicationConfig) { + int count = 0; + + public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic) { + this.OUTPUT_TOPIC = outputTopic; // Create a listener to the output topic. The KafkaTopicListener happens to be // suitable for that, @@ -171,6 +179,7 @@ class IntegrationWithKafka { private void set(TopicListener.Output receivedKafkaOutput) { this.receivedKafkaOutput = receivedKafkaOutput; + this.count++; logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput); } @@ -181,15 +190,24 @@ class IntegrationWithKafka { synchronized String lastValue() { return this.receivedKafkaOutput.value; } + + void reset() { + count = 0; + this.receivedKafkaOutput = new TopicListener.Output("", ""); + } } private static KafkaReceiver kafkaReceiver; + private static KafkaReceiver kafkaReceiver2; @BeforeEach void init() { if (kafkaReceiver == null) { - kafkaReceiver = new KafkaReceiver(this.applicationConfig); + kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic"); + kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2"); } + kafkaReceiver.reset(); + kafkaReceiver2.reset(); } @AfterEach @@ -259,6 +277,18 @@ class IntegrationWithKafka { } } + ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) { + try { + Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic); + String str = gson.toJson(param); + Object parametersObj = jsonObject(str); + + return new ConsumerJobInfo(PM_TYPE_ID, parametersObj, "owner", null, ""); + } catch (Exception e) { + return null; + } + } + ConsumerJobInfo consumerJobInfoKafka(String topic) { try { Job.Parameters param = new Job.Parameters(null, null, null, 1, topic); @@ -283,12 +313,8 @@ class IntegrationWithKafka { return SenderOptions.create(props); } - private SenderRecord senderRecord(String data) { - return senderRecord(data, ""); - } - - private SenderRecord senderRecord(String data, String key) { - final InfoType infoType = this.types.get(TYPE_ID); + private SenderRecord senderRecord(String data, String key, String typeId) { + final InfoType infoType = this.types.get(typeId); int correlationMetadata = 2; return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata); } @@ -338,7 +364,7 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); waitForKafkaListener(); - var dataToSend = Flux.just(senderRecord("Message", "")); + var dataToSend = Flux.just(senderRecord("Message", "", TYPE_ID)); sendDataToStream(dataToSend); verifiedReceivedByConsumer("Message"); @@ -360,7 +386,9 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); waitForKafkaListener(); - var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc. + var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1, + // Message_2 + // etc. sendDataToStream(dataToSend); verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]"); @@ -374,13 +402,13 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient()); + this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); waitForKafkaListener(); String sendString = "testData " + Instant.now(); String sendKey = "key " + Instant.now(); - var dataToSend = Flux.just(senderRecord(sendString, sendKey)); + var dataToSend = Flux.just(senderRecord(sendString, sendKey, TYPE_ID)); sendDataToStream(dataToSend); await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString)); @@ -396,7 +424,7 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient()); + this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); waitForKafkaListener(); @@ -404,7 +432,8 @@ class IntegrationWithKafka { Instant startTime = Instant.now(); - var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord("Message_" + i)); // Message_1, etc. + var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1, + // etc. sendDataToStream(dataToSend); while (!kafkaReceiver.lastValue().equals("Message_" + NO_OF_OBJECTS)) { @@ -416,6 +445,52 @@ class IntegrationWithKafka { logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds); } + @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. + @Test + void kafkaCharacteristics_pmFilter() throws Exception { + // Filter PM reports and senttotowjobs over Kafka + + final String JOB_ID = "kafkaCharacteristics"; + final String JOB_ID2 = "kafkaCharacteristics2"; + + // Register producer, Register types + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + + PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); + filterData.getMeasTypes().add("succImmediateAssignProcs"); + filterData.getMeasObjInstIds().add("UtranCell"); + + this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, + restClient()); + this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2, + restClient()); + + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); + waitForKafkaListener(); + + final int NO_OF_OBJECTS = 100000; + + Instant startTime = Instant.now(); + + String path = "./src/test/resources/pm_report.json"; + String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset()); + + var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord(pmReportJson, "", PM_TYPE_ID)); + sendDataToStream(dataToSend); + + while (kafkaReceiver.count != NO_OF_OBJECTS) { + logger.info("sleeping {}", kafkaReceiver.count); + Thread.sleep(1000 * 1); + } + + // System.out.println(kafkaReceiver.receivedKafkaOutput.value); + + final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); + logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds); + logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count); + } + @Test void kafkaDeleteJobShouldNotStopListener() throws Exception { final String JOB_ID1 = "ID1"; @@ -432,7 +507,9 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); - var dataToSend = Flux.range(1, 100).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc. + var dataToSend = Flux.range(1, 100).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1, + // Message_2 + // etc. sendDataToStream(dataToSend); // this should not overflow // Delete jobs, recreate one @@ -442,7 +519,7 @@ class IntegrationWithKafka { this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - dataToSend = Flux.just(senderRecord("Howdy")); + dataToSend = Flux.just(senderRecord("Howdy", "", TYPE_ID)); sendDataToStream(dataToSend); verifiedReceivedByConsumerLast("Howdy"); diff --git a/src/test/resources/test_application_configuration.json b/src/test/resources/test_application_configuration.json index df1ccd5..f64ab12 100644 --- a/src/test/resources/test_application_configuration.json +++ b/src/test/resources/test_application_configuration.json @@ -8,7 +8,7 @@ }, { "id": "KafkaInformationType", - "kafkaInputTopic": "TutorialTopic", + "kafkaInputTopic": "KafkaInput", "useHttpProxy": false }, { @@ -20,10 +20,10 @@ }, { "id": "PmInformationTypeKafka", - "kafkaInputTopic": "TutorialTopic", + "kafkaInputTopic": "KafkaPmInput", "useHttpProxy": false, "dataType": "PmData", "isJson": true } ] -} \ No newline at end of file +}