NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / test / java / org / oran / dmaapadapter / IntegrationWithKafka.java
index a0db58a..470e114 100644 (file)
@@ -262,20 +262,24 @@ class IntegrationWithKafka {
         var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
         sendDataToStream(dataToSend);
 
-        verifiedReceivedByConsumer("Message_1", "[Message_1, Message_2, Message_3]");
+        verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+
+        // Just for testing quoting
+        this.consumerController.testResults.reset();
+        dataToSend = Flux.just(senderRecord("Message\"_", 1));
+        sendDataToStream(dataToSend);
+        verifiedReceivedByConsumer("[\"Message\\\"_1\"]");
 
         // Delete the jobs
         this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
         this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty());
+        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers()).isEmpty());
     }
 
     @Test
     void kafkaIOverflow() throws InterruptedException {
-        // This does not work. After an overflow, the kafka stream does not seem to work
-        //
         final String JOB_ID1 = "ID1";
         final String JOB_ID2 = "ID2";
 
@@ -290,18 +294,20 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
         var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
-        sendDataToStream(dataToSend); // this will overflow
+        sendDataToStream(dataToSend); // this should overflow
 
-        KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next();
+        KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().values().iterator().next();
         await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
         this.consumerController.testResults.reset();
 
         kafkaTopicConsumers.restartNonRunningTasks();
+        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+        Thread.sleep(1000); // Restarting the input seems to take some asynch time
 
-        dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message_1
+        dataToSend = Flux.range(1, 1).map(i -> senderRecord("Howdy_", i));
         sendDataToStream(dataToSend);
 
-        verifiedReceivedByConsumer("Message__1", "Message__1");
+        verifiedReceivedByConsumer("Howdy_1");
     }
 
 }