2 * ========================LICENSE_START=================================
5 * Copyright (C) 2023 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oran.pmlog;
23 import java.nio.charset.Charset;
24 import java.nio.file.Files;
25 import java.nio.file.Path;
26 import java.util.HashMap;
29 import org.apache.kafka.clients.producer.ProducerConfig;
30 import org.apache.kafka.clients.producer.ProducerRecord;
31 import org.apache.kafka.common.serialization.ByteArraySerializer;
32 import org.junit.jupiter.api.AfterEach;
33 import org.junit.jupiter.api.BeforeEach;
34 import org.junit.jupiter.api.Test;
35 import org.oran.pmlog.configuration.ApplicationConfig;
36 import org.oran.pmlog.configuration.ConsumerJobInfo;
37 import org.oran.pmlog.configuration.ConsumerJobInfo.PmFilterData;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.boot.test.context.SpringBootTest;
42 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
43 import org.springframework.boot.test.context.TestConfiguration;
44 import org.springframework.boot.test.web.server.LocalServerPort;
45 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
46 import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
47 import org.springframework.context.annotation.Bean;
48 import org.springframework.test.context.TestPropertySource;
50 import reactor.core.publisher.Flux;
51 import reactor.kafka.sender.KafkaSender;
52 import reactor.kafka.sender.SenderOptions;
53 import reactor.kafka.sender.SenderRecord;
// Test-class annotations. The Spring Boot test is started with
// WebEnvironment.DEFINED_PORT, and the properties listed below are supplied as test
// property overrides: key store, trust store, configuration file path, PM files path,
// auth token file, and app.kafka.use-oath-token set to false.
55 @SuppressWarnings("java:S3577") // Rename class
56 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
57 @TestPropertySource(properties = { //
58 "server.ssl.key-store=./config/keystore.jks", //
59 "app.webclient.trust-store=./config/truststore.jks", //
60 "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
61 "app.pm-files-path=./src/test/resources/", //
62 "app.auth-token-file=src/test/resources/jwtToken.b64", //
63 "app.kafka.use-oath-token=false" //
// Application configuration; the helpers in this class read the Kafka bootstrap servers,
// the Kafka input topic and the Kafka security properties from it.
68 private ApplicationConfig applicationConfig;
// Shared Gson instance with HTML escaping disabled, used to parse and serialise JSON.
70 private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
// Logger for this test class.
72 private final Logger logger = LoggerFactory.getLogger(Integration.class);
// NOTE(review): presumably receives the port of the locally started server — confirm how it is populated.
75 int localServerHttpPort;
// ApplicationConfig subclass used by the tests.
77 static class TestApplicationConfig extends ApplicationConfig {
// Builds the URL of this process from the value returned by getLocalServerHttpsPort().
78 String thisProcessUrl() {
// The process is addressed through localhost over HTTPS.
79 final String url = "https://localhost:" + getLocalServerHttpsPort();
85 * Overrides the BeanFactory.
// BeanFactory subclass used by the tests.
88 static class TestBeanFactory extends BeanFactory {
// Supplies a Tomcat based servlet web server factory.
92 public ServletWebServerFactory servletContainer() {
93 return new TomcatServletWebServerFactory();
// Creates a TestApplicationConfig to act as the application configuration.
98 public ApplicationConfig getApplicationConfig() {
99 TestApplicationConfig cfg = new TestApplicationConfig();
110 private SenderOptions<byte[], byte[]> kafkaSenderOptions() {
111 String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
113 Map<String, Object> props = new HashMap<>();
114 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
115 // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
116 props.put(ProducerConfig.ACKS_CONFIG, "all");
117 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
118 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
119 this.applicationConfig.addKafkaSecurityProps(props);
120 return SenderOptions.create(props);
123 private SenderRecord<byte[], byte[], Integer> kafkaSenderRecord(String data, String key) {
124 String topic = this.applicationConfig.getKafkaInputTopic();
125 int correlationMetadata = 2;
126 return SenderRecord.create(new ProducerRecord<>(topic, key.getBytes(), data.getBytes()), correlationMetadata);
// Sends the given records to Kafka through a sender created from kafkaSenderOptions().
129 private void sendDataToKafka(Flux<SenderRecord<byte[], byte[], Integer>> dataToSend) {
130 final KafkaSender<byte[], byte[]> sender = KafkaSender.create(kafkaSenderOptions());
// Send failures are logged together with the causing exception.
132 sender.send(dataToSend) //
133 .doOnError(e -> logger.error("Send failed", e)) //
// NOTE(review): presumably pauses so that the Kafka listener has time to start — confirm against the body.
139 @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
140 private static void waitForKafkaListener() throws InterruptedException {
144 private String generateCounterValue(int sequenceValue, int noOfObjects, String counterName, String resourceFdn) {
145 long value = (random.nextInt() % 100) * sequenceValue + (counterName.hashCode() % 5000);
146 return Long.toString(value);
// Random number source used when generating counter values; seeded with the current time in milliseconds.
149 static java.util.Random random = new java.util.Random(System.currentTimeMillis());
151 private long currentEpochMicroSeconds() {
152 return java.util.concurrent.TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
155 private String measType(PmReport.MeasResult measResult, PmReport.MeasInfoList measInfoList) {
156 return measInfoList.getMeasTypes().getMeasType(measResult.getP());
159 // Set end time. Now - (GP * sequenceValue)
160 private void setEndTime(PmReport report, int sequenceValue, int noOfObjects) {
161 long gpMicro = report.event.getPerf3gppFields().getMeasDataCollection().getGranularityPeriod() * 1000 * 1000;
162 long endTime = currentEpochMicroSeconds() - ((noOfObjects - sequenceValue - 1) * gpMicro);
163 report.event.getCommonEventHeader().setLastEpochMicrosec(endTime);
// Paths of PM report files under the test resources directory.
167 final String PM_REPORT_FILE_BIG = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
// This file is the one read by pmReport().
168 final String PM_REPORT_FILE = "./src/test/resources/pm_report.json";
// Produces the JSON text of one generated PM report.
// The file PM_REPORT_FILE is parsed into a PmReport, its end time is set from sequenceValue
// and noOfObjects, every measurement result gets a generated value, and the report is
// serialised back to JSON. Exceptions are caught and logged.
170 String pmReport(int sequenceValue, int noOfObjects) {
// NOTE(review): the file is decoded with the platform default charset — confirm this is intended.
172 String str = Files.readString(Path.of(PM_REPORT_FILE), Charset.defaultCharset());
173 PmReport report = gson.fromJson(str, PmReport.class);
174 PmReport.MeasDataCollection measDataCollection = report.event.getPerf3gppFields().getMeasDataCollection();
176 setEndTime(report, sequenceValue, noOfObjects);
178 // Fill it with generated values
179 for (PmReport.MeasInfoList measInfoList : measDataCollection.getMeasInfoList()) {
180 for (PmReport.MeasValuesList measValueList : measInfoList.getMeasValuesList()) {
181 for (PmReport.MeasResult measResult : measValueList.getMeasResults()) {
// The value is generated from the sequence position, the measurement type and the
// full distinguished name of the measured object.
182 String value = this.generateCounterValue(sequenceValue, noOfObjects,
183 measType(measResult, measInfoList), report.fullDistinguishedName(measValueList));
184 measResult.setSValue(value);
188 return gson.toJson(report);
189 } catch (Exception e) {
// The exception is passed as the last argument so that the logger also records the stack trace.
190 logger.error("Could not loadPM report {}", e.getMessage(), e);
197 void testStoreReportsInflux() throws Exception {
198 final int NO_OF_OBJECTS = 24 * 4;
199 InfluxStore influxStore = new InfluxStore(this.applicationConfig);
201 Flux<DataFromKafkaTopic> input = Flux.range(0, NO_OF_OBJECTS) //
202 .map(i -> pmReport(i, NO_OF_OBJECTS)) //
203 .map(str -> new DataFromKafkaTopic(null, null, str.getBytes()));
205 influxStore.start(input);
209 @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
211 void sendPmReportsThroughKafka() throws Exception {
212 waitForKafkaListener();
214 final int NO_OF_OBJECTS = 20;
216 var dataToSend = Flux.range(0, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(pmReport(i, NO_OF_OBJECTS), "key"));
217 sendDataToKafka(dataToSend);
219 Thread.sleep(1000 * 1000);
// Builds a sample ConsumerJobInfo and prints its JSON representation to standard output.
223 void printConfiguration() {
// Filter with one measured object instance id, one measurement type spec and one source name.
224 PmFilterData f = new PmFilterData();
225 f.getMeasObjInstIds().add("measObj");
226 PmFilterData.MeasTypeSpec spec = new PmFilterData.MeasTypeSpec();
227 spec.setMeasuredObjClass("measuredObjClass");
228 spec.getMeasTypes().add("measType");
229 f.getMeasTypeSpecs().add(spec);
230 f.getSourceNames().add("sourceName");
// Kafka delivery information with placeholder topic and bootstrap servers.
231 ConsumerJobInfo.KafkaDeliveryInfo deliveryInfo = ConsumerJobInfo.KafkaDeliveryInfo.builder() //
232 .topic("topic").bootStrapServers("bootStrapServers") //
// Job parameters that reference the delivery information.
234 ConsumerJobInfo.PmJobParameters params = ConsumerJobInfo.PmJobParameters.builder() //
236 .deliveryInfo(deliveryInfo).build();
// Serialise the job info with the shared Gson instance and print it.
238 ConsumerJobInfo info = new ConsumerJobInfo("type", params, "owner");
239 String str = gson.toJson(info);
240 System.out.print(str);
243 @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
245 void tet() throws Exception {
246 Thread.sleep(1000 * 1000);