16a36c0652783dd2235f9d9e06388e769117c165
[nonrtric/plt/ranpm.git] / datafilecollector / src / test / java / org / onap / dcaegen2 / collectors / datafile / MockDatafile.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.onap.dcaegen2.collectors.datafile;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.awaitility.Awaitility.await;
25
26 import com.google.gson.Gson;
27 import com.google.gson.GsonBuilder;
28
29 import java.io.File;
30 import java.io.IOException;
31 import java.time.Duration;
32 import java.time.Instant;
33 import java.util.ArrayList;
34 import java.util.List;
35
36 import org.apache.commons.io.FileUtils;
37 import org.junit.jupiter.api.AfterEach;
38 import org.junit.jupiter.api.BeforeEach;
39 import org.junit.jupiter.api.Test;
40 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
41 import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
42 import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
43 import org.onap.dcaegen2.collectors.datafile.model.Counters;
44 import org.onap.dcaegen2.collectors.datafile.model.FileData;
45 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
46 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
47 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage.MessageMetaData;
48 import org.onap.dcaegen2.collectors.datafile.tasks.CollectAndReportFiles;
49 import org.onap.dcaegen2.collectors.datafile.tasks.FileCollector;
50 import org.onap.dcaegen2.collectors.datafile.tasks.KafkaTopicListener;
51 import org.onap.dcaegen2.collectors.datafile.tasks.KafkaTopicListener.DataFromTopic;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import org.springframework.beans.factory.annotation.Autowired;
55 import org.springframework.boot.test.context.SpringBootTest;
56 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
57 import org.springframework.boot.test.web.server.LocalServerPort;
58 import org.springframework.boot.test.context.TestConfiguration;
59 import org.springframework.context.annotation.Bean;
60 import org.springframework.test.context.TestPropertySource;
61
62 import reactor.core.publisher.Flux;
63 import reactor.core.publisher.Mono;
64
65 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
66 @TestPropertySource(properties = { //
67         "app.ssl.key-store-password-file=./config/ftps_keystore.pass", //
68         "app.ssl.key-store=./config/ftps_keystore.p12", //
69         "app.ssl.trust-store-password-file=./config/truststore.pass", //
70         "app.ssl.trust-store=", // No trust validation
71         "app.collected-files-path=/tmp/osc_datafile/", //
72         "logging.file.name=/tmp/datafile.log", //
73         "spring.main.allow-bean-definition-overriding=true", //
74         "app.s3.endpointOverride=http://localhost:9000", //
75         "app.s3.accessKeyId=minio", //
76         "app.s3.secretAccessKey=miniostorage", //
77         "app.s3.bucket=ropfiles", //
78         "app.s3.locksBucket=locks" })
79 @SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock.
80 class MockDatafile {
81
82     private static final int LAST_EPOCH_MICROSEC = 151983;
83     private static final String SOURCE_NAME = "5GRAN_DU";
84     private static final int START_EPOCH_MICROSEC = 15198378;
85     private static final String TIME_ZONE_OFFSET = "UTC+05:00";
86     private static final String PM_FILE_NAME = "PM_FILE_NAME";
87
88     // This can be any downloadable file on the net
89     private static final String FTPES_LOCATION = "ftpes:// onap:pano@ftp-ftpes-6:2021/A20000626.2315+0200-2330+0200_GNODEB-15-4.xml.gz";
90     private static final String LOCATION = "https://launchpad.net/ubuntu/+source/perf-tools-unstable/1.0+git7ffb3fd-1ubuntu1/+build/13630748/+files/perf-tools-unstable_1.0+git7ffb3fd-1ubuntu1_all.deb";
91     private static final String GZIP_COMPRESSION = "gzip";
92     private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
93     private static final String FILE_FORMAT_VERSION = "V10";
94     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
95     private static final String CHANGE_TYPE = "FileReady";
96
97     private static final Logger logger = LoggerFactory.getLogger(MockDatafile.class);
98     private static Gson gson = new GsonBuilder() //
99             .disableHtmlEscaping() //
100             .create(); //
101
102     @LocalServerPort
103     private int port;
104
105     @Autowired
106     AppConfig appConfig;
107
108     @Autowired
109     CollectAndReportFiles scheduledTask;
110
111     private static KafkaReceiver kafkaReceiver;
112
113     private static class KafkaReceiver {
114         public final String topic;
115         private DataFromTopic receivedKafkaOutput;
116         private final Logger logger = LoggerFactory.getLogger(MockDatafile.class);
117
118         int count = 0;
119
120         public KafkaReceiver(AppConfig applicationConfig, String outputTopic) {
121             this.topic = outputTopic;
122
123             // Create a listener to the output topic. The KafkaTopicListener happens to be
124             // suitable for that,
125
126             KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig.getKafkaBootStrapServers(),
127                     "MockDatafile", outputTopic);
128
129             topicListener.getFlux() //
130                     .doOnNext(this::set) //
131                     .doFinally(sig -> logger.info("Finally " + sig)) //
132                     .subscribe();
133         }
134
135         private void set(DataFromTopic receivedKafkaOutput) {
136             this.receivedKafkaOutput = receivedKafkaOutput;
137             this.count++;
138             logger.debug("*** received {}, {}", topic, receivedKafkaOutput);
139         }
140
141         public synchronized String lastKey() {
142             return this.receivedKafkaOutput.key;
143         }
144
145         public synchronized String lastValue() {
146             return this.receivedKafkaOutput.value;
147         }
148
149         public void reset() {
150             count = 0;
151             this.receivedKafkaOutput = new DataFromTopic("", "");
152         }
153     }
154
155     static class FileCollectorMock extends FileCollector {
156         final AppConfig appConfig;
157
158         public FileCollectorMock(AppConfig appConfig) {
159             super(appConfig, new Counters());
160             this.appConfig = appConfig;
161         }
162
163         @Override // (override fetchFile to disable the actual file fetching)
164         public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
165             FileCollector fc = new FileCollector(this.appConfig, new Counters());
166             FilePublishInformation i = fc.createFilePublishInformation(fileData);
167
168             try {
169                 File from = new File("config/application.yaml");
170                 File to = new File(this.appConfig.collectedFilesPath + "/" + fileData.name());
171                 FileUtils.forceMkdirParent(to);
172                 com.google.common.io.Files.copy(from, to);
173             } catch (Exception e) {
174                 logger.error("Could not copy file {}", e.getMessage());
175             }
176             return Mono.just(i);
177         }
178     }
179
180     static class CollectAndReportFilesMock extends CollectAndReportFiles {
181         final AppConfig appConfig;
182
183         public CollectAndReportFilesMock(AppConfig appConfig) {
184             super(appConfig);
185             this.appConfig = appConfig;
186         }
187
188         @Override // (override fetchFile to disable the actual file fetching)
189         protected FileCollector createFileCollector() {
190             return new FileCollectorMock(appConfig);
191         }
192     }
193
194     @TestConfiguration
195     static class TestBeanFactory {
196
197         @Bean
198         CollectAndReportFiles collectAndReportFiles(@Autowired AppConfig appConfig) {
199             return new CollectAndReportFilesMock(appConfig);
200         }
201     }
202
203     @BeforeEach
204     void init() {
205         if (kafkaReceiver == null) {
206             kafkaReceiver = new KafkaReceiver(this.appConfig, this.appConfig.collectedFileTopic);
207         }
208         kafkaReceiver.reset();
209         deleteAllFiles();
210     }
211
212     @AfterEach
213     void afterEach() {
214         DataStore store = DataStore.create(this.appConfig);
215         store.deleteBucket(Bucket.FILES).block();
216         store.deleteBucket(Bucket.LOCKS).block();
217         deleteAllFiles();
218
219     }
220
221     private void deleteAllFiles() {
222
223         try {
224             FileUtils.deleteDirectory(new File(this.appConfig.collectedFilesPath));
225         } catch (IOException e) {
226         }
227     }
228
229     @Test
230     void clear() {
231
232     }
233
234     @Test
235     void testKafka() throws InterruptedException {
236         waitForKafkaListener();
237
238         this.scheduledTask.sendDataToStream(this.appConfig.fileReadyEventTopic, "key", "junk").blockLast();
239
240         String fileReadyMessage = gson.toJson(fileReadyMessage());
241         this.scheduledTask.sendDataToStream(this.appConfig.fileReadyEventTopic, "key", fileReadyMessage).blockLast();
242
243         await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1));
244         String rec = kafkaReceiver.lastValue();
245
246         assertThat(rec).contains("Ericsson");
247
248         FilePublishInformation recObj = gson.fromJson(rec, FilePublishInformation.class);
249
250         assertThat(recObj.getName()).isEqualTo(SOURCE_NAME + "/" + PM_FILE_NAME);
251     }
252
253     @Test
254     void testS3Concurrency() throws Exception {
255         waitForKafkaListener();
256
257         final int NO_OF_OBJECTS = 10;
258
259         Instant startTime = Instant.now();
260
261         Flux.range(1, NO_OF_OBJECTS) //
262                 .map(i -> gson.toJson(fileReadyMessage("testS3Concurrency_" + i))) //
263                 .flatMap(fileReadyMessage -> scheduledTask.sendDataToStream(appConfig.fileReadyEventTopic, "key",
264                         fileReadyMessage)) //
265                 .blockLast(); //
266
267         while (kafkaReceiver.count < NO_OF_OBJECTS) {
268             logger.info("sleeping {}", kafkaReceiver.count);
269             Thread.sleep(1000 * 1);
270         }
271
272         String rec = kafkaReceiver.lastValue();
273         assertThat(rec).contains("Ericsson");
274
275         final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
276         logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
277     }
278
279     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
280     private static void waitForKafkaListener() throws InterruptedException {
281         Thread.sleep(4000);
282     }
283
284     @Test
285     @SuppressWarnings("squid:S2699")
286     void runMock() throws Exception {
287         logger.warn("**************** Keeping server alive! " + this.port);
288         synchronized (this) {
289             this.wait();
290         }
291     }
292
293     FileReadyMessage.Event event(String fileName) {
294         MessageMetaData messageMetaData = MessageMetaData.builder() //
295                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
296                 .sourceName(SOURCE_NAME) //
297                 .startEpochMicrosec(START_EPOCH_MICROSEC) //
298                 .timeZoneOffset(TIME_ZONE_OFFSET) //
299                 .changeIdentifier(CHANGE_IDENTIFIER) //
300                 .eventName("Noti_RnNode-Ericsson_FileReady").build();
301
302         FileReadyMessage.FileInfo fileInfo = FileReadyMessage.FileInfo //
303                 .builder() //
304                 .fileFormatType(FILE_FORMAT_TYPE) //
305                 .location(LOCATION) //
306                 .fileFormatVersion(FILE_FORMAT_VERSION) //
307                 .compression(GZIP_COMPRESSION) //
308                 .build();
309
310         FileReadyMessage.ArrayOfNamedHashMap arrayOfNamedHashMap = FileReadyMessage.ArrayOfNamedHashMap //
311                 .builder() //
312                 .name(fileName) //
313                 .hashMap(fileInfo) //
314                 .build();
315
316         List<FileReadyMessage.ArrayOfNamedHashMap> arrayOfNamedHashMapList = new ArrayList<>();
317         arrayOfNamedHashMapList.add(arrayOfNamedHashMap);
318
319         FileReadyMessage.NotificationFields notificationFields = FileReadyMessage.NotificationFields //
320                 .builder().notificationFieldsVersion("notificationFieldsVersion") //
321                 .changeType(CHANGE_TYPE).changeIdentifier(CHANGE_IDENTIFIER) //
322                 .arrayOfNamedHashMap(arrayOfNamedHashMapList) //
323                 .build();
324
325         return FileReadyMessage.Event.builder() //
326                 .commonEventHeader(messageMetaData) //
327                 .notificationFields(notificationFields).build();
328     }
329
330     private FileReadyMessage fileReadyMessage(String fileName) {
331         FileReadyMessage message = FileReadyMessage.builder() //
332                 .event(event(fileName)) //
333                 .build();
334         return message;
335     }
336
337     private FileReadyMessage fileReadyMessage() {
338         return fileReadyMessage(PM_FILE_NAME);
339     }
340
341 }