2 * ========================LICENSE_START=================================
5 * Copyright (C) 2023 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oran.pmproducer;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.awaitility.Awaitility.await;
26 import com.google.gson.JsonParser;
28 import java.io.IOException;
29 import java.lang.invoke.MethodHandles;
30 import java.nio.file.Path;
31 import java.time.Instant;
32 import java.time.OffsetDateTime;
33 import java.util.ArrayList;
34 import java.util.HashMap;
37 import lombok.Builder;
39 import org.apache.kafka.clients.producer.ProducerConfig;
40 import org.apache.kafka.clients.producer.ProducerRecord;
41 import org.apache.kafka.common.serialization.ByteArraySerializer;
42 import org.junit.jupiter.api.AfterEach;
43 import org.junit.jupiter.api.BeforeEach;
44 import org.junit.jupiter.api.Test;
45 import org.oran.pmproducer.clients.AsyncRestClient;
46 import org.oran.pmproducer.clients.AsyncRestClientFactory;
47 import org.oran.pmproducer.configuration.ApplicationConfig;
48 import org.oran.pmproducer.configuration.WebClientConfig;
49 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
50 import org.oran.pmproducer.controllers.ProducerCallbacksController;
51 import org.oran.pmproducer.controllers.ProducerCallbacksController.StatisticsCollection;
52 import org.oran.pmproducer.datastore.DataStore;
53 import org.oran.pmproducer.filter.PmReportFilter;
54 import org.oran.pmproducer.oauth2.SecurityContext;
55 import org.oran.pmproducer.r1.ConsumerJobInfo;
56 import org.oran.pmproducer.repository.InfoType;
57 import org.oran.pmproducer.repository.InfoTypes;
58 import org.oran.pmproducer.repository.Job;
59 import org.oran.pmproducer.repository.Job.Statistics;
60 import org.oran.pmproducer.repository.Jobs;
61 import org.oran.pmproducer.tasks.NewFileEvent;
62 import org.oran.pmproducer.tasks.TopicListener;
63 import org.oran.pmproducer.tasks.TopicListeners;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66 import org.springframework.beans.factory.annotation.Autowired;
67 import org.springframework.boot.test.context.SpringBootTest;
68 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
69 import org.springframework.boot.test.context.TestConfiguration;
70 import org.springframework.boot.test.web.server.LocalServerPort;
71 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
72 import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
73 import org.springframework.context.annotation.Bean;
74 import org.springframework.test.context.TestPropertySource;
76 import reactor.core.publisher.Flux;
77 import reactor.kafka.sender.KafkaSender;
78 import reactor.kafka.sender.SenderOptions;
79 import reactor.kafka.sender.SenderRecord;
81 @SuppressWarnings("java:S3577") // Rename class
82 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
83 @TestPropertySource(properties = { //
84 "server.ssl.key-store=./config/keystore.jks", //
85 "app.webclient.trust-store=./config/truststore.jks", //
86 "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
87 "app.pm-files-path=./src/test/resources/", //
88 "app.s3.locksBucket=ropfilelocks", //
89 "app.pm-files-path=/tmp/pmproducer", //
90 "app.s3.bucket=pmproducertest", //
91 "app.auth-token-file=src/test/resources/jwtToken.b64", //
92 "app.kafka.use-oath-token=false"}) //
94 * Tests interwork with Kafka and Minio
95 * Requires that Kafka and ICS are started.
97 class IntegrationWithKafka {
// Test fixture state shared by all integration tests in this class.
// NOTE(review): gaps in the embedded numbering indicate that the @Autowired /
// @LocalServerPort annotations, and at least the "jobs" and "dataStore"
// related fields, were dropped from this listing — verify against the
// original file before editing.
99 final static String PM_TYPE_ID = "PmDataOverKafka";
102 private ApplicationConfig applicationConfig;
108 private InfoTypes types;
111 private IcsSimulatorController icsSimulatorController;
114 private TopicListeners topicListeners;
117 private SecurityContext securityContext;
// Shared Gson instance; HTML escaping disabled so JSON payloads are emitted verbatim.
119 private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
121 private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
// Injected server port (presumably @LocalServerPort in the original — TODO confirm).
124 int localServerHttpPort;
// Config override that points both the ICS base URL and the self URL at this
// test process, so the producer registers against the in-process ICS simulator.
126 static class TestApplicationConfig extends ApplicationConfig {
128 public String getIcsBaseUrl() {
129 return thisProcessUrl();
133 public String getSelfUrl() {
134 return thisProcessUrl();
// Builds https://localhost:<port> for this running test instance.
// NOTE(review): method tail (return/closing braces) missing from this listing.
137 private String thisProcessUrl() {
138 final String url = "https://localhost:" + getLocalServerHttpPort();
144 * Overrides the BeanFactory.
// @TestConfiguration replacing production beans: plain Tomcat servlet container
// and the TestApplicationConfig above (annotations not visible in this listing).
147 static class TestBeanFactory extends BeanFactory {
151 public ServletWebServerFactory servletContainer() {
152 return new TomcatServletWebServerFactory();
// NOTE(review): body truncated here — the original presumably configures and
// returns cfg; confirm against the full source.
157 public ApplicationConfig getApplicationConfig() {
158 TestApplicationConfig cfg = new TestApplicationConfig();
// Consumes one Kafka output topic so tests can inspect what the producer
// delivered. Reuses the production TopicListener as a generic topic consumer.
// NOTE(review): the "count" field referenced by the tests and the reset()
// signature are missing from this listing (dropped lines).
163 private static class KafkaReceiver {
164 public final String OUTPUT_TOPIC;
165 private TopicListener.DataFromTopic receivedKafkaOutput;
166 private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
167 private final ApplicationConfig applicationConfig;
// groupId may be null; when set it is applied to the listener so several
// receivers can share one topic via distinct consumer groups.
171 public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext,
173 this.applicationConfig = applicationConfig;
174 this.OUTPUT_TOPIC = outputTopic;
176 // Create a listener to the output topic. The TopicListener happens to be
177 // suitable for that,
178 InfoType type = InfoType.builder() //
179 .id("TestReceiver_" + outputTopic) //
180 .kafkaInputTopic(OUTPUT_TOPIC) //
183 TopicListener topicListener = new TopicListener(applicationConfig, type);
184 if (groupId != null) {
185 topicListener.setKafkaGroupId(groupId);
// Subscribe: each received record is stored via set(); the flux presumably
// also unzips via unzip() in a dropped map step — TODO confirm.
188 topicListener.getFlux() //
190 .doOnNext(this::set) //
191 .doFinally(sig -> logger.info("Finally " + sig)) //
// Unzips the received payload when the application is configured for zipped
// output; passes non-zipped data through unchanged.
// NOTE(review): the tail of the catch block and closing braces are missing
// from this listing.
195 private TopicListener.DataFromTopic unzip(TopicListener.DataFromTopic receivedKafkaOutput) {
// Sanity check: the received zip-flag must match the configured output mode.
196 if (this.applicationConfig.isZipOutput() != receivedKafkaOutput.isZipped()) {
197 logger.error("********* ERROR received zipped: {}, exp zipped: {}", receivedKafkaOutput.isZipped(),
198 this.applicationConfig.isZipOutput());
201 if (!receivedKafkaOutput.isZipped()) {
202 return receivedKafkaOutput;
205 byte[] unzipped = TopicListener.unzip(receivedKafkaOutput.value);
206 return new TopicListener.DataFromTopic("typeId", null, unzipped, receivedKafkaOutput.key);
207 } catch (IOException e) {
// FIX: the original message had no "{}" placeholder, so SLF4J silently
// discarded e.getMessage() (a non-Throwable last argument is ignored
// without a placeholder).
208 logger.error("********* ERROR {}", e.getMessage());
// Stores the latest received record and (in dropped lines) presumably bumps
// the "count" field used by the tests — TODO confirm against full source.
213 private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
214 this.receivedKafkaOutput = receivedKafkaOutput;
216 if (logger.isDebugEnabled()) {
217 logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
218 logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
219 logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders());
// Line below belongs to reset() — its signature was dropped from this listing.
// It clears the last-received record to an empty sentinel between tests.
224 this.receivedKafkaOutput = new TopicListener.DataFromTopic("", null, null, null);
// Two lazily-created receivers shared across tests (static so the underlying
// Kafka consumers survive between test methods).
229 private static KafkaReceiver kafkaReceiver;
230 private static KafkaReceiver kafkaReceiver2;
// Per-test setup (the @BeforeEach method header was dropped from this listing):
// resets zip mode, (re)creates receivers, clears their state, and ensures the
// FILES/LOCKS buckets exist.
234 this.applicationConfig.setZipOutput(false);
236 if (kafkaReceiver == null) {
// NOTE(review): topic names spelled "ouputTopic"/"ouputTopic2" (sic) — kept
// as-is since they are runtime identifiers.
237 kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext, null);
238 kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext, null);
240 kafkaReceiver.reset();
241 kafkaReceiver2.reset();
243 DataStore fileStore = this.dataStore();
244 fileStore.create(DataStore.Bucket.FILES).block();
245 fileStore.create(DataStore.Bucket.LOCKS).block();
// Per-test teardown (the @AfterEach method header was dropped from this
// listing): deletes all jobs via the ICS simulator, waits until the producer
// has drained them, then clears simulator results and removes the buckets.
251 for (Job job : this.jobs.getAll()) {
252 this.icsSimulatorController.deleteJob(job.getId(), restClient());
254 await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
255 await().untilAsserted(() -> assertThat(this.topicListeners.getDataDistributors().keySet()).isEmpty());
257 this.icsSimulatorController.testResults.reset();
259 DataStore fileStore = dataStore();
260 fileStore.deleteBucket(DataStore.Bucket.FILES).block();
261 fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
// Builds an HTTPS client against this process, reusing the app's key store and
// optionally enabling trust-store validation of the server certificate.
264 private AsyncRestClient restClient(boolean useTrustValidation) {
265 WebClientConfig config = this.applicationConfig.getWebClientConfig();
// Empty proxy host disables proxying (builder tail truncated in this listing).
266 HttpProxyConfig httpProxyConfig = HttpProxyConfig.builder() //
267 .httpProxyHost("") //
270 config = WebClientConfig.builder() //
271 .keyStoreType(config.getKeyStoreType()) //
272 .keyStorePassword(config.getKeyStorePassword()) //
273 .keyStore(config.getKeyStore()) //
274 .keyPassword(config.getKeyPassword()) //
275 .isTrustStoreUsed(useTrustValidation) //
276 .trustStore(config.getTrustStore()) //
277 .trustStorePassword(config.getTrustStorePassword()) //
278 .httpProxyConfig(httpProxyConfig).build();
280 AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext);
281 return restClientFactory.createRestClientNoHttpProxy(baseUrl());
// Convenience overload: client without server-certificate trust validation.
284 private AsyncRestClient restClient() {
285 return restClient(false);
// Base HTTPS URL of this test server instance.
288 private String baseUrl() {
289 return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
// Parses a JSON string into a JsonObject (the opening "try {" was dropped
// from this listing).
// NOTE(review): wrapping a parse failure in NullPointerException is
// misleading — IllegalArgumentException with the cause attached would be the
// conventional choice; left unchanged because callers may rely on it.
292 private static Object jsonObject(String json) {
294 return JsonParser.parseString(json).getAsJsonObject();
295 } catch (Exception e) {
296 throw new NullPointerException(e.toString());
// Builds a ConsumerJobInfo for a PM job delivered over Kafka: the given filter
// plus delivery info (bootstrap servers; the .topic(...) builder call appears
// to be in a dropped line). Parameters are serialized to JSON and re-parsed so
// ICS receives a plain JSON object.
300 public static ConsumerJobInfo consumerJobInfoKafka(String kafkaBootstrapServers, String topic,
301 PmReportFilter.FilterData filterData) {
303 Job.Parameters.KafkaDeliveryInfo deliveryInfo = Job.Parameters.KafkaDeliveryInfo.builder() //
305 .bootStrapServers(kafkaBootstrapServers) //
307 Job.Parameters param = Job.Parameters.builder() //
308 .filter(filterData) //
309 .deliveryInfo(deliveryInfo) //
312 String str = gson.toJson(param);
313 Object parametersObj = jsonObject(str);
315 return new ConsumerJobInfo(PM_TYPE_ID, parametersObj, "owner", "");
// NOTE(review): catch body truncated in this listing.
316 } catch (Exception e) {
// Reactor-Kafka sender options for publishing raw byte[] key/value records to
// the producer's input topic, including the app's Kafka security properties.
321 private SenderOptions<byte[], byte[]> kafkaSenderOptions() {
322 String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
324 Map<String, Object> props = new HashMap<>();
325 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
326 // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
// acks=all: wait for full ISR acknowledgement so test sends are not lost.
327 props.put(ProducerConfig.ACKS_CONFIG, "all");
328 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
329 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
332 this.applicationConfig.addKafkaSecurityProps(props);
334 return SenderOptions.create(props);
// Wraps a string payload as a SenderRecord targeting the input topic of the
// given type. The correlation metadata (2) is an arbitrary marker; it is not
// interpreted by these tests.
337 private SenderRecord<byte[], byte[], Integer> kafkaSenderRecord(String data, String key, String typeId) {
338 final InfoType infoType = this.types.get(typeId);
339 int correlationMetadata = 2;
340 return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key.getBytes(), data.getBytes()),
341 correlationMetadata);
// Publishes the given records to Kafka. The subscribe/blocking tail and
// sender.close() appear to be in dropped lines — confirm the sender is closed
// in the full source.
344 private void sendDataToKafka(Flux<SenderRecord<byte[], byte[], Integer>> dataToSend) {
345 final KafkaSender<byte[], byte[]> sender = KafkaSender.create(kafkaSenderOptions());
347 sender.send(dataToSend) //
348 .doOnError(e -> logger.error("Send failed", e)) //
// Grace period allowing the producer's Kafka listeners to finish subscribing
// before the test starts sending (body dropped from this listing; presumably
// a Thread.sleep, hence the squid:S2925 suppression).
354 @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
355 private static void waitForKafkaListener() throws InterruptedException {
// Fetches the producer's job statistics over its REST API and deserializes
// them (return statement missing from this listing).
359 private StatisticsCollection getStatistics() {
360 String targetUri = baseUrl() + ProducerCallbacksController.STATISTICS_URL;
361 String statsResp = restClient().get(targetUri).block();
362 StatisticsCollection stats = gson.fromJson(statsResp, StatisticsCollection.class);
// Value holder for throughput measurements printed by the characteristics
// tests. Presumably annotated @Builder in the original (lombok.Builder is
// imported; the annotation line was dropped) — the builder is used in
// getCharacteristicsResult().
367 static class CharacteristicsResult {
368 long noOfFilesPerSecond;
370 long noOfSentGigaBytes;
371 long noOfSentObjects;
373 long noOfReceivedFiles;
374 long noOfReceivedBytes;
375 long noOfSubscribers;
// Aggregates sent bytes/objects across all jobs and derives per-second rates
// from the elapsed time since startTime. Uses one (arbitrary) job's stats for
// the "received" figures, since all jobs receive the same input.
380 private CharacteristicsResult getCharacteristicsResult(Instant startTime) {
381 final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
382 StatisticsCollection stats = getStatistics();
383 long noOfSentBytes = 0;
384 long noOfSentObjs = 0;
385 for (Statistics s : stats.jobStatistics) {
386 noOfSentBytes += s.getNoOfSentBytes();
387 noOfSentObjs += s.getNoOfSentObjects();
390 Statistics oneJobsStats = stats.jobStatistics.iterator().next();
392 return CharacteristicsResult.builder() //
393 .noOfSentBytes(noOfSentBytes) //
394 .noOfSentObjects(noOfSentObjs) //
// NOTE(review): divides by 1024*1024, so this is mebibytes, not gigabytes,
// despite the field name. Left unchanged (diagnostic output only).
395 .noOfSentGigaBytes(noOfSentBytes / (1024 * 1024)) //
396 .noOfSubscribers(stats.jobStatistics.size()) //
397 .zipOutput(this.applicationConfig.isZipOutput()) //
398 .noOfFilesPerSecond((oneJobsStats.getNoOfReceivedObjects() * 1000) / durationMs) //
399 .noOfReceivedBytes(oneJobsStats.getNoOfReceivedBytes()) //
// NOTE(review): divides by getNoOfReceivedObjects()/getNoOfSentObjects();
// throws ArithmeticException if called before anything was processed.
400 .inputFileSize(oneJobsStats.getNoOfReceivedBytes() / oneJobsStats.getNoOfReceivedObjects()) //
401 .noOfReceivedFiles(oneJobsStats.getNoOfReceivedObjects()) //
402 .sizeOfSentObj(oneJobsStats.getNoOfSentBytes() / oneJobsStats.getNoOfSentObjects()) //
// Logs overall objects/second for the run and dumps the full
// CharacteristicsResult as JSON to stdout for manual inspection.
406 private void printCharacteristicsResult(String str, Instant startTime, int noOfIterations) {
407 final long durationMs = Instant.now().toEpochMilli() - startTime.toEpochMilli();
408 logger.info("*** {} Duration ({} ms), objects/second: {}", str, durationMs,
409 (noOfIterations * 1000) / durationMs);
411 System.out.println("--------------");
412 System.out.println(gson.toJson(getCharacteristicsResult(startTime)));
413 System.out.println("--------------");
// Test (the @Test annotation was dropped from this listing): sends PM file
// events for one file N times, with two Kafka-delivered jobs filtering on
// pmCounterNumber0, and verifies the filtered output, headers and counts.
417 @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
419 void kafkaCharacteristics_pmFilter_s3() throws Exception {
420 // Filter PM reports and sent to two jobs over Kafka
422 final String JOB_ID = "kafkaCharacteristics";
423 final String JOB_ID2 = "kafkaCharacteristics2";
425 // Register producer, Register types
426 await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
427 assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
429 PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
431 filterData.addMeasTypes("NRCellCU", "pmCounterNumber0");
433 this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
434 kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
435 this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
436 kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2, restClient());
438 await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
439 waitForKafkaListener();
441 final int NO_OF_OBJECTS = 10;
443 Instant startTime = Instant.now();
445 final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
447 DataStore fileStore = dataStore();
449 fileStore.create(DataStore.Bucket.FILES).block();
450 fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
// Publish the same new-file event NO_OF_OBJECTS times to the input topic.
452 String eventAsString = newFileEvent(FILE_NAME);
453 var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
454 sendDataToKafka(dataToSend);
// Poll until the receiver has seen all objects (no timeout — relies on the
// test-framework timeout; NOTE(review) consider awaitility here).
456 while (kafkaReceiver.count < NO_OF_OBJECTS) {
457 logger.info("sleeping {}", kafkaReceiver.count);
458 Thread.sleep(1000 * 1);
// Output must contain only the filtered counter.
461 String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString();
462 assertThat(msgString)
463 .contains("pmCounterNumber0")
464 .doesNotContain("pmCounterNumber1");
465 assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID);
466 assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from
469 printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
// NOTE(review): label says kafkaReceiver2 but logs kafkaReceiver.count —
// probably meant kafkaReceiver2.count; left unchanged.
470 logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);
472 assertThat(kafkaReceiver.count).isEqualTo(NO_OF_OBJECTS);
// Load test (the @Test annotation was dropped from this listing): 150 jobs,
// each with its own output topic and receiver, fed 1000 file events; reports
// throughput and flags receivers that did not get all objects.
475 @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
477 void kafkaCharacteristics_manyPmJobs() throws Exception {
478 // Filter PM reports and sent to many jobs over Kafka
480 this.applicationConfig.setZipOutput(false);
482 // Register producer, Register types
483 await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
484 assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
486 PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
487 final int NO_OF_COUNTERS = 2;
488 for (int i = 0; i < NO_OF_COUNTERS; ++i) {
489 filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i);
492 final int NO_OF_JOBS = 150;
494 ArrayList<KafkaReceiver> receivers = new ArrayList<>();
495 for (int i = 0; i < NO_OF_JOBS; ++i) {
496 final String outputTopic = "manyJobs_" + i;
497 this.icsSimulatorController.addJob(
498 consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
499 outputTopic, restClient());
500 KafkaReceiver receiver = new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, null);
501 receivers.add(receiver);
504 await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
505 waitForKafkaListener();
507 final int NO_OF_OBJECTS = 1000;
509 Instant startTime = Instant.now();
511 final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
513 DataStore fileStore = dataStore();
515 fileStore.deleteBucket(DataStore.Bucket.FILES).block();
516 fileStore.create(DataStore.Bucket.FILES).block();
517 fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
519 String eventAsString = newFileEvent(FILE_NAME);
520 var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
521 sendDataToKafka(dataToSend);
// Poll the first receiver until it has all objects (no timeout here either).
523 logger.info("sleeping {}", kafkaReceiver.count);
524 while (receivers.get(0).count < NO_OF_OBJECTS) {
525 if (kafkaReceiver.count > 0) {
526 logger.info("sleeping {}", receivers.get(0).count);
529 Thread.sleep(1000 * 1);
532 printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS);
// Informational only: prints (does not assert) receivers that missed objects.
534 for (KafkaReceiver receiver : receivers) {
535 if (receiver.count != NO_OF_OBJECTS) {
536 System.out.println("** Unexpected no of jobs: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
// Load test (the @Test annotation was dropped from this listing): 150 jobs
// sharing ONE output topic, each receiver in its own consumer group so every
// receiver sees every record; verifies delivery counts per receiver.
541 @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
543 void kafkaCharacteristics_manyPmJobs_sharedTopic() throws Exception {
544 // Filter PM reports and sent to many jobs over Kafka
546 this.applicationConfig.setZipOutput(false);
548 // Register producer, Register types
549 await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
550 assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
552 final int NO_OF_JOBS = 150;
553 ArrayList<KafkaReceiver> receivers = new ArrayList<>();
554 for (int i = 0; i < NO_OF_JOBS; ++i) {
555 final String outputTopic = "kafkaCharacteristics_onePmJobs_manyReceivers";
556 final String jobId = "manyJobs_" + i;
557 PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
558 filterData.addMeasTypes("NRCellCU", "pmCounterNumber" + i); // all counters will be added
560 this.icsSimulatorController.addJob(
561 consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(), outputTopic, filterData),
562 jobId, restClient());
// Distinct consumer group per receiver -> broadcast semantics on the topic.
564 KafkaReceiver receiver =
565 new KafkaReceiver(this.applicationConfig, outputTopic, this.securityContext, "group_" + i);
566 receivers.add(receiver);
569 await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
570 waitForKafkaListener();
572 final int NO_OF_OBJECTS = 1000;
574 Instant startTime = Instant.now();
576 final String FILE_NAME = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json.gz";
578 DataStore fileStore = dataStore();
580 fileStore.deleteBucket(DataStore.Bucket.FILES).block();
581 fileStore.create(DataStore.Bucket.FILES).block();
582 fileStore.copyFileTo(Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
584 String eventAsString = newFileEvent(FILE_NAME);
585 var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
586 sendDataToKafka(dataToSend);
// Wait for each receiver in turn to get all objects (unbounded poll).
588 logger.info("sleeping {}", kafkaReceiver.count);
589 for (KafkaReceiver receiver : receivers) {
590 while (receiver.count < NO_OF_OBJECTS) {
591 if (kafkaReceiver.count > 0) {
592 logger.info("sleeping {}", receiver.count);
595 Thread.sleep(1000 * 1);
599 printCharacteristicsResult("kafkaCharacteristics_manyPmJobs", startTime, NO_OF_OBJECTS);
601 for (KafkaReceiver receiver : receivers) {
602 if (receiver.count != NO_OF_OBJECTS) {
603 System.out.println("** Unexpected no of objects: " + receiver.OUTPUT_TOPIC + " " + receiver.count);
// Grace sleep before teardown, presumably to let in-flight deliveries settle.
607 Thread.sleep(1000 * 5);
// Serializes a NewFileEvent for the given file name to JSON — this is the
// payload sent on the producer's input topic (the .build() call appears to be
// in a dropped line).
610 private String newFileEvent(String fileName) {
611 NewFileEvent event = NewFileEvent.builder() //
612 .filename(fileName) //
614 return gson.toJson(event);
// Creates a DataStore handle (S3/minio or filesystem per app configuration).
617 private DataStore dataStore() {
618 return DataStore.create(this.applicationConfig);
// Test (the @Test annotation was dropped from this listing): verifies that a
// new job with a ROP time window delivers already-fetched (historical) files
// from the FILES bucket, filtered by source name.
622 void testHistoricalData() throws Exception {
623 // test that it works to get already fetched data
624 final String JOB_ID = "testHistoricalData";
626 DataStore fileStore = dataStore();
628 fileStore.deleteBucket(DataStore.Bucket.FILES).block();
629 fileStore.create(DataStore.Bucket.FILES).block();
630 fileStore.create(DataStore.Bucket.LOCKS).block();
// One matching file (O-DU-1122) and one that must be excluded by the
// source-name filter (OTHER_SOURCENAME).
632 fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
633 "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.json").block();
635 fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), "OTHER_SOURCENAME/test.json").block();
637 await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
639 PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
640 filterData.getSourceNames().add("O-DU-1122");
// ROP window from a fixed past instant up to "now" so the stored file matches.
641 filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
643 filterData.setPmRopEndTime(OffsetDateTime.now().toString());
645 this.icsSimulatorController.addJob(consumerJobInfoKafka(this.applicationConfig.getKafkaBootStrapServers(),
646 kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, restClient());
647 await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
649 await().untilAsserted(() -> assertThat(kafkaReceiver.count).isPositive());