import java.util.Map;
import java.util.zip.GZIPInputStream;
+import lombok.Setter;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
private final DataStore dataStore;
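+ // Consumer group id used by this listener; defaults to the type's group id but can be
+ // overridden, e.g. so that several listeners can read the same topic in separate groups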
+ @Setter
+ private String kafkaGroupId;
+
public KafkaTopicListener(ApplicationConfig applConfig, InfoType type) {
this.applicationConfig = applConfig;
this.type = type;
this.dataStore = DataStore.create(applConfig);
+ this.kafkaGroupId = this.type.getKafkaGroupId();
}
@Override
sig -> logger.error("KafkaTopicListener stopped, type: {}, reason: {}", this.type.getId(), sig)) //
.filter(t -> t.value().length > 0 || t.key().length > 0) //
.map(input -> new DataFromTopic(input.key(), input.value(), DataFromTopic.findZipped(input.headers()))) //
- .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore), 100) //
+ .flatMap(data -> getDataFromFileIfNewPmFileEvent(data, type, dataStore)) //
.publish() //
.autoConnect(1);
}
if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
logger.error("No kafka boostrap server is setup");
}
+
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.type.getKafkaGroupId());
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+
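+ // Include the group id so that listeners in different consumer groups get distinct Kafka client ids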
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
return ReceiverOptions.<byte[], byte[]>create(consumerProps)
.subscription(Collections.singleton(this.type.getKafkaInputTopic()));
import java.util.List;
import java.util.Map;
+import lombok.Builder;
+
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
int count = 0;
- public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext) {
+ public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext,
+ String groupId) {
this.applicationConfig = applicationConfig;
this.OUTPUT_TOPIC = outputTopic;
.build();
KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
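+ // A non-null groupId overrides the type's default consumer group, letting several
+ // receivers consume the same output topic independently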
+ if (groupId != null) {
+ topicListener.setKafkaGroupId(groupId);
+ }
topicListener.getFlux() //
.map(this::unzip) //
}
private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
- assertThat(this.applicationConfig.isZipOutput()).isEqualTo(receivedKafkaOutput.isZipped);
+ if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped) {
+ logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped,
+ this.applicationConfig.isZipOutput());
+ }
if (!receivedKafkaOutput.isZipped) {
return receivedKafkaOutput;
}
this.applicationConfig.setZipOutput(false);
if (kafkaReceiver == null) {
- kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext);
- kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext);
+ kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext, null);
+ kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext, null);
}
kafkaReceiver.reset();
kafkaReceiver2.reset();
Thread.sleep(4000);
}
- private void printStatistics() {
+ private StatisticsCollection getStatistics() {
String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
String statsResp = restClient().get(targetUri).block();
StatisticsCollection stats = gson.fromJson(statsResp, StatisticsCollection.class);
- int noOfSentBytes = 0;
- int noOfSentObjs = 0;
+ return stats;
+ }
+
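+ // Throughput figures collected for one characteristics test run (serialized to JSON when printed)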
+ @Builder
+ static class CharacteristicsResult {
+ long noOfFilesPerSecond;
+ long noOfSentBytes;
+ long noOfSentGigaBytes;
+ long noOfSentObjects;
+ long inputFileSize;
+ long noOfReceivedFiles;
+ long noOfReceivedBytes;
+ long noOfSubscribers;
+ long sizeOfSentObj;
+ boolean zipOutput;
+ }
+
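+ // Sums sent bytes/objects over all jobs for the time elapsed since startTime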
+ private CharacteristicsResult getCharacteristicsResult(Instant startTime) {
+ final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
+ StatisticsCollection stats = getStatistics();
+ long noOfSentBytes = 0;
+ long noOfSentObjs = 0;
for (Statistics s : stats.jobStatistics) {
noOfSentBytes += s.getNoOfSentBytes();
noOfSentObjs += s.getNoOfSentObjects();
}
- logger.error(" Stats noOfSentBytes (total): {}, noOfSentObjects (total): {}, noOfTopics: {}", noOfSentBytes,
- noOfSentObjs, stats.jobStatistics.size());
+
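+ // Per-file and received figures are taken from the first job's statistics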
+ Statistics oneJobsStats = stats.jobStatistics.iterator().next();
+
+ return CharacteristicsResult.builder() //
+ .noOfSentBytes(noOfSentBytes) //
+ .noOfSentObjects(noOfSentObjs) //
+ .noOfSentGigaBytes(noOfSentBytes / (1024 * 1024 * 1024)) //
+ .noOfSubscribers(stats.jobStatistics.size()) //
+ .zipOutput(this.applicationConfig.isZipOutput()) //
+ .noOfFilesPerSecond((oneJobsStats.getNoOfReceivedObjects() * 1000) / durationMs) //
+ .noOfReceivedBytes(oneJobsStats.getNoOfReceivedBytes()) //
+ .inputFileSize(oneJobsStats.getNoOfReceivedBytes() / oneJobsStats.getNoOfReceivedObjects()) //
+ .noOfReceivedFiles(oneJobsStats.getNoOfReceivedObjects()) //
+ .sizeOfSentObj(oneJobsStats.getNoOfSentBytes() / oneJobsStats.getNoOfSentObjects()) //
+ .build();
}
private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) {
final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
logger.error("*** {} Duration ({} ms), objects/second: {}", str, durationMs,
(noOfIterations * 1000) / durationMs);
- printStatistics();
+
+ System.out.println("--------------");
+ System.out.println(gson.toJson(getCharacteristicsResult(startTime)));
+ System.out.println("--------------");
+
}
@Test
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
waitForKafkaListener();
+ Instant startTime = Instant.now();
String sendString = "testData " + Instant.now();
String sendKey = "key " + Instant.now();
var dataToSend = Flux.just(kafkaSenderRecord(sendString, sendKey, KAFKA_TYPE_ID));
await().untilAsserted(() -> assertThat(kafkaReceiver.lastValue()).isEqualTo(sendString));
assertThat(kafkaReceiver.lastKey()).isEqualTo(sendKey);
- printStatistics();
+ System.out.println(gson.toJson(getCharacteristicsResult(startTime)));
}
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
void kafkaCharacteristics_manyPmJobs() throws Exception {
// Filter PM reports and sent to many jobs over Kafka
+ this.applicationConfig.setZipOutput(false);
+
// Register producer, Register types
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("pmCounterNumber0");
- filterData.getMeasTypes().add("pmCounterNumber1");
+ final int NO_OF_COUNTERS = 5;
+ for (int i = 0; i < NO_OF_COUNTERS; ++i) {
+ filterData.getMeasTypes().add("pmCounterNumber" + i);
+ }
filterData.getMeasObjClass().add("NRCellCU");
- this.applicationConfig.setZipOutput(false);
- final int NO_OF_JOBS = 100;
+ final int NO_OF_JOBS = 150;
+
ArrayList<KafkaReceiver> receivers = new ArrayList<>();
for (int i = 0; i < NO_OF_JOBS; ++i) {
final String outputTopic = "manyJobs_" + i;
this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic,
restClient());
- KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext);
+ KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, null);
receivers.add(receiver);
}
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
waitForKafkaListener();
- final int NO_OF_OBJECTS = 10000;
+ final int NO_OF_OBJECTS = 1000;
Instant startTime = Instant.now();
}
}
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+ @Test
+ void kafkaCharacteristics_onePmJobs_sharedTopic() throws Exception {
+ // Filter PM reports and send them to one job over Kafka; the output topic is shared by
+ // many receivers, each in its own consumer group
+
+ this.applicationConfig.setZipOutput(true);
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
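+ // NO_OF_COUNTERS is zero here, so no measType filters are added by the loop below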
+ final int NO_OF_COUNTERS = 0;
+ for (int i = 0; i < NO_OF_COUNTERS; ++i) {
+ filterData.getMeasTypes().add("pmCounterNumber" + i);
+ }
+ filterData.getMeasObjClass().add("NRCellCU");
+
+ final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(outputTopic, filterData), outputTopic, restClient());
+
+ final int NO_OF_RECEIVERS = 150;
+ ArrayList<KafkaReceiver> receivers = new ArrayList<>();
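+ // Each receiver gets its own consumer group ("group_<i>"), so every receiver sees all messages on the shared topic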
+ for (int i = 0; i < NO_OF_RECEIVERS; ++i) {
+ KafkaReceiver receiver =
+ new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
+ receivers.add(receiver);
+ }
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+ waitForKafkaListener();
+
+ final int NO_OF_OBJECTS = 1000;
+
+ Instant startTime = Instant.now();
+
+ final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
+
+ DataStore fileStore = dataStore();
+
+ fileStore.deleteBucket(DataStore.Bucket.FILES).block();
+ fileStore.create(DataStore.Bucket.FILES).block();
+ fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
+
+ String eventAsString = newFileEvent(FILE_NAME);
+ var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
+ sendDataToKafka(dataToSend);
+
+ logger.info("Waiting for all receivers to get {} objects each", NO_OF_OBJECTS);
+ for (KafkaReceiver receiver : receivers) {
+ while (receiver.count < NO_OF_OBJECTS) {
+ if (receiver.count > 0) {
+ logger.info("Waiting, receiver count: {}", receiver.count);
+ }
+
+ Thread.sleep(1000 * 1);
+ }
+ }
+
+ printCharacteristicsResult("kafkaCharacteristics_onePmJobs_sharedTopic", startTime, NO_OF_OBJECTS);
+
+ for (KafkaReceiver receiver : receivers) {
+ if (receiver.count != NO_OF_OBJECTS) {
+ System.out.println("** Unexpected no of objects: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
+ }
+ }
+
+ Thread.sleep(1000 * 5);
+ }
+
private String newFileEvent(String fileName) {
NewFileEvent event = NewFileEvent.builder() //
.filename(fileName) //