Minor changes
[nonrtric/plt/ranpm.git] / pmproducer / src / test / java / org / oran / pmproducer / IntegrationWithKafka.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2023 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.pmproducer;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.awaitility.Awaitility.await;
25
26 import com.google.gson.JsonParser;
27
28 import java.io.IOException;
29 import java.lang.invoke.MethodHandles;
30 import java.nio.file.Path;
31 import java.time.Instant;
32 import java.time.OffsetDateTime;
33 import java.util.ArrayList;
34 import java.util.HashMap;
35 import java.util.Map;
36
37 import lombok.Builder;
38
39 import org.apache.kafka.clients.producer.ProducerConfig;
40 import org.apache.kafka.clients.producer.ProducerRecord;
41 import org.apache.kafka.common.serialization.ByteArraySerializer;
42 import org.junit.jupiter.api.AfterEach;
43 import org.junit.jupiter.api.BeforeEach;
44 import org.junit.jupiter.api.Test;
45 import org.oran.pmproducer.clients.AsyncRestClient;
46 import org.oran.pmproducer.clients.AsyncRestClientFactory;
47 import org.oran.pmproducer.clients.SecurityContext;
48 import org.oran.pmproducer.configuration.ApplicationConfig;
49 import org.oran.pmproducer.configuration.WebClientConfig;
50 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
51 import org.oran.pmproducer.controllers.ProducerCallbacksController;
52 import org.oran.pmproducer.controllers.ProducerCallbacksController.StatisticsCollection;
53 import org.oran.pmproducer.datastore.DataStore;
54 import org.oran.pmproducer.filter.PmReportFilter;
55 import org.oran.pmproducer.r1.ConsumerJobInfo;
56 import org.oran.pmproducer.repository.InfoType;
57 import org.oran.pmproducer.repository.InfoTypes;
58 import org.oran.pmproducer.repository.Job;
59 import org.oran.pmproducer.repository.Job.Statistics;
60 import org.oran.pmproducer.repository.Jobs;
61 import org.oran.pmproducer.tasks.NewFileEvent;
62 import org.oran.pmproducer.tasks.TopicListener;
63 import org.oran.pmproducer.tasks.TopicListeners;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66 import org.springframework.beans.factory.annotation.Autowired;
67 import org.springframework.boot.test.context.SpringBootTest;
68 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
69 import org.springframework.boot.test.context.TestConfiguration;
70 import org.springframework.boot.test.web.server.LocalServerPort;
71 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
72 import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
73 import org.springframework.context.annotation.Bean;
74 import org.springframework.test.context.TestPropertySource;
75
76 import reactor.core.publisher.Flux;
77 import reactor.kafka.sender.KafkaSender;
78 import reactor.kafka.sender.SenderOptions;
79 import reactor.kafka.sender.SenderRecord;
80
81 @SuppressWarnings("java:S3577") // Rename class
82 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
83 @TestPropertySource(properties = { //
84         "server.ssl.key-store=./config/keystore.jks", //
85         "app.webclient.trust-store=./config/truststore.jks", //
86         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
87         "app.pm-files-path=./src/test/resources/", //
88         "app.s3.locksBucket=ropfilelocks", //
89         "app.pm-files-path=/tmp/dmaapadaptor", //
90         "app.s3.bucket=dmaaptest" //
91 }) //
92 class IntegrationWithKafka {
93
94     final static String PM_TYPE_ID = "PmDataOverKafka";
95
96     @Autowired
97     private ApplicationConfig applicationConfig;
98
99     @Autowired
100     private Jobs jobs;
101
102     @Autowired
103     private InfoTypes types;
104
105     @Autowired
106     private IcsSimulatorController icsSimulatorController;
107
108     @Autowired
109     private TopicListeners topicListeners;
110
111     @Autowired
112     private SecurityContext securityContext;
113
114     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
115
116     private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
117
118     @LocalServerPort
119     int localServerHttpPort;
120
121     static class TestApplicationConfig extends ApplicationConfig {
122         @Override
123         public String getIcsBaseUrl() {
124             return thisProcessUrl();
125         }
126
127         @Override
128         public String getSelfUrl() {
129             return thisProcessUrl();
130         }
131
132         private String thisProcessUrl() {
133             final String url = "https://localhost:" + getLocalServerHttpPort();
134             return url;
135         }
136     }
137
138     /**
139      * Overrides the BeanFactory.
140      */
141     @TestConfiguration
142     static class TestBeanFactory extends BeanFactory {
143
144         @Override
145         @Bean
146         public ServletWebServerFactory servletContainer() {
147             return new TomcatServletWebServerFactory();
148         }
149
150         @Override
151         @Bean
152         public ApplicationConfig getApplicationConfig() {
153             TestApplicationConfig cfg = new TestApplicationConfig();
154             return cfg;
155         }
156     }
157
158     private static class KafkaReceiver {
159         public final String OUTPUT_TOPIC;
160         private TopicListener.DataFromTopic receivedKafkaOutput;
161         private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
162         private final ApplicationConfig applicationConfig;
163
164         int count = 0;
165
166         public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext,
167                 String groupId) {
168             this.applicationConfig = applicationConfig;
169             this.OUTPUT_TOPIC = outputTopic;
170
171             // Create a listener to the output topic. The TopicListener happens to be
172             // suitable for that,
173             InfoType type = InfoType.builder() //
174                     .id("TestReceiver_" + outputTopic) //
175                     .kafkaInputTopic(OUTPUT_TOPIC) //
176                     .build();
177
178             TopicListener topicListener = new TopicListener(applicationConfig, type);
179             if (groupId != null) {
180                 topicListener.setKafkaGroupId(groupId);
181             }
182
183             topicListener.getFlux() //
184                     .map(this::unzip) //
185                     .doOnNext(this::set) //
186                     .doFinally(sig -> logger.info("Finally " + sig)) //
187                     .subscribe();
188         }
189
190         private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
191             if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped()) {
192                 logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped(),
193                         this.applicationConfig.isZipOutput());
194             }
195
196             if (!receivedKafkaOutput.isZipped()) {
197                 return receivedKafkaOutput;
198             }
199             try {
200                 byte[] unzipped = TopicListener.unzip(receivedKafkaOutput.value);
201                 return new TopicListener.DataFromTopic("typeId", null, unzipped, receivedKafkaOutput.key);
202             } catch (IOException e) {
203                 logger.error("********* ERROR ", e.getMessage());
204                 return null;
205             }
206         }
207
208         private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
209             this.receivedKafkaOutput = receivedKafkaOutput;
210             this.count++;
211             if (logger.isDebugEnabled()) {
212                 logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
213                 logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
214             }
215         }
216
217         void reset() {
218             this.receivedKafkaOutput = new TopicListener.DataFromTopic("", null, null, null);
219             this.count = 0;
220         }
221     }
222
223     private static KafkaReceiver kafkaReceiver;
224     private static KafkaReceiver kafkaReceiver2;
225
226     @BeforeEach
227     void init() {
228         this.applicationConfig.setZipOutput(false);
229
230         if (kafkaReceiver == null) {
231             kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext, null);
232             kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext, null);
233         }
234         kafkaReceiver.reset();
235         kafkaReceiver2.reset();
236
237         DataStore fileStore = this.dataStore();
238         fileStore.create(DataStore.Bucket.FILES).block();
239         fileStore.create(DataStore.Bucket.LOCKS).block();
240
241     }
242
243     @AfterEach
244     void reset() {
245         for (Job job : this.jobs.getAll()) {
246             this.icsSimulatorController.deleteJob(job.getId(), restClient());
247         }
248         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
249         await().untilAsserted(() -> assertThat(this.topicListeners.getDataDistributors().keySet()).isEmpty());
250
251         this.icsSimulatorController.testResults.reset();
252
253         DataStore fileStore = dataStore();
254         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
255         fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
256     }
257
258     private AsyncRestClient restClient(boolean useTrustValidation) {
259         WebClientConfig config = this.applicationConfig.getWebClientConfig();
260         HttpProxyConfig httpProxyConfig = HttpProxyConfig.builder() //
261                 .httpProxyHost("") //
262                 .httpProxyPort(0) //
263                 .build();
264         config = WebClientConfig.builder() //
265                 .keyStoreType(config.getKeyStoreType()) //
266                 .keyStorePassword(config.getKeyStorePassword()) //
267                 .keyStore(config.getKeyStore()) //
268                 .keyPassword(config.getKeyPassword()) //
269                 .isTrustStoreUsed(useTrustValidation) //
270                 .trustStore(config.getTrustStore()) //
271                 .trustStorePassword(config.getTrustStorePassword()) //
272                 .httpProxyConfig(httpProxyConfig).build();
273
274         AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext);
275         return restClientFactory.createRestClientNoHttpProxy(baseUrl());
276     }
277
278     private AsyncRestClient restClient() {
279         return restClient(false);
280     }
281
282     private String baseUrl() {
283         return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
284     }
285
286     private static Object jsonObject(String json) {
287         try {
288             return JsonParser.parseString(json).getAsJsonObject();
289         } catch (Exception e) {
290             throw new NullPointerException(e.toString());
291         }
292     }
293
294     public static ConsumerJobInfo consumerJobInfoKafka(String kafkaBootstrapServers, String topic,
295             PmReportFilter.FilterData filterData) {
296         try {
297             Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
298                     .topic(topic) //
299                     .bootStrapServers(kafkaBootstrapServers) //
300                     .build();
301             Job.Parameters param = Job.Parameters.builder() //
302                     .filter(filterData) //
303                     .deliveryInfo(deliveryInfo) //
304                     .build();
305
306             String str = gson.toJson(param);
307             Object parametersObj = jsonObject(str);
308
309             return new ConsumerJobInfo(PM_TYPE_ID, parametersObj, "owner", "");
310         } catch (Exception e) {
311             return null;
312         }
313     }
314
315     private SenderOptions<byte[], byte[]> kafkaSenderOptions() {
316         String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
317
318         Map<String, Object> props = new HashMap<>();
319         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
320         // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
321         props.put(ProducerConfig.ACKS_CONFIG, "all");
322         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
323         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
324         return SenderOptions.create(props);
325     }
326
327     private SenderRecord<byte[], byte[], Integer> kafkaSenderRecord(String data, String key, String typeId) {
328         final InfoType infoType = this.types.get(typeId);
329         int correlationMetadata = 2;
330         return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key.getBytes(), data.getBytes()),
331                 correlationMetadata);
332     }
333
334     private void sendDataToKafka(Flux<SenderRecord<byte[], byte[], Integer>> dataToSend) {
335         final KafkaSender<byte[], byte[]> sender = KafkaSender.create(kafkaSenderOptions());
336
337         sender.send(dataToSend) //
338                 .doOnError(e -> logger.error("Send failed", e)) //
339                 .blockLast();
340
341         sender.close();
342     }
343
344     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
345     private static void waitForKafkaListener() throws InterruptedException {
346         Thread.sleep(4000);
347     }
348
349     private StatisticsCollection getStatistics() {
350         String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
351         String statsResp = restClient().get(targetUri).block();
352         StatisticsCollection stats = gson.fromJson(statsResp, StatisticsCollection.class);
353         return stats;
354     }
355
356     @Builder
357     static class CharacteristicsResult {
358         long noOfFilesPerSecond;
359         long noOfSentBytes;
360         long noOfSentGigaBytes;
361         long noOfSentObjects;
362         long inputFileSize;
363         long noOfReceivedFiles;
364         long noOfReceivedBytes;
365         long noOfSubscribers;
366         long sizeOfSentObj;
367         boolean zipOutput;
368     }
369
370     private CharacteristicsResult getCharacteristicsResult(Instant startTime) {
371         final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
372         StatisticsCollection stats = getStatistics();
373         long noOfSentBytes = 0;
374         long noOfSentObjs = 0;
375         for (Statistics s : stats.jobStatistics) {
376             noOfSentBytes += s.getNoOfSentBytes();
377             noOfSentObjs += s.getNoOfSentObjects();
378         }
379
380         Statistics oneJobsStats = stats.jobStatistics.iterator().next();
381
382         return CharacteristicsResult.builder() //
383                 .noOfSentBytes(noOfSentBytes) //
384                 .noOfSentObjects(noOfSentObjs) //
385                 .noOfSentGigaBytes(noOfSentBytes / (1024 * 1024)) //
386                 .noOfSubscribers(stats.jobStatistics.size()) //
387                 .zipOutput(this.applicationConfig.isZipOutput()) //
388                 .noOfFilesPerSecond((oneJobsStats.getNoOfReceivedObjects() * 1000) / durationMs) //
389                 .noOfReceivedBytes(oneJobsStats.getNoOfReceivedBytes()) //
390                 .inputFileSize(oneJobsStats.getNoOfReceivedBytes() / oneJobsStats.getNoOfReceivedObjects()) //
391                 .noOfReceivedFiles(oneJobsStats.getNoOfReceivedObjects()) //
392                 .sizeOfSentObj(oneJobsStats.getNoOfSentBytes() / oneJobsStats.getNoOfSentObjects()) //
393                 .build();
394     }
395
396     private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) {
397         final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
398         logger.info("*** {} Duration ({} ms),  objects/second: {}", str, durationMs,
399                 (noOfIterations * 1000) / durationMs);
400
401         System.out.println("--------------");
402         System.out.println(gson.toJson(getCharacteristicsResult(startTime)));
403         System.out.println("--------------");
404
405     }
406
407     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
408     @Test
409     void kafkaCharacteristics_pmFilter_s3() throws Exception {
410         // Filter PM reports and sent to two jobs over Kafka
411
412         final String JOB_ID = "kafkaCharacteristics";
413         final String JOB_ID2 = "kafkaCharacteristics2";
414
415         // Register producer, Register types
416         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
417         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
418
419         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
420
421         filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
422
423         this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
424                 kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
425         this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
426                 kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2, restClient());
427
428         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
429         waitForKafkaListener();
430
431         final int NO_OF_OBJECTS = 10;
432
433         Instant startTime = Instant.now();
434
435         final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
436
437         DataStore fileStore = dataStore();
438
439         fileStore.create(DataStore.Bucket.FILES).block();
440         fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
441
442         String eventAsString = newFileEvent(FILE_NAME);
443         var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
444         sendDataToKafka(dataToSend);
445
446         while (kafkaReceiver.count < NO_OF_OBJECTS) {
447             logger.info("sleeping {}", kafkaReceiver.count);
448             Thread.sleep(1000 * 1);
449         }
450
451         String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString();
452         assertThat(msgString).contains("pmCounterNumber0");
453         assertThat(msgString).doesNotContain("pmCounterNumber1");
454
455         printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
456         logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);
457
458         assertThat(kafkaReceiver.count).isEqualTo(NO_OF_OBJECTS);
459     }
460
461     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
462     @Test
463     void kafkaCharacteristics_manyPmJobs() throws Exception {
464         // Filter PM reports and sent to many jobs over Kafka
465
466         this.applicationConfig.setZipOutput(false);
467
468         // Register producer, Register types
469         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
470         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
471
472         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
473         final int NO_OF_COUNTERS = 2;
474         for (int i = 0; i < NO_OF_COUNTERS; ++i) {
475             filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i);
476         }
477
478         final int NO_OF_JOBS = 150;
479
480         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
481         for (int i = 0; i < NO_OF_JOBS; ++i) {
482             final String outputTopic = "manyJobs_" + i;
483             this.icsSimulatorController.addJob(
484                     consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
485                     outputTopic, restClient());
486             KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, null);
487             receivers.add(receiver);
488         }
489
490         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
491         waitForKafkaListener();
492
493         final int NO_OF_OBJECTS = 1000;
494
495         Instant startTime = Instant.now();
496
497         final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
498
499         DataStore fileStore = dataStore();
500
501         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
502         fileStore.create(DataStore.Bucket.FILES).block();
503         fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
504
505         String eventAsString = newFileEvent(FILE_NAME);
506         var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
507         sendDataToKafka(dataToSend);
508
509         logger.info("sleeping {}", kafkaReceiver.count);
510         while (receivers.get(0).count < NO_OF_OBJECTS) {
511             if (kafkaReceiver.count > 0) {
512                 logger.info("sleeping {}", receivers.get(0).count);
513             }
514
515             Thread.sleep(1000 * 1);
516         }
517
518         printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS);
519
520         for (KafkaReceiver receiver : receivers) {
521             if (receiver.count != NO_OF_OBJECTS) {
522                 System.out.println("** Unexpected no of jobs: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
523             }
524         }
525     }
526
527     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
528     @Test
529     void kafkaCharacteristics_manyPmJobs_sharedTopic() throws Exception {
530         // Filter PM reports and sent to many jobs over Kafka
531
532         this.applicationConfig.setZipOutput(false);
533
534         // Register producer, Register types
535         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
536         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
537
538         final int NO_OF_JOBS = 150;
539         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
540         for (int i = 0; i < NO_OF_JOBS; ++i) {
541             final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
542             final String jobId = "manyJobs_" + i;
543             PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
544             filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // all counters will be added
545
546             this.icsSimulatorController.addJob(
547                     consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
548                     jobId, restClient());
549
550             KafkaReceiver receiver =
551                     new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
552             receivers.add(receiver);
553         }
554
555         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
556         waitForKafkaListener();
557
558         final int NO_OF_OBJECTS = 1000;
559
560         Instant startTime = Instant.now();
561
562         final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
563
564         DataStore fileStore = dataStore();
565
566         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
567         fileStore.create(DataStore.Bucket.FILES).block();
568         fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
569
570         String eventAsString = newFileEvent(FILE_NAME);
571         var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
572         sendDataToKafka(dataToSend);
573
574         logger.info("sleeping {}", kafkaReceiver.count);
575         for (KafkaReceiver receiver : receivers) {
576             while (receiver.count < NO_OF_OBJECTS) {
577                 if (kafkaReceiver.count > 0) {
578                     logger.info("sleeping {}", receiver.count);
579                 }
580
581                 Thread.sleep(1000 * 1);
582             }
583         }
584
585         printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS);
586
587         for (KafkaReceiver receiver : receivers) {
588             if (receiver.count != NO_OF_OBJECTS) {
589                 System.out.println("** Unexpected no of objects: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
590             }
591         }
592
593         Thread.sleep(1000 * 5);
594     }
595
596     private String newFileEvent(String fileName) {
597         NewFileEvent event = NewFileEvent.builder() //
598                 .filename(fileName) //
599                 .build();
600         return gson.toJson(event);
601     }
602
603     private DataStore dataStore() {
604         return DataStore.create(this.applicationConfig);
605     }
606
607     @Test
608     void testHistoricalData() throws Exception {
609         // test
610         final String JOB_ID = "testHistoricalData";
611
612         DataStore fileStore = dataStore();
613
614         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
615         fileStore.create(DataStore.Bucket.FILES).block();
616         fileStore.create(DataStore.Bucket.LOCKS).block();
617
618         fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
619                 "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block();
620
621         fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "OTHER_SOURCENAME/test.json").block();
622
623         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
624
625         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
626         filterData.getSourceNames().add("O-DU-1122");
627         filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
628
629         filterData.setPmRopEndTime(OffsetDateTime.now().toString());
630
631         this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
632                 kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
633         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
634
635         await().untilAsserted(() -> assertThat(kafkaReceiver.count).isPositive());
636     }
637
638 }