Improve Test coverage of InfluxLogger
nonrtric/plt/ranpm.git: influxlogger/src/test/java/org/oran/pmlog/Integration.java
/*-
 * ========================LICENSE_START=================================
 * O-RAN-SC
 * %%
 * Copyright (C) 2023 Nordix Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ========================LICENSE_END===================================
 */

package org.oran.pmlog;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.oran.pmlog.configuration.ApplicationConfig;
import org.oran.pmlog.configuration.ConsumerJobInfo;
import org.oran.pmlog.configuration.ConsumerJobInfo.PmFilterData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.TestPropertySource;

import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

@SuppressWarnings("java:S3577") // Rename class
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@TestPropertySource(properties = { //
        "server.ssl.key-store=./config/keystore.jks", //
        "app.webclient.trust-store=./config/truststore.jks", //
        "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
        "app.pm-files-path=./src/test/resources/", //
        "app.auth-token-file=src/test/resources/jwtToken.b64", //
        "app.kafka.use-oath-token=false" //
}) //
class Integration {

    @Autowired
    private ApplicationConfig applicationConfig;

    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();

    private final Logger logger = LoggerFactory.getLogger(Integration.class);

    @LocalServerPort
    int localServerHttpPort;

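    /**
     * Test variant of ApplicationConfig that builds this process' URL from the
     * local test server's HTTPS port.
     */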
    static class TestApplicationConfig extends ApplicationConfig {
        String thisProcessUrl() {
            final String url = "https://localhost:" + getLocalServerHttpsPort();
            return url;
        }
    }

    /**
     * Overrides the BeanFactory.
     */
    @TestConfiguration
    static class TestBeanFactory extends BeanFactory {

        @Override
        @Bean
        public ServletWebServerFactory servletContainer() {
            return new TomcatServletWebServerFactory();
        }

        @Override
        @Bean
        public ApplicationConfig getApplicationConfig() {
            TestApplicationConfig cfg = new TestApplicationConfig();
            return cfg;
        }
    }

    @BeforeEach
    void init() {}

    @AfterEach
    void reset() {}

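    /**
     * Builds reactor-kafka sender options from the application configuration:
     * bootstrap servers, byte-array serializers and any configured Kafka security
     * properties.
     */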
    private SenderOptions<byte[], byte[]> kafkaSenderOptions() {
        String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        this.applicationConfig.addKafkaSecurityProps(props);
        return SenderOptions.create(props);
    }

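    /**
     * Wraps one PM report in a Kafka producer record for the configured input topic.
     */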
    private SenderRecord<byte[], byte[], Integer> kafkaSenderRecord(String data, String key) {
        String topic = this.applicationConfig.getKafkaInputTopic();
        int correlationMetadata = 2;
        return SenderRecord.create(new ProducerRecord<>(topic, key.getBytes(), data.getBytes()), correlationMetadata);
    }

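    /**
     * Publishes the given records to Kafka and blocks until all of them have been sent.
     */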
    private void sendDataToKafka(Flux<SenderRecord<byte[], byte[], Integer>> dataToSend) {
        final KafkaSender<byte[], byte[]> sender = KafkaSender.create(kafkaSenderOptions());

        sender.send(dataToSend) //
                .doOnError(e -> logger.error("Send failed", e)) //
                .blockLast();

        sender.close();
    }

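    /**
     * Gives the Kafka listener in the application under test time to start before
     * test data is produced.
     */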
    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
    private static void waitForKafkaListener() throws InterruptedException {
        Thread.sleep(4000);
    }

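    /**
     * Generates a pseudo-random counter value that depends on the sequence number
     * and on the counter name, so that the stored values vary over time.
     */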
    private String generateCounterValue(int sequenceValue, int noOfObjects, String counterName, String resourceFdn) {
        long value = (random.nextInt() % 100) * sequenceValue + (counterName.hashCode() % 5000);
        return Long.toString(value);
    }

    static java.util.Random random = new java.util.Random(System.currentTimeMillis());

    private long currentEpochMicroSeconds() {
        return java.util.concurrent.TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
    }

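    /** Looks up the counter (measurement type) name that a MeasResult refers to via its p index. */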
    private String measType(PmReport.MeasResult measResult, PmReport.MeasInfoList measInfoList) {
        return measInfoList.getMeasTypes().getMeasType(measResult.getP());
    }

    // Sets the report end time to now - (GP * (noOfObjects - sequenceValue - 1)),
    // so that the reports in a generated sequence are spaced one granularity period
    // apart and the last one ends now.
    private void setEndTime(PmReport report, int sequenceValue, int noOfObjects) {
        long gpMicro = report.event.getPerf3gppFields().getMeasDataCollection().getGranularityPeriod() * 1000 * 1000;
        long endTime = currentEpochMicroSeconds() - ((noOfObjects - sequenceValue - 1) * gpMicro);
        report.event.getCommonEventHeader().setLastEpochMicrosec(endTime);
    }

    final String PM_REPORT_FILE_BIG = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
    final String PM_REPORT_FILE = "./src/test/resources/pm_report.json";

    /**
     * Generates a PM report based on the template file, filled with generated counter values.
     *
     * @param sequenceValue used for generating counter values
     * @param noOfObjects used for generating counter values
     * @return the report as a JSON string, or null if the template could not be loaded
     */
    String pmReport(int sequenceValue, int noOfObjects) {
        try {
            String str = Files.readString(Path.of(PM_REPORT_FILE), Charset.defaultCharset());
            PmReport report = gson.fromJson(str, PmReport.class);
            PmReport.MeasDataCollection measDataCollection = report.event.getPerf3gppFields().getMeasDataCollection();

            setEndTime(report, sequenceValue, noOfObjects);

            // Fill the report with generated counter values
            for (PmReport.MeasInfoList measInfoList : measDataCollection.getMeasInfoList()) {
                for (PmReport.MeasValuesList measValueList : measInfoList.getMeasValuesList()) {
                    for (PmReport.MeasResult measResult : measValueList.getMeasResults()) {
                        String value = this.generateCounterValue(sequenceValue, noOfObjects,
                                measType(measResult, measInfoList), report.fullDistinguishedName(measValueList));
                        measResult.setSValue(value);
                    }
                }
            }
            return gson.toJson(report);
        } catch (Exception e) {
            logger.error("Could not load PM report {}", e.getMessage(), e);
            return null;
        }
    }

    // Store PM data for 24 hours in Influx. The data contains generated
    // counter values that vary over time.
    @SuppressWarnings("java:S2699")
    @Test
    void testStoreReportsInflux() throws Exception {
        final int NO_OF_OBJECTS = 24 * 4;
        InfluxStore influxStore = new InfluxStore(this.applicationConfig);

        Flux<DataFromKafkaTopic> input = Flux.range(0, NO_OF_OBJECTS) //
                .map(i -> pmReport(i, NO_OF_OBJECTS)) //
                .map(str -> new DataFromKafkaTopic(null, null, str.getBytes()));

        influxStore.start(input);
    }

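    // Send generated PM reports to the configured Kafka input topic. The running
    // application is expected to consume them from that topic.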
    @SuppressWarnings({"squid:S2925", "java:S2699"}) // "Thread.sleep" should not be used in tests.
    @Test
    void sendPmReportsThroughKafka() throws Exception {
        waitForKafkaListener();

        final int NO_OF_OBJECTS = 20;

        var dataToSend = Flux.range(0, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(pmReport(i, NO_OF_OBJECTS), "key"));
        sendDataToKafka(dataToSend);
    }

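    // Builds a ConsumerJobInfo with a PM filter and Kafka delivery info, prints its
    // JSON representation and verifies the info type id.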
    @Test
    void printConfiguration() {
        PmFilterData f = new PmFilterData();
        f.getMeasObjInstIds().add("measObj");
        PmFilterData.MeasTypeSpec spec = new PmFilterData.MeasTypeSpec();
        spec.setMeasuredObjClass("measuredObjClass");
        spec.getMeasTypes().add("measType");
        f.getMeasTypeSpecs().add(spec);
        f.getSourceNames().add("sourceName");
        ConsumerJobInfo.KafkaDeliveryInfo deliveryInfo = ConsumerJobInfo.KafkaDeliveryInfo.builder() //
                .topic("topic").bootStrapServers("bootStrapServers") //
                .build();
        ConsumerJobInfo.PmJobParameters params = ConsumerJobInfo.PmJobParameters.builder() //
                .filter(f) //
                .deliveryInfo(deliveryInfo).build();

        ConsumerJobInfo info = new ConsumerJobInfo("type", params, "owner");
        String str = gson.toJson(info);
        System.out.print(str);
        assertEquals("type", info.infoTypeId);
    }

}