170f555ae842936d4b5b9ef19b157060f34a4b50
[nonrtric/plt/ranpm.git] / pmproducer / src / test / java / org / oran / pmproducer / IntegrationWithKafka.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2023 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.pmproducer;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.awaitility.Awaitility.await;
25
26 import com.google.gson.JsonParser;
27
28 import java.io.IOException;
29 import java.lang.invoke.MethodHandles;
30 import java.nio.file.Path;
31 import java.time.Instant;
32 import java.time.OffsetDateTime;
33 import java.util.ArrayList;
34 import java.util.HashMap;
35 import java.util.Map;
36
37 import lombok.Builder;
38
39 import org.apache.kafka.clients.producer.ProducerConfig;
40 import org.apache.kafka.clients.producer.ProducerRecord;
41 import org.apache.kafka.common.serialization.ByteArraySerializer;
42 import org.junit.jupiter.api.AfterEach;
43 import org.junit.jupiter.api.BeforeEach;
44 import org.junit.jupiter.api.Test;
45 import org.oran.pmproducer.clients.AsyncRestClient;
46 import org.oran.pmproducer.clients.AsyncRestClientFactory;
47 import org.oran.pmproducer.configuration.ApplicationConfig;
48 import org.oran.pmproducer.configuration.WebClientConfig;
49 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
50 import org.oran.pmproducer.controllers.ProducerCallbacksController;
51 import org.oran.pmproducer.controllers.ProducerCallbacksController.StatisticsCollection;
52 import org.oran.pmproducer.datastore.DataStore;
53 import org.oran.pmproducer.filter.PmReportFilter;
54 import org.oran.pmproducer.oauth2.SecurityContext;
55 import org.oran.pmproducer.r1.ConsumerJobInfo;
56 import org.oran.pmproducer.repository.InfoType;
57 import org.oran.pmproducer.repository.InfoTypes;
58 import org.oran.pmproducer.repository.Job;
59 import org.oran.pmproducer.repository.Job.Statistics;
60 import org.oran.pmproducer.repository.Jobs;
61 import org.oran.pmproducer.tasks.NewFileEvent;
62 import org.oran.pmproducer.tasks.TopicListener;
63 import org.oran.pmproducer.tasks.TopicListeners;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66 import org.springframework.beans.factory.annotation.Autowired;
67 import org.springframework.boot.test.context.SpringBootTest;
68 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
69 import org.springframework.boot.test.context.TestConfiguration;
70 import org.springframework.boot.test.web.server.LocalServerPort;
71 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
72 import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
73 import org.springframework.context.annotation.Bean;
74 import org.springframework.test.context.TestPropertySource;
75
76 import reactor.core.publisher.Flux;
77 import reactor.kafka.sender.KafkaSender;
78 import reactor.kafka.sender.SenderOptions;
79 import reactor.kafka.sender.SenderRecord;
80
81 @SuppressWarnings("java:S3577") // Rename class
82 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
83 @TestPropertySource(properties = { //
84         "server.ssl.key-store=./config/keystore.jks", //
85         "app.webclient.trust-store=./config/truststore.jks", //
86         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
87         "app.pm-files-path=./src/test/resources/", //
88         "app.s3.locksBucket=ropfilelocks", //
89         "app.pm-files-path=/tmp/pmproducer", //
90         "app.s3.bucket=pmproducertest", //
91         "app.auth-token-file=src/test/resources/jwtToken.b64", //
92         "app.kafka.use-oath-token=false"}) //
93 class IntegrationWithKafka {
94
95     final static String PM_TYPE_ID = "PmDataOverKafka";
96
97     @Autowired
98     private ApplicationConfig applicationConfig;
99
100     @Autowired
101     private Jobs jobs;
102
103     @Autowired
104     private InfoTypes types;
105
106     @Autowired
107     private IcsSimulatorController icsSimulatorController;
108
109     @Autowired
110     private TopicListeners topicListeners;
111
112     @Autowired
113     private SecurityContext securityContext;
114
115     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
116
117     private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
118
119     @LocalServerPort
120     int localServerHttpPort;
121
122     static class TestApplicationConfig extends ApplicationConfig {
123         @Override
124         public String getIcsBaseUrl() {
125             return thisProcessUrl();
126         }
127
128         @Override
129         public String getSelfUrl() {
130             return thisProcessUrl();
131         }
132
133         private String thisProcessUrl() {
134             final String url = "https://localhost:" + getLocalServerHttpPort();
135             return url;
136         }
137     }
138
139     /**
140      * Overrides the BeanFactory.
141      */
142     @TestConfiguration
143     static class TestBeanFactory extends BeanFactory {
144
145         @Override
146         @Bean
147         public ServletWebServerFactory servletContainer() {
148             return new TomcatServletWebServerFactory();
149         }
150
151         @Override
152         @Bean
153         public ApplicationConfig getApplicationConfig() {
154             TestApplicationConfig cfg = new TestApplicationConfig();
155             return cfg;
156         }
157     }
158
159     private static class KafkaReceiver {
160         public final String OUTPUT_TOPIC;
161         private TopicListener.DataFromTopic receivedKafkaOutput;
162         private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
163         private final ApplicationConfig applicationConfig;
164
165         int count = 0;
166
167         public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext,
168                 String groupId) {
169             this.applicationConfig = applicationConfig;
170             this.OUTPUT_TOPIC = outputTopic;
171
172             // Create a listener to the output topic. The TopicListener happens to be
173             // suitable for that,
174             InfoType type = InfoType.builder() //
175                     .id("TestReceiver_" + outputTopic) //
176                     .kafkaInputTopic(OUTPUT_TOPIC) //
177                     .build();
178
179             TopicListener topicListener = new TopicListener(applicationConfig, type);
180             if (groupId != null) {
181                 topicListener.setKafkaGroupId(groupId);
182             }
183
184             topicListener.getFlux() //
185                     .map(this::unzip) //
186                     .doOnNext(this::set) //
187                     .doFinally(sig -> logger.info("Finally " + sig)) //
188                     .subscribe();
189         }
190
191         private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
192             if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped()) {
193                 logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped(),
194                         this.applicationConfig.isZipOutput());
195             }
196
197             if (!receivedKafkaOutput.isZipped()) {
198                 return receivedKafkaOutput;
199             }
200             try {
201                 byte[] unzipped = TopicListener.unzip(receivedKafkaOutput.value);
202                 return new TopicListener.DataFromTopic("typeId", null, unzipped, receivedKafkaOutput.key);
203             } catch (IOException e) {
204                 logger.error("********* ERROR ", e.getMessage());
205                 return null;
206             }
207         }
208
209         private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
210             this.receivedKafkaOutput = receivedKafkaOutput;
211             this.count++;
212             if (logger.isDebugEnabled()) {
213                 logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
214                 logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
215                 logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders());
216             }
217         }
218
219         void reset() {
220             this.receivedKafkaOutput = new TopicListener.DataFromTopic("", null, null, null);
221             this.count = 0;
222         }
223     }
224
225     private static KafkaReceiver kafkaReceiver;
226     private static KafkaReceiver kafkaReceiver2;
227
228     @BeforeEach
229     void init() {
230         this.applicationConfig.setZipOutput(false);
231
232         if (kafkaReceiver == null) {
233             kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext, null);
234             kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext, null);
235         }
236         kafkaReceiver.reset();
237         kafkaReceiver2.reset();
238
239         DataStore fileStore = this.dataStore();
240         fileStore.create(DataStore.Bucket.FILES).block();
241         fileStore.create(DataStore.Bucket.LOCKS).block();
242
243     }
244
245     @AfterEach
246     void reset() {
247         for (Job job : this.jobs.getAll()) {
248             this.icsSimulatorController.deleteJob(job.getId(), restClient());
249         }
250         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
251         await().untilAsserted(() -> assertThat(this.topicListeners.getDataDistributors().keySet()).isEmpty());
252
253         this.icsSimulatorController.testResults.reset();
254
255         DataStore fileStore = dataStore();
256         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
257         fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
258     }
259
260     private AsyncRestClient restClient(boolean useTrustValidation) {
261         WebClientConfig config = this.applicationConfig.getWebClientConfig();
262         HttpProxyConfig httpProxyConfig = HttpProxyConfig.builder() //
263                 .httpProxyHost("") //
264                 .httpProxyPort(0) //
265                 .build();
266         config = WebClientConfig.builder() //
267                 .keyStoreType(config.getKeyStoreType()) //
268                 .keyStorePassword(config.getKeyStorePassword()) //
269                 .keyStore(config.getKeyStore()) //
270                 .keyPassword(config.getKeyPassword()) //
271                 .isTrustStoreUsed(useTrustValidation) //
272                 .trustStore(config.getTrustStore()) //
273                 .trustStorePassword(config.getTrustStorePassword()) //
274                 .httpProxyConfig(httpProxyConfig).build();
275
276         AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext);
277         return restClientFactory.createRestClientNoHttpProxy(baseUrl());
278     }
279
280     private AsyncRestClient restClient() {
281         return restClient(false);
282     }
283
284     private String baseUrl() {
285         return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
286     }
287
288     private static Object jsonObject(String json) {
289         try {
290             return JsonParser.parseString(json).getAsJsonObject();
291         } catch (Exception e) {
292             throw new NullPointerException(e.toString());
293         }
294     }
295
296     public static ConsumerJobInfo consumerJobInfoKafka(String kafkaBootstrapServers, String topic,
297             PmReportFilter.FilterData filterData) {
298         try {
299             Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
300                     .topic(topic) //
301                     .bootStrapServers(kafkaBootstrapServers) //
302                     .build();
303             Job.Parameters param = Job.Parameters.builder() //
304                     .filter(filterData) //
305                     .deliveryInfo(deliveryInfo) //
306                     .build();
307
308             String str = gson.toJson(param);
309             Object parametersObj = jsonObject(str);
310
311             return new ConsumerJobInfo(PM_TYPE_ID, parametersObj, "owner", "");
312         } catch (Exception e) {
313             return null;
314         }
315     }
316
317     private SenderOptions<byte[], byte[]> kafkaSenderOptions() {
318         String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
319
320         Map<String, Object> props = new HashMap<>();
321         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
322         // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
323         props.put(ProducerConfig.ACKS_CONFIG, "all");
324         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
325         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
326
327         // Security
328         this.applicationConfig.addKafkaSecurityProps(props);
329
330         return SenderOptions.create(props);
331     }
332
333     private SenderRecord<byte[], byte[], Integer> kafkaSenderRecord(String data, String key, String typeId) {
334         final InfoType infoType = this.types.get(typeId);
335         int correlationMetadata = 2;
336         return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key.getBytes(), data.getBytes()),
337                 correlationMetadata);
338     }
339
340     private void sendDataToKafka(Flux<SenderRecord<byte[], byte[], Integer>> dataToSend) {
341         final KafkaSender<byte[], byte[]> sender = KafkaSender.create(kafkaSenderOptions());
342
343         sender.send(dataToSend) //
344                 .doOnError(e -> logger.error("Send failed", e)) //
345                 .blockLast();
346
347         sender.close();
348     }
349
350     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
351     private static void waitForKafkaListener() throws InterruptedException {
352         Thread.sleep(4000);
353     }
354
355     private StatisticsCollection getStatistics() {
356         String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
357         String statsResp = restClient().get(targetUri).block();
358         StatisticsCollection stats = gson.fromJson(statsResp, StatisticsCollection.class);
359         return stats;
360     }
361
    /**
     * Summary of one characteristics (throughput) test run. Instances are created
     * through the Lombok generated builder and printed as JSON, so the field names
     * are what shows up in the printed result.
     */
    @Builder
    static class CharacteristicsResult {
        // Throughput derived from the number of received objects and the test duration.
        long noOfFilesPerSecond;
        // Totals over all jobs.
        long noOfSentBytes;
        long noOfSentGigaBytes;
        long noOfSentObjects;
        // Figures taken from the statistics of one single job.
        long inputFileSize;
        long noOfReceivedFiles;
        long noOfReceivedBytes;
        // Number of jobs that statistics were reported for.
        long noOfSubscribers;
        long sizeOfSentObj;
        // Whether zipped output was configured during the run.
        boolean zipOutput;
    }
375
376     private CharacteristicsResult getCharacteristicsResult(Instant startTime) {
377         final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
378         StatisticsCollection stats = getStatistics();
379         long noOfSentBytes = 0;
380         long noOfSentObjs = 0;
381         for (Statistics s : stats.jobStatistics) {
382             noOfSentBytes += s.getNoOfSentBytes();
383             noOfSentObjs += s.getNoOfSentObjects();
384         }
385
386         Statistics oneJobsStats = stats.jobStatistics.iterator().next();
387
388         return CharacteristicsResult.builder() //
389                 .noOfSentBytes(noOfSentBytes) //
390                 .noOfSentObjects(noOfSentObjs) //
391                 .noOfSentGigaBytes(noOfSentBytes / (1024 * 1024)) //
392                 .noOfSubscribers(stats.jobStatistics.size()) //
393                 .zipOutput(this.applicationConfig.isZipOutput()) //
394                 .noOfFilesPerSecond((oneJobsStats.getNoOfReceivedObjects() * 1000) / durationMs) //
395                 .noOfReceivedBytes(oneJobsStats.getNoOfReceivedBytes()) //
396                 .inputFileSize(oneJobsStats.getNoOfReceivedBytes() / oneJobsStats.getNoOfReceivedObjects()) //
397                 .noOfReceivedFiles(oneJobsStats.getNoOfReceivedObjects()) //
398                 .sizeOfSentObj(oneJobsStats.getNoOfSentBytes() / oneJobsStats.getNoOfSentObjects()) //
399                 .build();
400     }
401
402     private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) {
403         final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
404         logger.info("*** {} Duration ({} ms),  objects/second: {}", str, durationMs,
405                 (noOfIterations * 1000) / durationMs);
406
407         System.out.println("--------------");
408         System.out.println(gson.toJson(getCharacteristicsResult(startTime)));
409         System.out.println("--------------");
410
411     }
412
413     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
414     @Test
415     void kafkaCharacteristics_pmFilter_s3() throws Exception {
416         // Filter PM reports and sent to two jobs over Kafka
417
418         final String JOB_ID = "kafkaCharacteristics";
419         final String JOB_ID2 = "kafkaCharacteristics2";
420
421         // Register producer, Register types
422         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
423         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
424
425         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
426
427         filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
428
429         this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
430                 kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
431         this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
432                 kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2, restClient());
433
434         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
435         waitForKafkaListener();
436
437         final int NO_OF_OBJECTS = 10;
438
439         Instant startTime = Instant.now();
440
441         final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
442
443         DataStore fileStore = dataStore();
444
445         fileStore.create(DataStore.Bucket.FILES).block();
446         fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
447
448         String eventAsString = newFileEvent(FILE_NAME);
449         var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
450         sendDataToKafka(dataToSend);
451
452         while (kafkaReceiver.count < NO_OF_OBJECTS) {
453             logger.info("sleeping {}", kafkaReceiver.count);
454             Thread.sleep(1000 * 1);
455         }
456
457         String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString();
458         assertThat(msgString).contains("pmCounterNumber0");
459         assertThat(msgString).doesNotContain("pmCounterNumber1");
460         assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID);
461         assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from
462                                                                                                          // the file
463
464         printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
465         logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);
466
467         assertThat(kafkaReceiver.count).isEqualTo(NO_OF_OBJECTS);
468     }
469
470     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
471     @Test
472     void kafkaCharacteristics_manyPmJobs() throws Exception {
473         // Filter PM reports and sent to many jobs over Kafka
474
475         this.applicationConfig.setZipOutput(false);
476
477         // Register producer, Register types
478         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
479         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
480
481         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
482         final int NO_OF_COUNTERS = 2;
483         for (int i = 0; i < NO_OF_COUNTERS; ++i) {
484             filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i);
485         }
486
487         final int NO_OF_JOBS = 150;
488
489         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
490         for (int i = 0; i < NO_OF_JOBS; ++i) {
491             final String outputTopic = "manyJobs_" + i;
492             this.icsSimulatorController.addJob(
493                     consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
494                     outputTopic, restClient());
495             KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, null);
496             receivers.add(receiver);
497         }
498
499         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
500         waitForKafkaListener();
501
502         final int NO_OF_OBJECTS = 1000;
503
504         Instant startTime = Instant.now();
505
506         final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
507
508         DataStore fileStore = dataStore();
509
510         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
511         fileStore.create(DataStore.Bucket.FILES).block();
512         fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
513
514         String eventAsString = newFileEvent(FILE_NAME);
515         var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
516         sendDataToKafka(dataToSend);
517
518         logger.info("sleeping {}", kafkaReceiver.count);
519         while (receivers.get(0).count < NO_OF_OBJECTS) {
520             if (kafkaReceiver.count > 0) {
521                 logger.info("sleeping {}", receivers.get(0).count);
522             }
523
524             Thread.sleep(1000 * 1);
525         }
526
527         printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS);
528
529         for (KafkaReceiver receiver : receivers) {
530             if (receiver.count != NO_OF_OBJECTS) {
531                 System.out.println("** Unexpected no of jobs: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
532             }
533         }
534     }
535
536     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
537     @Test
538     void kafkaCharacteristics_manyPmJobs_sharedTopic() throws Exception {
539         // Filter PM reports and sent to many jobs over Kafka
540
541         this.applicationConfig.setZipOutput(false);
542
543         // Register producer, Register types
544         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
545         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
546
547         final int NO_OF_JOBS = 150;
548         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
549         for (int i = 0; i < NO_OF_JOBS; ++i) {
550             final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
551             final String jobId = "manyJobs_" + i;
552             PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
553             filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // all counters will be added
554
555             this.icsSimulatorController.addJob(
556                     consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
557                     jobId, restClient());
558
559             KafkaReceiver receiver =
560                     new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
561             receivers.add(receiver);
562         }
563
564         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
565         waitForKafkaListener();
566
567         final int NO_OF_OBJECTS = 1000;
568
569         Instant startTime = Instant.now();
570
571         final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
572
573         DataStore fileStore = dataStore();
574
575         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
576         fileStore.create(DataStore.Bucket.FILES).block();
577         fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
578
579         String eventAsString = newFileEvent(FILE_NAME);
580         var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
581         sendDataToKafka(dataToSend);
582
583         logger.info("sleeping {}", kafkaReceiver.count);
584         for (KafkaReceiver receiver : receivers) {
585             while (receiver.count < NO_OF_OBJECTS) {
586                 if (kafkaReceiver.count > 0) {
587                     logger.info("sleeping {}", receiver.count);
588                 }
589
590                 Thread.sleep(1000 * 1);
591             }
592         }
593
594         printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS);
595
596         for (KafkaReceiver receiver : receivers) {
597             if (receiver.count != NO_OF_OBJECTS) {
598                 System.out.println("** Unexpected no of objects: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
599             }
600         }
601
602         Thread.sleep(1000 * 5);
603     }
604
605     private String newFileEvent(String fileName) {
606         NewFileEvent event = NewFileEvent.builder() //
607                 .filename(fileName) //
608                 .build();
609         return gson.toJson(event);
610     }
611
612     private DataStore dataStore() {
613         return DataStore.create(this.applicationConfig);
614     }
615
616     @Test
617     void testHistoricalData() throws Exception {
618         // test
619         final String JOB_ID = "testHistoricalData";
620
621         DataStore fileStore = dataStore();
622
623         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
624         fileStore.create(DataStore.Bucket.FILES).block();
625         fileStore.create(DataStore.Bucket.LOCKS).block();
626
627         fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
628                 "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block();
629
630         fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "OTHER_SOURCENAME/test.json").block();
631
632         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
633
634         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
635         filterData.getSourceNames().add("O-DU-1122");
636         filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
637
638         filterData.setPmRopEndTime(OffsetDateTime.now().toString());
639
640         this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
641                 kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
642         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
643
644         await().untilAsserted(() -> assertThat(kafkaReceiver.count).isPositive());
645     }
646
647 }