Documentation updates
[nonrtric/plt/ranpm.git] / pmproducer / src / test / java / org / oran / pmproducer / IntegrationWithKafka.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2023 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.pmproducer;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.awaitility.Awaitility.await;
25
26 import com.google.gson.JsonParser;
27
28 import java.io.IOException;
29 import java.lang.invoke.MethodHandles;
30 import java.nio.file.Path;
31 import java.time.Instant;
32 import java.time.OffsetDateTime;
33 import java.util.ArrayList;
34 import java.util.HashMap;
35 import java.util.Map;
36
37 import lombok.Builder;
38
39 import org.apache.kafka.clients.producer.ProducerConfig;
40 import org.apache.kafka.clients.producer.ProducerRecord;
41 import org.apache.kafka.common.serialization.ByteArraySerializer;
42 import org.junit.jupiter.api.AfterEach;
43 import org.junit.jupiter.api.BeforeEach;
44 import org.junit.jupiter.api.Test;
45 import org.oran.pmproducer.clients.AsyncRestClient;
46 import org.oran.pmproducer.clients.AsyncRestClientFactory;
47 import org.oran.pmproducer.configuration.ApplicationConfig;
48 import org.oran.pmproducer.configuration.WebClientConfig;
49 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
50 import org.oran.pmproducer.controllers.ProducerCallbacksController;
51 import org.oran.pmproducer.controllers.ProducerCallbacksController.StatisticsCollection;
52 import org.oran.pmproducer.datastore.DataStore;
53 import org.oran.pmproducer.filter.PmReportFilter;
54 import org.oran.pmproducer.oauth2.SecurityContext;
55 import org.oran.pmproducer.r1.ConsumerJobInfo;
56 import org.oran.pmproducer.repository.InfoType;
57 import org.oran.pmproducer.repository.InfoTypes;
58 import org.oran.pmproducer.repository.Job;
59 import org.oran.pmproducer.repository.Job.Statistics;
60 import org.oran.pmproducer.repository.Jobs;
61 import org.oran.pmproducer.tasks.NewFileEvent;
62 import org.oran.pmproducer.tasks.TopicListener;
63 import org.oran.pmproducer.tasks.TopicListeners;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66 import org.springframework.beans.factory.annotation.Autowired;
67 import org.springframework.boot.test.context.SpringBootTest;
68 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
69 import org.springframework.boot.test.context.TestConfiguration;
70 import org.springframework.boot.test.web.server.LocalServerPort;
71 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
72 import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
73 import org.springframework.context.annotation.Bean;
74 import org.springframework.test.context.TestPropertySource;
75
76 import reactor.core.publisher.Flux;
77 import reactor.kafka.sender.KafkaSender;
78 import reactor.kafka.sender.SenderOptions;
79 import reactor.kafka.sender.SenderRecord;
80
81 @SuppressWarnings("java:S3577") // Rename class
82 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
83 @TestPropertySource(properties = { //
84         "server.ssl.key-store=./config/keystore.jks", //
85         "app.webclient.trust-store=./config/truststore.jks", //
86         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
87         "app.pm-files-path=./src/test/resources/", //
88         "app.s3.locksBucket=ropfilelocks", //
89         "app.pm-files-path=/tmp/pmproducer", //
90         "app.s3.bucket=pmproducertest", //
91         "app.auth-token-file=src/test/resources/jwtToken.b64", //
92         "app.kafka.use-oath-token=false"}) //
93 /**
94  * Tests the interworking with Kafka and Minio.
95  * Requires that Kafka and ICS are started.
96  */
97 class IntegrationWithKafka {
98
99     final static String PM_TYPE_ID = "PmDataOverKafka";
100
101     @Autowired
102     private ApplicationConfig applicationConfig;
103
104     @Autowired
105     private Jobs jobs;
106
107     @Autowired
108     private InfoTypes types;
109
110     @Autowired
111     private IcsSimulatorController icsSimulatorController;
112
113     @Autowired
114     private TopicListeners topicListeners;
115
116     @Autowired
117     private SecurityContext securityContext;
118
119     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
120
121     private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
122
123     @LocalServerPort
124     int localServerHttpPort;
125
126     static class TestApplicationConfig extends ApplicationConfig {
127         @Override
128         public String getIcsBaseUrl() {
129             return thisProcessUrl();
130         }
131
132         @Override
133         public String getSelfUrl() {
134             return thisProcessUrl();
135         }
136
137         private String thisProcessUrl() {
138             final String url = "https://localhost:" + getLocalServerHttpPort();
139             return url;
140         }
141     }
142
143     /**
144      * Overrides the BeanFactory.
145      */
146     @TestConfiguration
147     static class TestBeanFactory extends BeanFactory {
148
149         @Override
150         @Bean
151         public ServletWebServerFactory servletContainer() {
152             return new TomcatServletWebServerFactory();
153         }
154
155         @Override
156         @Bean
157         public ApplicationConfig getApplicationConfig() {
158             TestApplicationConfig cfg = new TestApplicationConfig();
159             return cfg;
160         }
161     }
162
163     private static class KafkaReceiver {
164         public final String OUTPUT_TOPIC;
165         private TopicListener.DataFromTopic receivedKafkaOutput;
166         private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
167         private final ApplicationConfig applicationConfig;
168
169         int count = 0;
170
171         public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext,
172                 String groupId) {
173             this.applicationConfig = applicationConfig;
174             this.OUTPUT_TOPIC = outputTopic;
175
176             // Create a listener to the output topic. The TopicListener happens to be
177             // suitable for that,
178             InfoType type = InfoType.builder() //
179                     .id("TestReceiver_" + outputTopic) //
180                     .kafkaInputTopic(OUTPUT_TOPIC) //
181                     .build();
182
183             TopicListener topicListener = new TopicListener(applicationConfig, type);
184             if (groupId != null) {
185                 topicListener.setKafkaGroupId(groupId);
186             }
187
188             topicListener.getFlux() //
189                     .map(this::unzip) //
190                     .doOnNext(this::set) //
191                     .doFinally(sig -> logger.info("Finally " + sig)) //
192                     .subscribe();
193         }
194
195         private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
196             if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped()) {
197                 logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped(),
198                         this.applicationConfig.isZipOutput());
199             }
200
201             if (!receivedKafkaOutput.isZipped()) {
202                 return receivedKafkaOutput;
203             }
204             try {
205                 byte[] unzipped = TopicListener.unzip(receivedKafkaOutput.value);
206                 return new TopicListener.DataFromTopic("typeId", null, unzipped, receivedKafkaOutput.key);
207             } catch (IOException e) {
208                 logger.error("********* ERROR ", e.getMessage());
209                 return null;
210             }
211         }
212
213         private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
214             this.receivedKafkaOutput = receivedKafkaOutput;
215             this.count++;
216             if (logger.isDebugEnabled()) {
217                 logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
218                 logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
219                 logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders());
220             }
221         }
222
223         void reset() {
224             this.receivedKafkaOutput = new TopicListener.DataFromTopic("", null, null, null);
225             this.count = 0;
226         }
227     }
228
229     private static KafkaReceiver kafkaReceiver;
230     private static KafkaReceiver kafkaReceiver2;
231
232     @BeforeEach
233     void init() {
234         this.applicationConfig.setZipOutput(false);
235
236         if (kafkaReceiver == null) {
237             kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext, null);
238             kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext, null);
239         }
240         kafkaReceiver.reset();
241         kafkaReceiver2.reset();
242
243         DataStore fileStore = this.dataStore();
244         fileStore.create(DataStore.Bucket.FILES).block();
245         fileStore.create(DataStore.Bucket.LOCKS).block();
246
247     }
248
249     @AfterEach
250     void reset() {
251         for (Job job : this.jobs.getAll()) {
252             this.icsSimulatorController.deleteJob(job.getId(), restClient());
253         }
254         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
255         await().untilAsserted(() -> assertThat(this.topicListeners.getDataDistributors().keySet()).isEmpty());
256
257         this.icsSimulatorController.testResults.reset();
258
259         DataStore fileStore = dataStore();
260         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
261         fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
262     }
263
264     private AsyncRestClient restClient(boolean useTrustValidation) {
265         WebClientConfig config = this.applicationConfig.getWebClientConfig();
266         HttpProxyConfig httpProxyConfig = HttpProxyConfig.builder() //
267                 .httpProxyHost("") //
268                 .httpProxyPort(0) //
269                 .build();
270         config = WebClientConfig.builder() //
271                 .keyStoreType(config.getKeyStoreType()) //
272                 .keyStorePassword(config.getKeyStorePassword()) //
273                 .keyStore(config.getKeyStore()) //
274                 .keyPassword(config.getKeyPassword()) //
275                 .isTrustStoreUsed(useTrustValidation) //
276                 .trustStore(config.getTrustStore()) //
277                 .trustStorePassword(config.getTrustStorePassword()) //
278                 .httpProxyConfig(httpProxyConfig).build();
279
280         AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext);
281         return restClientFactory.createRestClientNoHttpProxy(baseUrl());
282     }
283
284     private AsyncRestClient restClient() {
285         return restClient(false);
286     }
287
288     private String baseUrl() {
289         return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
290     }
291
292     private static Object jsonObject(String json) {
293         try {
294             return JsonParser.parseString(json).getAsJsonObject();
295         } catch (Exception e) {
296             throw new NullPointerException(e.toString());
297         }
298     }
299
300     public static ConsumerJobInfo consumerJobInfoKafka(String kafkaBootstrapServers, String topic,
301             PmReportFilter.FilterData filterData) {
302         try {
303             Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
304                     .topic(topic) //
305                     .bootStrapServers(kafkaBootstrapServers) //
306                     .build();
307             Job.Parameters param = Job.Parameters.builder() //
308                     .filter(filterData) //
309                     .deliveryInfo(deliveryInfo) //
310                     .build();
311
312             String str = gson.toJson(param);
313             Object parametersObj = jsonObject(str);
314
315             return new ConsumerJobInfo(PM_TYPE_ID, parametersObj, "owner", "");
316         } catch (Exception e) {
317             return null;
318         }
319     }
320
321     private SenderOptions<byte[], byte[]> kafkaSenderOptions() {
322         String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
323
324         Map<String, Object> props = new HashMap<>();
325         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
326         // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
327         props.put(ProducerConfig.ACKS_CONFIG, "all");
328         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
329         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
330
331         // Security
332         this.applicationConfig.addKafkaSecurityProps(props);
333
334         return SenderOptions.create(props);
335     }
336
337     private SenderRecord<byte[], byte[], Integer> kafkaSenderRecord(String data, String key, String typeId) {
338         final InfoType infoType = this.types.get(typeId);
339         int correlationMetadata = 2;
340         return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key.getBytes(), data.getBytes()),
341                 correlationMetadata);
342     }
343
344     private void sendDataToKafka(Flux<SenderRecord<byte[], byte[], Integer>> dataToSend) {
345         final KafkaSender<byte[], byte[]> sender = KafkaSender.create(kafkaSenderOptions());
346
347         sender.send(dataToSend) //
348                 .doOnError(e -> logger.error("Send failed", e)) //
349                 .blockLast();
350
351         sender.close();
352     }
353
354     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
355     private static void waitForKafkaListener() throws InterruptedException {
356         Thread.sleep(4000);
357     }
358
359     private StatisticsCollection getStatistics() {
360         String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
361         String statsResp = restClient().get(targetUri).block();
362         StatisticsCollection stats = gson.fromJson(statsResp, StatisticsCollection.class);
363         return stats;
364     }
365
/**
 * Summary of one characteristics run. It is filled in by
 * getCharacteristicsResult() and printed as JSON.
 */
@Builder
static class CharacteristicsResult {
    // Objects received by one job per second of test duration.
    long noOfFilesPerSecond;
    // Sent bytes, summed over all jobs.
    long noOfSentBytes;
    // Sent volume in a coarser unit, derived from noOfSentBytes.
    long noOfSentGigaBytes;
    // Sent objects, summed over all jobs.
    long noOfSentObjects;
    // Received bytes per received object, taken from one job.
    long inputFileSize;
    // Objects received by one job.
    long noOfReceivedFiles;
    // Bytes received by one job.
    long noOfReceivedBytes;
    // Number of job statistics entries.
    long noOfSubscribers;
    // Sent bytes per sent object, taken from one job.
    long sizeOfSentObj;
    // Zip setting of the application configuration when the result was taken.
    boolean zipOutput;
}
379
380     private CharacteristicsResult getCharacteristicsResult(Instant startTime) {
381         final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
382         StatisticsCollection stats = getStatistics();
383         long noOfSentBytes = 0;
384         long noOfSentObjs = 0;
385         for (Statistics s : stats.jobStatistics) {
386             noOfSentBytes += s.getNoOfSentBytes();
387             noOfSentObjs += s.getNoOfSentObjects();
388         }
389
390         Statistics oneJobsStats = stats.jobStatistics.iterator().next();
391
392         return CharacteristicsResult.builder() //
393                 .noOfSentBytes(noOfSentBytes) //
394                 .noOfSentObjects(noOfSentObjs) //
395                 .noOfSentGigaBytes(noOfSentBytes / (1024 * 1024)) //
396                 .noOfSubscribers(stats.jobStatistics.size()) //
397                 .zipOutput(this.applicationConfig.isZipOutput()) //
398                 .noOfFilesPerSecond((oneJobsStats.getNoOfReceivedObjects() * 1000) / durationMs) //
399                 .noOfReceivedBytes(oneJobsStats.getNoOfReceivedBytes()) //
400                 .inputFileSize(oneJobsStats.getNoOfReceivedBytes() / oneJobsStats.getNoOfReceivedObjects()) //
401                 .noOfReceivedFiles(oneJobsStats.getNoOfReceivedObjects()) //
402                 .sizeOfSentObj(oneJobsStats.getNoOfSentBytes() / oneJobsStats.getNoOfSentObjects()) //
403                 .build();
404     }
405
406     private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) {
407         final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
408         logger.info("*** {} Duration ({} ms),  objects/second: {}", str, durationMs,
409                 (noOfIterations * 1000) / durationMs);
410
411         System.out.println("--------------");
412         System.out.println(gson.toJson(getCharacteristicsResult(startTime)));
413         System.out.println("--------------");
414
415     }
416
417     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
418     @Test
419     void kafkaCharacteristics_pmFilter_s3() throws Exception {
420         // Filter PM reports and sent to two jobs over Kafka
421
422         final String JOB_ID = "kafkaCharacteristics";
423         final String JOB_ID2 = "kafkaCharacteristics2";
424
425         // Register producer, Register types
426         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
427         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
428
429         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
430
431         filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
432
433         this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
434                 kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
435         this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
436                 kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2, restClient());
437
438         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
439         waitForKafkaListener();
440
441         final int NO_OF_OBJECTS = 10;
442
443         Instant startTime = Instant.now();
444
445         final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
446
447         DataStore fileStore = dataStore();
448
449         fileStore.create(DataStore.Bucket.FILES).block();
450         fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
451
452         String eventAsString = newFileEvent(FILE_NAME);
453         var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
454         sendDataToKafka(dataToSend);
455
456         while (kafkaReceiver.count < NO_OF_OBJECTS) {
457             logger.info("sleeping {}", kafkaReceiver.count);
458             Thread.sleep(1000 * 1);
459         }
460
461         String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString();
462         assertThat(msgString).contains("pmCounterNumber0");
463         assertThat(msgString).doesNotContain("pmCounterNumber1");
464         assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID);
465         assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from
466                                                                                                          // the file
467
468         printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
469         logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);
470
471         assertThat(kafkaReceiver.count).isEqualTo(NO_OF_OBJECTS);
472     }
473
474     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
475     @Test
476     void kafkaCharacteristics_manyPmJobs() throws Exception {
477         // Filter PM reports and sent to many jobs over Kafka
478
479         this.applicationConfig.setZipOutput(false);
480
481         // Register producer, Register types
482         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
483         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
484
485         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
486         final int NO_OF_COUNTERS = 2;
487         for (int i = 0; i < NO_OF_COUNTERS; ++i) {
488             filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i);
489         }
490
491         final int NO_OF_JOBS = 150;
492
493         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
494         for (int i = 0; i < NO_OF_JOBS; ++i) {
495             final String outputTopic = "manyJobs_" + i;
496             this.icsSimulatorController.addJob(
497                     consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
498                     outputTopic, restClient());
499             KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, null);
500             receivers.add(receiver);
501         }
502
503         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
504         waitForKafkaListener();
505
506         final int NO_OF_OBJECTS = 1000;
507
508         Instant startTime = Instant.now();
509
510         final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
511
512         DataStore fileStore = dataStore();
513
514         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
515         fileStore.create(DataStore.Bucket.FILES).block();
516         fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
517
518         String eventAsString = newFileEvent(FILE_NAME);
519         var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
520         sendDataToKafka(dataToSend);
521
522         logger.info("sleeping {}", kafkaReceiver.count);
523         while (receivers.get(0).count < NO_OF_OBJECTS) {
524             if (kafkaReceiver.count > 0) {
525                 logger.info("sleeping {}", receivers.get(0).count);
526             }
527
528             Thread.sleep(1000 * 1);
529         }
530
531         printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS);
532
533         for (KafkaReceiver receiver : receivers) {
534             if (receiver.count != NO_OF_OBJECTS) {
535                 System.out.println("** Unexpected no of jobs: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
536             }
537         }
538     }
539
540     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
541     @Test
542     void kafkaCharacteristics_manyPmJobs_sharedTopic() throws Exception {
543         // Filter PM reports and sent to many jobs over Kafka
544
545         this.applicationConfig.setZipOutput(false);
546
547         // Register producer, Register types
548         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
549         assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
550
551         final int NO_OF_JOBS = 150;
552         ArrayList<KafkaReceiver> receivers = new ArrayList<>();
553         for (int i = 0; i < NO_OF_JOBS; ++i) {
554             final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
555             final String jobId = "manyJobs_" + i;
556             PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
557             filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // all counters will be added
558
559             this.icsSimulatorController.addJob(
560                     consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
561                     jobId, restClient());
562
563             KafkaReceiver receiver =
564                     new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
565             receivers.add(receiver);
566         }
567
568         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
569         waitForKafkaListener();
570
571         final int NO_OF_OBJECTS = 1000;
572
573         Instant startTime = Instant.now();
574
575         final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
576
577         DataStore fileStore = dataStore();
578
579         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
580         fileStore.create(DataStore.Bucket.FILES).block();
581         fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
582
583         String eventAsString = newFileEvent(FILE_NAME);
584         var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
585         sendDataToKafka(dataToSend);
586
587         logger.info("sleeping {}", kafkaReceiver.count);
588         for (KafkaReceiver receiver : receivers) {
589             while (receiver.count < NO_OF_OBJECTS) {
590                 if (kafkaReceiver.count > 0) {
591                     logger.info("sleeping {}", receiver.count);
592                 }
593
594                 Thread.sleep(1000 * 1);
595             }
596         }
597
598         printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS);
599
600         for (KafkaReceiver receiver : receivers) {
601             if (receiver.count != NO_OF_OBJECTS) {
602                 System.out.println("** Unexpected no of objects: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
603             }
604         }
605
606         Thread.sleep(1000 * 5);
607     }
608
609     private String newFileEvent(String fileName) {
610         NewFileEvent event = NewFileEvent.builder() //
611                 .filename(fileName) //
612                 .build();
613         return gson.toJson(event);
614     }
615
616     private DataStore dataStore() {
617         return DataStore.create(this.applicationConfig);
618     }
619
620     @Test
621     void testHistoricalData() throws Exception {
622         // test that it works to get already fetched data
623         final String JOB_ID = "testHistoricalData";
624
625         DataStore fileStore = dataStore();
626
627         fileStore.deleteBucket(DataStore.Bucket.FILES).block();
628         fileStore.create(DataStore.Bucket.FILES).block();
629         fileStore.create(DataStore.Bucket.LOCKS).block();
630
631         fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
632                 "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block();
633
634         fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "OTHER_SOURCENAME/test.json").block();
635
636         await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
637
638         PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
639         filterData.getSourceNames().add("O-DU-1122");
640         filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
641
642         filterData.setPmRopEndTime(OffsetDateTime.now().toString());
643
644         this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
645                 kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
646         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
647
648         await().untilAsserted(() -> assertThat(kafkaReceiver.count).isPositive());
649     }
650
651 }