import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.datastore.DataStore;
+import org.oran.dmaapadapter.datastore.DataStore.Bucket;
import org.oran.dmaapadapter.datastore.FileStore;
+import org.oran.dmaapadapter.datastore.S3ObjectStore;
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.filter.PmReport;
import org.oran.dmaapadapter.filter.PmReportFilter;
assertThat(this.consumerController.testResults.receivedBodies).isEmpty();
assertThat(this.consumerController.testResults.receivedHeaders).isEmpty();
- FileStore fileStore = new FileStore(applicationConfig);
- fileStore.deleteFiles();
+ DataStore fileStore = this.dataStore();
+ fileStore.create(DataStore.Bucket.FILES).block();
+ fileStore.create(DataStore.Bucket.LOCKS).block();
+ }
+
+ private DataStore dataStore() {
+ return this.applicationConfig.isS3Enabled() ? new S3ObjectStore(applicationConfig)
+ : new FileStore(applicationConfig);
}
@AfterEach
this.consumerController.testResults.reset();
this.icsSimulatorController.testResults.reset();
+ FileStore fileStore = new FileStore(applicationConfig);
+ fileStore.deleteBucket(Bucket.FILES);
+ fileStore.deleteBucket(Bucket.LOCKS);
+
}
private AsyncRestClient restClient(boolean useTrustValidation) {
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
import org.oran.dmaapadapter.datastore.DataStore;
+import org.oran.dmaapadapter.datastore.FileStore;
import org.oran.dmaapadapter.datastore.S3ObjectStore;
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.filter.PmReportFilter;
"app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
"app.pm-files-path=./src/test/resources/", //
"app.s3.locksBucket=ropfilelocks", //
- "app.s3.bucket=ropfiles"}) //
+ "app.pm-files-path=/tmp/dmaapadaptor", //
+ "app.s3.bucket="}) //
class IntegrationWithKafka {
final String TYPE_ID = "KafkaInformationType";
kafkaReceiver.reset();
kafkaReceiver2.reset();
- S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
- fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
- fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block();
+ DataStore fileStore = this.dataStore();
+ fileStore.create(DataStore.Bucket.FILES).block();
+ fileStore.create(DataStore.Bucket.LOCKS).block();
}
this.consumerController.testResults.reset();
this.icsSimulatorController.testResults.reset();
- S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+ DataStore fileStore = dataStore();
fileStore.deleteBucket(DataStore.Bucket.FILES).block();
fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
}
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
waitForKafkaListener();
- final int NO_OF_OBJECTS = 50000;
+ final int NO_OF_OBJECTS = 5000;
Instant startTime = Instant.now();
.filename(FILE_NAME) //
.build();
- S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+ DataStore fileStore = dataStore();
- fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
- fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
+ fileStore.create(DataStore.Bucket.FILES).block();
+ fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
String eventAsString = gson.toJson(event);
logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);
printStatistics();
+ }
+
+ @Test
+ void clear() {
+
+ }
+
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+ @Test
+ void kafkaCharacteristics_manyPmJobs() throws Exception {
+ // Filter PM reports and sent to two jobs over Kafka
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.getMeasTypes().add("succImmediateAssignProcs");
+ filterData.getMeasObjClass().add("UtranCell");
+
+ final int NO_OF_JOBS = 150;
+ ArrayList<KafkaReceiver> receivers = new ArrayList<>();
+ for (int i = 0; i < NO_OF_JOBS; ++i) {
+ final String OUTPUT_TOPIC = "manyJobs_" + i;
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(OUTPUT_TOPIC, filterData), OUTPUT_TOPIC,
+ restClient());
+ KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, OUTPUT_TOPIC, this.securityContext);
+ receivers.add(receiver);
+ }
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
+ waitForKafkaListener();
+
+ final int NO_OF_OBJECTS = 500;
+
+ Instant startTime = Instant.now();
+
+ final String FILE_NAME = "pm_report.json.gz";
+
+ NewFileEvent event = NewFileEvent.builder() //
+ .filename(FILE_NAME) //
+ .build();
+
+ DataStore fileStore = dataStore();
+
+ fileStore.create(DataStore.Bucket.FILES).block();
+ fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
+
+ String eventAsString = gson.toJson(event);
+
+ var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
+ sendDataToKafka(dataToSend);
+
+ while (receivers.get(0).count != NO_OF_OBJECTS) {
+ logger.info("sleeping {}", kafkaReceiver.count);
+ Thread.sleep(1000 * 1);
+ }
+
+ final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
+ logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
+
+ for (KafkaReceiver receiver : receivers) {
+ assertThat(receiver.count).isEqualTo(NO_OF_OBJECTS);
+ // System.out.println("** " + receiver.OUTPUT_TOPIC + " " + receiver.count);
+ }
+
+ // printStatistics();
+ }
+ private DataStore dataStore() {
+ return this.applicationConfig.isS3Enabled() ? new S3ObjectStore(applicationConfig)
+ : new FileStore(applicationConfig);
}
@Test
// test
final String JOB_ID = "testHistoricalData";
- S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+ DataStore fileStore = dataStore();
- fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
- fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block();
+ fileStore.create(DataStore.Bucket.FILES).block();
+ fileStore.create(DataStore.Bucket.LOCKS).block();
- fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"),
+ fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
"O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
- fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"),
- "OTHER_SOURCENAME/test.json").block();
+ fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "OTHER_SOURCENAME/test.json").block();
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());