Testing Gerrit Code Review Process
[nonrtric/plt/ranpm.git] / datafilecollector / src / test / java / org / oran / datafile / MockDatafile.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020-2023 NordixFoundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.datafile;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.awaitility.Awaitility.await;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.when;
27
28 import com.google.gson.Gson;
29 import com.google.gson.GsonBuilder;
30
31 import java.io.IOException;
32 import java.nio.file.Files;
33 import java.nio.file.Path;
34 import java.nio.file.StandardCopyOption;
35 import java.time.Duration;
36 import java.time.Instant;
37 import java.util.ArrayList;
38 import java.util.List;
39
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.BeforeEach;
42 import org.junit.jupiter.api.Test;
43 import org.oran.datafile.configuration.AppConfig;
44 import org.oran.datafile.datastore.DataStore;
45 import org.oran.datafile.datastore.DataStore.Bucket;
46 import org.oran.datafile.model.Counters;
47 import org.oran.datafile.model.FileData;
48 import org.oran.datafile.model.FilePublishInformation;
49 import org.oran.datafile.model.FileReadyMessage;
50 import org.oran.datafile.model.FileReadyMessage.MessageMetaData;
51 import org.oran.datafile.oauth2.SecurityContext;
52 import org.oran.datafile.tasks.CollectAndReportFiles;
53 import org.oran.datafile.tasks.FileCollector;
54 import org.oran.datafile.tasks.KafkaTopicListener;
55 import org.oran.datafile.tasks.KafkaTopicListener.DataFromTopic;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import org.springframework.beans.factory.annotation.Autowired;
59 import org.springframework.boot.test.context.SpringBootTest;
60 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
61 import org.springframework.boot.test.context.TestConfiguration;
62 import org.springframework.boot.test.web.server.LocalServerPort;
63 import org.springframework.context.annotation.Bean;
64 import org.springframework.test.context.TestPropertySource;
65 import org.springframework.util.FileSystemUtils;
66
67 import reactor.core.publisher.Flux;
68 import reactor.core.publisher.Mono;
69
70 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
71 @TestPropertySource(
72     properties = { //
73         "app.ssl.key-store-password-file=./config/ftps_keystore.pass", //
74         "app.ssl.key-store=./config/ftps_keystore.p12", //
75         "app.ssl.trust-store-password-file=./config/truststore.pass", //
76         "app.ssl.trust-store=", // No trust validation
77         "app.collected-files-path=/tmp/osc_datafile/", //
78         "logging.file.name=/tmp/datafile.log", //
79         "spring.main.allow-bean-definition-overriding=true", //
80         "app.s3.endpointOverride=http://localhost:9000", //
81         "app.s3.accessKeyId=minio", //
82         "app.s3.secretAccessKey=miniostorage", //
83         "app.s3.bucket=ropfiles", //
84         "app.s3.locksBucket=locks", //
85         "app.auth-token-file=src/test/resources/jwtToken.b64", //
86         "app.kafka.use-oath-token=false"})
87 @SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock.
88 class MockDatafile {
89
90     private static final int LAST_EPOCH_MICROSEC = 151983;
91     private static final String SOURCE_NAME = "5GRAN_DU";
92     private static final int START_EPOCH_MICROSEC = 15198378;
93     private static final String TIME_ZONE_OFFSET = "UTC+05:00";
94     private static final String PM_FILE_NAME = "PM_FILE_NAME";
95
96     // This can be any downloadable file on the net
97     private static final String FTPES_LOCATION =
98         "ftpes:// onap:pano@ftp-ftpes-6:2021/A20000626.2315+0200-2330+0200_GNODEB-15-4.xml.gz";
99     private static final String LOCATION =
100         "https://launchpad.net/ubuntu/+source/perf-tools-unstable/1.0+git7ffb3fd-1ubuntu1/+build/13630748/+files/perf-tools-unstable_1.0+git7ffb3fd-1ubuntu1_all.deb";
101     private static final String GZIP_COMPRESSION = "gzip";
102     private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
103     private static final String FILE_FORMAT_VERSION = "V10";
104     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
105     private static final String CHANGE_TYPE = "FileReady";
106
107     private static final Logger logger = LoggerFactory.getLogger(MockDatafile.class);
108     private static Gson gson = new GsonBuilder() //
109         .disableHtmlEscaping() //
110         .create(); //
111
112     @LocalServerPort
113     private int port;
114
115     @Autowired
116     AppConfig appConfig;
117
118     @Autowired
119     CollectAndReportFiles scheduledTask;
120
121     static final SecurityContext securityContext = new SecurityContext("");
122
123     private static KafkaReceiver kafkaReceiver;
124
125     private static class KafkaReceiver {
126         public final String topic;
127         private DataFromTopic receivedKafkaOutput;
128         private final Logger logger = LoggerFactory.getLogger(MockDatafile.class);
129
130         int count = 0;
131
132         public KafkaReceiver(AppConfig applicationConfig, String outputTopic) {
133             this.topic = outputTopic;
134
135             // Create a listener to the output topic. The KafkaTopicListener happens to be
136             // suitable for that,
137             AppConfig config = spy(applicationConfig);
138             when(config.getKafkaClientId()).thenReturn("MockDatafile");
139             when(config.getInputTopic()).thenReturn(outputTopic);
140             KafkaTopicListener topicListener = new KafkaTopicListener(config);
141
142             topicListener.getFlux() //
143                 .doOnNext(this::set) //
144                 .doFinally(sig -> logger.info("Finally " + sig)) //
145                 .subscribe();
146         }
147
148         private void set(DataFromTopic receivedKafkaOutput) {
149             this.receivedKafkaOutput = receivedKafkaOutput;
150             this.count++;
151             logger.debug("*** received {}, {}", topic, receivedKafkaOutput);
152         }
153
154         public synchronized String lastKey() {
155             return this.receivedKafkaOutput.key;
156         }
157
158         public synchronized String lastValue() {
159             return this.receivedKafkaOutput.value;
160         }
161
162         public void reset() {
163             count = 0;
164             this.receivedKafkaOutput = new DataFromTopic("", "");
165         }
166     }
167
168     static class FileCollectorMock extends FileCollector {
169         final AppConfig appConfig;
170
171         public FileCollectorMock(AppConfig appConfig) {
172             super(securityContext, appConfig, new Counters());
173             this.appConfig = appConfig;
174         }
175
176         @Override // (override fetchFile to disable the actual file fetching)
177         public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
178             FileCollector fc = new FileCollector(securityContext, this.appConfig, new Counters());
179             FilePublishInformation i = fc.createFilePublishInformation(fileData);
180
181             try {
182                 Path from = Path.of("config/application.yaml");
183                 Path to = Path.of(appConfig.getCollectedFilesPath(), fileData.name());
184                 Files.createDirectories(to.getParent());
185                 Files.copy(from, to, StandardCopyOption.REPLACE_EXISTING);
186             } catch (Exception e) {
187                 logger.error("Could not copy file {}", e.getMessage());
188             }
189             return Mono.just(i);
190         }
191     }
192
193     static class CollectAndReportFilesMock extends CollectAndReportFiles {
194         final AppConfig appConfig;
195
196         public CollectAndReportFilesMock(AppConfig appConfig) {
197             super(securityContext, appConfig);
198             this.appConfig = appConfig;
199         }
200
201         @Override // (override fetchFile to disable the actual file fetching)
202         protected FileCollector createFileCollector() {
203             return new FileCollectorMock(appConfig);
204         }
205     }
206
207     @TestConfiguration
208     static class TestBeanFactory {
209
210         @Bean
211         CollectAndReportFiles collectAndReportFiles(@Autowired AppConfig appConfig) {
212             return new CollectAndReportFilesMock(appConfig);
213         }
214     }
215
216     @BeforeEach
217     void init() {
218         if (kafkaReceiver == null) {
219             kafkaReceiver = new KafkaReceiver(appConfig, appConfig.getCollectedFileTopic());
220         }
221         kafkaReceiver.reset();
222         deleteAllFiles();
223     }
224
225     @AfterEach
226     void afterEach() {
227         DataStore store = DataStore.create(this.appConfig);
228         store.deleteBucket(Bucket.FILES).block();
229         store.deleteBucket(Bucket.LOCKS).block();
230         deleteAllFiles();
231
232     }
233
234     private void deleteAllFiles() {
235         try {
236             FileSystemUtils.deleteRecursively(Path.of(appConfig.getCollectedFilesPath()));
237         } catch (IOException e) {
238         }
239     }
240
241     @Test
242     void clear() {
243
244     }
245
246     @Test
247     void testKafka() throws InterruptedException {
248         waitForKafkaListener();
249
250         this.scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", "junk").blockLast();
251
252         String fileReadyMessage = gson.toJson(fileReadyMessage());
253         this.scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", fileReadyMessage).blockLast();
254
255         await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1));
256         String rec = kafkaReceiver.lastValue();
257
258         assertThat(rec).contains("Ericsson");
259
260         FilePublishInformation recObj = gson.fromJson(rec, FilePublishInformation.class);
261
262         assertThat(recObj.getName()).isEqualTo(SOURCE_NAME + "/" + PM_FILE_NAME);
263     }
264
265     @Test
266     void testS3Concurrency() throws Exception {
267         waitForKafkaListener();
268
269         final int NO_OF_OBJECTS = 10;
270
271         Instant startTime = Instant.now();
272
273         Flux.range(1, NO_OF_OBJECTS) //
274             .map(i -> gson.toJson(fileReadyMessage("testS3Concurrency_" + i))) //
275             .flatMap(
276                 fileReadyMessage -> scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", fileReadyMessage)) //
277             .blockLast(); //
278
279         while (kafkaReceiver.count < NO_OF_OBJECTS) {
280             logger.info("sleeping {}", kafkaReceiver.count);
281             Thread.sleep(1000 * 1);
282         }
283
284         String rec = kafkaReceiver.lastValue();
285         assertThat(rec).contains("Ericsson");
286
287         final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
288         logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
289     }
290
291     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
292     private static void waitForKafkaListener() throws InterruptedException {
293         Thread.sleep(4000);
294     }
295
296     @Test
297     @SuppressWarnings("squid:S2699")
298     void runMock() throws Exception {
299         logger.warn("**************** Keeping server alive! " + this.port);
300         synchronized (this) {
301             this.wait();
302         }
303     }
304
305     FileReadyMessage.Event event(String fileName) {
306         MessageMetaData messageMetaData = MessageMetaData.builder() //
307             .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
308             .sourceName(SOURCE_NAME) //
309             .startEpochMicrosec(START_EPOCH_MICROSEC) //
310             .timeZoneOffset(TIME_ZONE_OFFSET) //
311             .changeIdentifier(CHANGE_IDENTIFIER) //
312             .eventName("Noti_RnNode-Ericsson_FileReady").build();
313
314         FileReadyMessage.FileInfo fileInfo = FileReadyMessage.FileInfo //
315             .builder() //
316             .fileFormatType(FILE_FORMAT_TYPE) //
317             .location(LOCATION) //
318             .fileFormatVersion(FILE_FORMAT_VERSION) //
319             .compression(GZIP_COMPRESSION) //
320             .build();
321
322         FileReadyMessage.ArrayOfNamedHashMap arrayOfNamedHashMap = FileReadyMessage.ArrayOfNamedHashMap //
323             .builder() //
324             .name(fileName) //
325             .hashMap(fileInfo) //
326             .build();
327
328         List<FileReadyMessage.ArrayOfNamedHashMap> arrayOfNamedHashMapList = new ArrayList<>();
329         arrayOfNamedHashMapList.add(arrayOfNamedHashMap);
330
331         FileReadyMessage.NotificationFields notificationFields = FileReadyMessage.NotificationFields //
332             .builder().notificationFieldsVersion("notificationFieldsVersion") //
333             .changeType(CHANGE_TYPE).changeIdentifier(CHANGE_IDENTIFIER) //
334             .arrayOfNamedHashMap(arrayOfNamedHashMapList) //
335             .build();
336
337         return FileReadyMessage.Event.builder() //
338             .commonEventHeader(messageMetaData) //
339             .notificationFields(notificationFields).build();
340     }
341
342     private FileReadyMessage fileReadyMessage(String fileName) {
343         FileReadyMessage message = FileReadyMessage.builder() //
344             .event(event(fileName)) //
345             .build();
346         return message;
347     }
348
349     private FileReadyMessage fileReadyMessage() {
350         return fileReadyMessage(PM_FILE_NAME);
351     }
352
353 }