Merge "Added support for using oauth token for Kafka"
[nonrtric/plt/ranpm.git] / datafilecollector / src / test / java / org / onap / dcaegen2 / collectors / datafile / MockDatafile.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.onap.dcaegen2.collectors.datafile;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.awaitility.Awaitility.await;
25
26 import com.google.gson.Gson;
27 import com.google.gson.GsonBuilder;
28
29 import java.io.IOException;
30 import java.nio.file.Files;
31 import java.nio.file.Path;
32 import java.nio.file.StandardCopyOption;
33 import java.time.Duration;
34 import java.time.Instant;
35 import java.util.ArrayList;
36 import java.util.List;
37
38 import org.junit.jupiter.api.AfterEach;
39 import org.junit.jupiter.api.BeforeEach;
40 import org.junit.jupiter.api.Test;
41 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
42 import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
43 import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
44 import org.onap.dcaegen2.collectors.datafile.model.Counters;
45 import org.onap.dcaegen2.collectors.datafile.model.FileData;
46 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
47 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
48 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage.MessageMetaData;
49 import org.onap.dcaegen2.collectors.datafile.tasks.CollectAndReportFiles;
50 import org.onap.dcaegen2.collectors.datafile.tasks.FileCollector;
51 import org.onap.dcaegen2.collectors.datafile.tasks.KafkaTopicListener;
52 import org.onap.dcaegen2.collectors.datafile.tasks.KafkaTopicListener.DataFromTopic;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.springframework.beans.factory.annotation.Autowired;
56 import org.springframework.boot.test.context.SpringBootTest;
57 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
58 import org.springframework.boot.test.context.TestConfiguration;
59 import org.springframework.boot.test.web.server.LocalServerPort;
60 import org.springframework.context.annotation.Bean;
61 import org.springframework.test.context.TestPropertySource;
62 import org.springframework.util.FileSystemUtils;
63
64 import reactor.core.publisher.Flux;
65 import reactor.core.publisher.Mono;
66
67 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
68 @TestPropertySource(
69     properties = { //
70         "app.ssl.key-store-password-file=./config/ftps_keystore.pass", //
71         "app.ssl.key-store=./config/ftps_keystore.p12", //
72         "app.ssl.trust-store-password-file=./config/truststore.pass", //
73         "app.ssl.trust-store=", // No trust validation
74         "app.collected-files-path=/tmp/osc_datafile/", //
75         "logging.file.name=/tmp/datafile.log", //
76         "spring.main.allow-bean-definition-overriding=true", //
77         "app.s3.endpointOverride=http://localhost:9000", //
78         "app.s3.accessKeyId=minio", //
79         "app.s3.secretAccessKey=miniostorage", //
80         "app.s3.bucket=ropfiles", //
81         "app.s3.locksBucket=locks"})
82 @SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock.
83 class MockDatafile {
84
    // Fixed metadata values used when building fake VES FileReady events.
    private static final int LAST_EPOCH_MICROSEC = 151983;
    private static final String SOURCE_NAME = "5GRAN_DU";
    private static final int START_EPOCH_MICROSEC = 15198378;
    private static final String TIME_ZONE_OFFSET = "UTC+05:00";
    private static final String PM_FILE_NAME = "PM_FILE_NAME";

    // This can be any downloadable file on the net
    private static final String FTPES_LOCATION =
        "ftpes:// onap:pano@ftp-ftpes-6:2021/A20000626.2315+0200-2330+0200_GNODEB-15-4.xml.gz";
    private static final String LOCATION =
        "https://launchpad.net/ubuntu/+source/perf-tools-unstable/1.0+git7ffb3fd-1ubuntu1/+build/13630748/+files/perf-tools-unstable_1.0+git7ffb3fd-1ubuntu1_all.deb";
    private static final String GZIP_COMPRESSION = "gzip";
    private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
    private static final String FILE_FORMAT_VERSION = "V10";
    private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
    private static final String CHANGE_TYPE = "FileReady";

    private static final Logger logger = LoggerFactory.getLogger(MockDatafile.class);
    // Used both to serialize fake file-ready messages and to parse published output.
    private static Gson gson = new GsonBuilder() //
        .disableHtmlEscaping() //
        .create(); //

    // Port of the running Spring Boot application; logged by runMock().
    @LocalServerPort
    private int port;

    @Autowired
    AppConfig appConfig;

    @Autowired
    CollectAndReportFiles scheduledTask;

    // Shared by all tests; created lazily in init() and reset before each test.
    private static KafkaReceiver kafkaReceiver;
117
118     private static class KafkaReceiver {
119         public final String topic;
120         private DataFromTopic receivedKafkaOutput;
121         private final Logger logger = LoggerFactory.getLogger(MockDatafile.class);
122
123         int count = 0;
124
125         public KafkaReceiver(AppConfig applicationConfig, String outputTopic) {
126             this.topic = outputTopic;
127
128             // Create a listener to the output topic. The KafkaTopicListener happens to be
129             // suitable for that,
130
131             KafkaTopicListener topicListener =
132                 new KafkaTopicListener(applicationConfig.getKafkaBootStrapServers(), "MockDatafile", outputTopic);
133
134             topicListener.getFlux() //
135                 .doOnNext(this::set) //
136                 .doFinally(sig -> logger.info("Finally " + sig)) //
137                 .subscribe();
138         }
139
140         private void set(DataFromTopic receivedKafkaOutput) {
141             this.receivedKafkaOutput = receivedKafkaOutput;
142             this.count++;
143             logger.debug("*** received {}, {}", topic, receivedKafkaOutput);
144         }
145
146         public synchronized String lastKey() {
147             return this.receivedKafkaOutput.key;
148         }
149
150         public synchronized String lastValue() {
151             return this.receivedKafkaOutput.value;
152         }
153
154         public void reset() {
155             count = 0;
156             this.receivedKafkaOutput = new DataFromTopic("", "");
157         }
158     }
159
160     static class FileCollectorMock extends FileCollector {
161         final AppConfig appConfig;
162
163         public FileCollectorMock(AppConfig appConfig) {
164             super(appConfig, new Counters());
165             this.appConfig = appConfig;
166         }
167
168         @Override // (override fetchFile to disable the actual file fetching)
169         public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
170             FileCollector fc = new FileCollector(this.appConfig, new Counters());
171             FilePublishInformation i = fc.createFilePublishInformation(fileData);
172
173             try {
174                 Path from = Path.of("config/application.yaml");
175                 Path to = Path.of(this.appConfig.collectedFilesPath, fileData.name());
176                 Files.createDirectories(to.getParent());
177                 Files.copy(from, to, StandardCopyOption.REPLACE_EXISTING);
178             } catch (Exception e) {
179                 logger.error("Could not copy file {}", e.getMessage());
180             }
181             return Mono.just(i);
182         }
183     }
184
185     static class CollectAndReportFilesMock extends CollectAndReportFiles {
186         final AppConfig appConfig;
187
188         public CollectAndReportFilesMock(AppConfig appConfig) {
189             super(appConfig);
190             this.appConfig = appConfig;
191         }
192
193         @Override // (override fetchFile to disable the actual file fetching)
194         protected FileCollector createFileCollector() {
195             return new FileCollectorMock(appConfig);
196         }
197     }
198
199     @TestConfiguration
200     static class TestBeanFactory {
201
202         @Bean
203         CollectAndReportFiles collectAndReportFiles(@Autowired AppConfig appConfig) {
204             return new CollectAndReportFilesMock(appConfig);
205         }
206     }
207
208     @BeforeEach
209     void init() {
210         if (kafkaReceiver == null) {
211             kafkaReceiver = new KafkaReceiver(this.appConfig, this.appConfig.collectedFileTopic);
212         }
213         kafkaReceiver.reset();
214         deleteAllFiles();
215     }
216
217     @AfterEach
218     void afterEach() {
219         DataStore store = DataStore.create(this.appConfig);
220         store.deleteBucket(Bucket.FILES).block();
221         store.deleteBucket(Bucket.LOCKS).block();
222         deleteAllFiles();
223
224     }
225
226     private void deleteAllFiles() {
227         try {
228             FileSystemUtils.deleteRecursively(Path.of(this.appConfig.collectedFilesPath));
229         } catch (IOException e) {
230         }
231     }
232
    // Intentionally empty: running this test only triggers the @BeforeEach/@AfterEach
    // hooks, which clear the buckets and collected files.
    @Test
    void clear() {

    }
237
238     @Test
239     void testKafka() throws InterruptedException {
240         waitForKafkaListener();
241
242         this.scheduledTask.sendDataToStream(this.appConfig.fileReadyEventTopic, "key", "junk").blockLast();
243
244         String fileReadyMessage = gson.toJson(fileReadyMessage());
245         this.scheduledTask.sendDataToStream(this.appConfig.fileReadyEventTopic, "key", fileReadyMessage).blockLast();
246
247         await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1));
248         String rec = kafkaReceiver.lastValue();
249
250         assertThat(rec).contains("Ericsson");
251
252         FilePublishInformation recObj = gson.fromJson(rec, FilePublishInformation.class);
253
254         assertThat(recObj.getName()).isEqualTo(SOURCE_NAME + "/" + PM_FILE_NAME);
255     }
256
257     @Test
258     void testS3Concurrency() throws Exception {
259         waitForKafkaListener();
260
261         final int NO_OF_OBJECTS = 10;
262
263         Instant startTime = Instant.now();
264
265         Flux.range(1, NO_OF_OBJECTS) //
266             .map(i -> gson.toJson(fileReadyMessage("testS3Concurrency_" + i))) //
267             .flatMap(fileReadyMessage -> scheduledTask.sendDataToStream(appConfig.fileReadyEventTopic, "key",
268                 fileReadyMessage)) //
269             .blockLast(); //
270
271         while (kafkaReceiver.count < NO_OF_OBJECTS) {
272             logger.info("sleeping {}", kafkaReceiver.count);
273             Thread.sleep(1000 * 1);
274         }
275
276         String rec = kafkaReceiver.lastValue();
277         assertThat(rec).contains("Ericsson");
278
279         final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
280         logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
281     }
282
    // Gives the Kafka consumer time to join its group and get partitions assigned
    // before messages are produced; otherwise early messages can be missed.
    // NOTE(review): a fixed sleep is fragile — consider polling assignment state.
    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
    private static void waitForKafkaListener() throws InterruptedException {
        Thread.sleep(4000);
    }
287
288     @Test
289     @SuppressWarnings("squid:S2699")
290     void runMock() throws Exception {
291         logger.warn("**************** Keeping server alive! " + this.port);
292         synchronized (this) {
293             this.wait();
294         }
295     }
296
297     FileReadyMessage.Event event(String fileName) {
298         MessageMetaData messageMetaData = MessageMetaData.builder() //
299             .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
300             .sourceName(SOURCE_NAME) //
301             .startEpochMicrosec(START_EPOCH_MICROSEC) //
302             .timeZoneOffset(TIME_ZONE_OFFSET) //
303             .changeIdentifier(CHANGE_IDENTIFIER) //
304             .eventName("Noti_RnNode-Ericsson_FileReady").build();
305
306         FileReadyMessage.FileInfo fileInfo = FileReadyMessage.FileInfo //
307             .builder() //
308             .fileFormatType(FILE_FORMAT_TYPE) //
309             .location(LOCATION) //
310             .fileFormatVersion(FILE_FORMAT_VERSION) //
311             .compression(GZIP_COMPRESSION) //
312             .build();
313
314         FileReadyMessage.ArrayOfNamedHashMap arrayOfNamedHashMap = FileReadyMessage.ArrayOfNamedHashMap //
315             .builder() //
316             .name(fileName) //
317             .hashMap(fileInfo) //
318             .build();
319
320         List<FileReadyMessage.ArrayOfNamedHashMap> arrayOfNamedHashMapList = new ArrayList<>();
321         arrayOfNamedHashMapList.add(arrayOfNamedHashMap);
322
323         FileReadyMessage.NotificationFields notificationFields = FileReadyMessage.NotificationFields //
324             .builder().notificationFieldsVersion("notificationFieldsVersion") //
325             .changeType(CHANGE_TYPE).changeIdentifier(CHANGE_IDENTIFIER) //
326             .arrayOfNamedHashMap(arrayOfNamedHashMapList) //
327             .build();
328
329         return FileReadyMessage.Event.builder() //
330             .commonEventHeader(messageMetaData) //
331             .notificationFields(notificationFields).build();
332     }
333
334     private FileReadyMessage fileReadyMessage(String fileName) {
335         FileReadyMessage message = FileReadyMessage.builder() //
336             .event(event(fileName)) //
337             .build();
338         return message;
339     }
340
341     private FileReadyMessage fileReadyMessage() {
342         return fileReadyMessage(PM_FILE_NAME);
343     }
344
345 }