import com.google.gson.JsonParser;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.filters.PmReportFilter;
import org.oran.dmaapadapter.tasks.KafkaTopicListener;
import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
class IntegrationWithKafka {
final String TYPE_ID = "KafkaInformationType";
+ final String PM_TYPE_ID = "PmInformationTypeKafka";
@Autowired
private ApplicationConfig applicationConfig;
}
private static class KafkaReceiver {
- public final static String OUTPUT_TOPIC = "outputTopic";
- private TopicListener.Output receivedKafkaOutput = new TopicListener.Output("", "");
+ public final String OUTPUT_TOPIC;
+ private TopicListener.Output receivedKafkaOutput;
private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
- public KafkaReceiver(ApplicationConfig applicationConfig) {
+ private volatile int count = 0; // written by the listener thread, read by the test thread
+
+ public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic) {
+ this.OUTPUT_TOPIC = outputTopic;
// Create a listener for the output topic. The KafkaTopicListener happens to be
// suitable for that.
private void set(TopicListener.Output receivedKafkaOutput) {
this.receivedKafkaOutput = receivedKafkaOutput;
+ this.count++;
logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
}
synchronized String lastValue() {
return this.receivedKafkaOutput.value;
}
+
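+ // Clears the received data and the message counter; invoked from init() before each test.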
+ void reset() {
+ count = 0;
+ this.receivedKafkaOutput = new TopicListener.Output("", "");
+ }
}
private static KafkaReceiver kafkaReceiver;
+ private static KafkaReceiver kafkaReceiver2;
@BeforeEach
void init() {
if (kafkaReceiver == null) {
- kafkaReceiver = new KafkaReceiver(this.applicationConfig);
+ kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic");
+ kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2");
}
+ kafkaReceiver.reset();
+ kafkaReceiver2.reset();
}
@AfterEach
}
}
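+ // Builds a consumer job that applies a PM report filter and forwards the result
+ // to the given Kafka topic. The Job.Parameters are serialized to JSON, mirroring
+ // what an information consumer would register in ICS.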
+ ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
+ try {
+ Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic);
+ String str = gson.toJson(param);
+ Object parametersObj = jsonObject(str);
+
+ return new ConsumerJobInfo(PM_TYPE_ID, parametersObj, "owner", null, "");
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
ConsumerJobInfo consumerJobInfoKafka(String topic) {
try {
Job.Parameters param = new Job.Parameters(null, null, null, 1, topic);
return SenderOptions.create(props);
}
- private SenderRecord<String, String, Integer> senderRecord(String data) {
- return senderRecord(data, "");
- }
-
- private SenderRecord<String, String, Integer> senderRecord(String data, String key) {
- final InfoType infoType = this.types.get(TYPE_ID);
+ private SenderRecord<String, String, Integer> senderRecord(String data, String key, String typeId) {
+ final InfoType infoType = this.types.get(typeId);
int correlationMetadata = 2;
return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata);
}
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
waitForKafkaListener();
- var dataToSend = Flux.just(senderRecord("Message", ""));
+ var dataToSend = Flux.just(senderRecord("Message", "", TYPE_ID));
sendDataToStream(dataToSend);
verifiedReceivedByConsumer("Message");
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
waitForKafkaListener();
- var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+ var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1, Message_2, etc.
sendDataToStream(dataToSend);
verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
waitForKafkaListener();
String sendString = "testData " + Instant.now();
String sendKey = "key " + Instant.now();
- var dataToSend = Flux.just(senderRecord(sendString, sendKey));
+ var dataToSend = Flux.just(senderRecord(sendString, sendKey, TYPE_ID));
sendDataToStream(dataToSend);
await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString));
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
- this.icsSimulatorController.addJob(consumerJobInfoKafka(KafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
waitForKafkaListener();
Instant startTime = Instant.now();
- var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord("Message_" + i)); // Message_1, etc.
+ var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1, etc.
sendDataToStream(dataToSend);
while (!kafkaReceiver.lastValue().equals("Message_" + NO_OF_OBJECTS)) {
logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
}
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+ @Test
+ void kafkaCharacteristics_pmFilter() throws Exception {
+ // Filter PM reports and send them to two jobs over Kafka.
+
+ final String JOB_ID = "kafkaCharacteristics";
+ final String JOB_ID2 = "kafkaCharacteristics2";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
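+ // Filter that keeps only the succImmediateAssignProcs counter for UtranCell
+ // measured objects; everything else in the report should be filtered out.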
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.getMeasTypes().add("succImmediateAssignProcs");
+ filterData.getMeasObjInstIds().add("UtranCell");
+
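+ // Register two jobs with the same filter but different output topics, so both
+ // receivers are expected to see every filtered report.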
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
+ restClient());
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2,
+ restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+ waitForKafkaListener();
+
+ final int NO_OF_OBJECTS = 100000;
+
+ Instant startTime = Instant.now();
+
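+ // Load a sample PM report from test resources; the same report is replayed
+ // NO_OF_OBJECTS times to measure filtering throughput.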
+ String path = "./src/test/resources/pm_report.json";
+ String pmReportJson = Files.readString(Path.of(path), StandardCharsets.UTF_8);
+
+ var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> senderRecord(pmReportJson, "", PM_TYPE_ID));
+ sendDataToStream(dataToSend);
+
+ while (kafkaReceiver.count < NO_OF_OBJECTS) {
+ logger.info("sleeping, received so far: {}", kafkaReceiver.count);
+ Thread.sleep(1000);
+ }
+
+ final long durationSeconds = Math.max(1, Instant.now().getEpochSecond() - startTime.getEpochSecond());
+ logger.info("*** Duration: {} s, objects/second: {}", durationSeconds, NO_OF_OBJECTS / durationSeconds);
+ logger.info("*** kafkaReceiver2 count: {}", kafkaReceiver2.count);
+ }
+
@Test
void kafkaDeleteJobShouldNotStopListener() throws Exception {
final String JOB_ID1 = "ID1";
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
- var dataToSend = Flux.range(1, 100).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
+ var dataToSend = Flux.range(1, 100).map(i -> senderRecord("Message_" + i, "", TYPE_ID)); // Message_1, Message_2, etc.
sendDataToStream(dataToSend); // this should not overflow
// Delete jobs, recreate one
this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
- dataToSend = Flux.just(senderRecord("Howdy"));
+ dataToSend = Flux.just(senderRecord("Howdy", "", TYPE_ID));
sendDataToStream(dataToSend);
verifiedReceivedByConsumerLast("Howdy");