X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Ftest%2Fjava%2Forg%2Foran%2Fdmaapadapter%2FIntegrationWithKafka.java;h=470e114ebda7c154cd998f9c229918fd16a53072;hb=ce1d9f2d3e1d2713289dc4d2b5c246f99ec65160;hp=a0db58a07725aec88fef7fb8f191ed2f1d3f3391;hpb=b3896f4ad7912be9e12c05e7d4770fa39752d797;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index a0db58a0..470e114e 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -262,20 +262,24 @@ class IntegrationWithKafka { var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc. sendDataToStream(dataToSend); - verifiedReceivedByConsumer("Message_1", "[Message_1, Message_2, Message_3]"); + verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]"); + + // Just for testing quoting + this.consumerController.testResults.reset(); + dataToSend = Flux.just(senderRecord("Message\"_", 1)); + sendDataToStream(dataToSend); + verifiedReceivedByConsumer("[\"Message\\\"_1\"]"); // Delete the jobs this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty()); + await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers()).isEmpty()); } @Test void kafkaIOverflow() throws InterruptedException { - // This does not work. After an overflow, the kafka stream does not seem to work - // final String JOB_ID1 = "ID1"; final String JOB_ID2 = "ID2"; @@ -290,18 +294,20 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc. - sendDataToStream(dataToSend); // this will overflow + sendDataToStream(dataToSend); // this should overflow - KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next(); + KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().values().iterator().next(); await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse()); this.consumerController.testResults.reset(); kafkaTopicConsumers.restartNonRunningTasks(); + this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job + Thread.sleep(1000); // Restarting the input seems to take some asynch time - dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message_1 + dataToSend = Flux.range(1, 1).map(i -> senderRecord("Howdy_", i)); sendDataToStream(dataToSend); - verifiedReceivedByConsumer("Message__1", "Message__1"); + verifiedReceivedByConsumer("Howdy_1"); } }