From 01a644d7bd4c2085e76e597069fc68e93ff1a0e9 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Mon, 17 Jan 2022 14:58:06 +0100 Subject: [PATCH] Reverting to springboot version 2.5.8 Version 2.6.2 did not work well with Kafka Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-703 Change-Id: Ib3d16f484be77792c87013eee565b79bfd3219cb --- dmaap-adaptor-java/pom.xml | 7 +++-- .../oran/dmaapadapter/IntegrationWithKafka.java | 30 ++++++++++++++++++++-- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/dmaap-adaptor-java/pom.xml b/dmaap-adaptor-java/pom.xml index 71f2bb82..b555912b 100644 --- a/dmaap-adaptor-java/pom.xml +++ b/dmaap-adaptor-java/pom.xml @@ -26,7 +26,7 @@ org.springframework.boot spring-boot-starter-parent - 2.6.2 + 2.5.8 org.o-ran-sc.nonrtric @@ -56,7 +56,6 @@ 1.24.3 3.0.11 0.30.0 - 2.1.1 3.7.0.1746 0.8.5 true @@ -180,7 +179,7 @@ io.projectreactor.kafka reactor-kafka - 1.3.7 + 1.3.9 com.google.guava @@ -362,4 +361,4 @@ JIRA https://jira.o-ran-sc.org/ - \ No newline at end of file + diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index c38af8a9..5a48d61f 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -215,7 +215,7 @@ class IntegrationWithKafka { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer"); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -236,6 +236,8 @@ class IntegrationWithKafka { .doOnError(e -> logger.error("Send failed", e)) // .blockLast(); + sender.close(); + } private void verifiedReceivedByConsumer(String... strings) { @@ -246,6 +248,29 @@ class IntegrationWithKafka { } } + @Test + void simpleCase() throws InterruptedException { + final String JOB_ID = "ID"; + + // Register producer, Register types + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + Thread.sleep(4000); + var dataToSend = Flux.just(senderRecord("Message")); + sendDataToStream(dataToSend); + + verifiedReceivedByConsumer("Message"); + + this.icsSimulatorController.deleteJob(JOB_ID, restClient()); + + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty()); + } + @Test void kafkaIntegrationTest() throws Exception { final String JOB_ID1 = "ID1"; @@ -256,12 +281,13 @@ class IntegrationWithKafka { assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. One buffering and one with a filter - this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1, + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1, restClient()); this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); + Thread.sleep(2000); var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc. sendDataToStream(dataToSend); -- 2.16.6