<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.6.2</version>
+ <version>2.5.8</version>
<relativePath />
</parent>
<groupId>org.o-ran-sc.nonrtric</groupId>
<spotless-maven-plugin.version>1.24.3</spotless-maven-plugin.version>
<swagger-codegen-maven-plugin.version>3.0.11</swagger-codegen-maven-plugin.version>
<docker-maven-plugin>0.30.0</docker-maven-plugin>
- <javax.ws.rs-api.version>2.1.1</javax.ws.rs-api.version>
<sonar-maven-plugin.version>3.7.0.1746</sonar-maven-plugin.version>
<jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>
<exec.skip>true</exec.skip>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
- <version>1.3.7</version>
+ <version>1.3.9</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<system>JIRA</system>
<url>https://jira.o-ran-sc.org/</url>
</issueManagement>
-</project>
\ No newline at end of file
+</project>
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
.doOnError(e -> logger.error("Send failed", e)) //
.blockLast();
+ sender.close();
+
}
private void verifiedReceivedByConsumer(String... strings) {
}
}
+ @Test
+ void simpleCase() throws InterruptedException {
+ final String JOB_ID = "ID";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ Thread.sleep(4000);
+ var dataToSend = Flux.just(senderRecord("Message"));
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumer("Message");
+
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+ }
+
@Test
void kafkaIntegrationTest() throws Exception {
final String JOB_ID1 = "ID1";
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs. One buffering and one with a filter
- this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
restClient());
this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+ Thread.sleep(2000);
var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
sendDataToStream(dataToSend);