Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / datafilecollector / src / test / java / org / oran / datafile / MockDatafile.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020-2023 NordixFoundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.datafile;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.awaitility.Awaitility.await;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.when;
27
28 import com.google.gson.Gson;
29 import com.google.gson.GsonBuilder;
30
31 import java.io.IOException;
32 import java.nio.file.Files;
33 import java.nio.file.Path;
34 import java.nio.file.StandardCopyOption;
35 import java.time.Duration;
36 import java.time.Instant;
37 import java.util.ArrayList;
38 import java.util.List;
39
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.BeforeEach;
42 import org.junit.jupiter.api.Test;
43 import org.oran.datafile.configuration.AppConfig;
44 import org.oran.datafile.datastore.DataStore;
45 import org.oran.datafile.datastore.DataStore.Bucket;
46 import org.oran.datafile.model.Counters;
47 import org.oran.datafile.model.FileData;
48 import org.oran.datafile.model.FilePublishInformation;
49 import org.oran.datafile.model.FileReadyMessage;
50 import org.oran.datafile.model.FileReadyMessage.MessageMetaData;
51 import org.oran.datafile.tasks.CollectAndReportFiles;
52 import org.oran.datafile.tasks.FileCollector;
53 import org.oran.datafile.tasks.KafkaTopicListener;
54 import org.oran.datafile.tasks.KafkaTopicListener.DataFromTopic;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.springframework.beans.factory.annotation.Autowired;
58 import org.springframework.boot.test.context.SpringBootTest;
59 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
60 import org.springframework.boot.test.context.TestConfiguration;
61 import org.springframework.boot.test.web.server.LocalServerPort;
62 import org.springframework.context.annotation.Bean;
63 import org.springframework.test.context.TestPropertySource;
64 import org.springframework.util.FileSystemUtils;
65
66 import reactor.core.publisher.Flux;
67 import reactor.core.publisher.Mono;
68
69 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
70 @TestPropertySource(
71     properties = { //
72         "app.ssl.key-store-password-file=./config/ftps_keystore.pass", //
73         "app.ssl.key-store=./config/ftps_keystore.p12", //
74         "app.ssl.trust-store-password-file=./config/truststore.pass", //
75         "app.ssl.trust-store=", // No trust validation
76         "app.collected-files-path=/tmp/osc_datafile/", //
77         "logging.file.name=/tmp/datafile.log", //
78         "spring.main.allow-bean-definition-overriding=true", //
79         "app.s3.endpointOverride=http://localhost:9000", //
80         "app.s3.accessKeyId=minio", //
81         "app.s3.secretAccessKey=miniostorage", //
82         "app.s3.bucket=ropfiles", //
83         "app.s3.locksBucket=locks", //
84         "app.auth-token-file=src/test/resources/jwtToken.b64", //
85         "app.kafka.use-oath-token=false"})
86 @SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock.
87 class MockDatafile {
88
89     private static final int LAST_EPOCH_MICROSEC = 151983;
90     private static final String SOURCE_NAME = "5GRAN_DU";
91     private static final int START_EPOCH_MICROSEC = 15198378;
92     private static final String TIME_ZONE_OFFSET = "UTC+05:00";
93     private static final String PM_FILE_NAME = "PM_FILE_NAME";
94
95     // This can be any downloadable file on the net
96     private static final String FTPES_LOCATION =
97         "ftpes:// onap:pano@ftp-ftpes-6:2021/A20000626.2315+0200-2330+0200_GNODEB-15-4.xml.gz";
98     private static final String LOCATION =
99         "https://launchpad.net/ubuntu/+source/perf-tools-unstable/1.0+git7ffb3fd-1ubuntu1/+build/13630748/+files/perf-tools-unstable_1.0+git7ffb3fd-1ubuntu1_all.deb";
100     private static final String GZIP_COMPRESSION = "gzip";
101     private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
102     private static final String FILE_FORMAT_VERSION = "V10";
103     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
104     private static final String CHANGE_TYPE = "FileReady";
105
106     private static final Logger logger = LoggerFactory.getLogger(MockDatafile.class);
107     private static Gson gson = new GsonBuilder() //
108         .disableHtmlEscaping() //
109         .create(); //
110
111     @LocalServerPort
112     private int port;
113
114     @Autowired
115     AppConfig appConfig;
116
117     @Autowired
118     CollectAndReportFiles scheduledTask;
119
120     private static KafkaReceiver kafkaReceiver;
121
122     private static class KafkaReceiver {
123         public final String topic;
124         private DataFromTopic receivedKafkaOutput;
125         private final Logger logger = LoggerFactory.getLogger(MockDatafile.class);
126
127         int count = 0;
128
129         public KafkaReceiver(AppConfig applicationConfig, String outputTopic) {
130             this.topic = outputTopic;
131
132             // Create a listener to the output topic. The KafkaTopicListener happens to be
133             // suitable for that,
134             AppConfig config = spy(applicationConfig);
135             when(config.getKafkaClientId()).thenReturn("MockDatafile");
136             when(config.getInputTopic()).thenReturn(outputTopic);
137             KafkaTopicListener topicListener = new KafkaTopicListener(config);
138
139             topicListener.getFlux() //
140                 .doOnNext(this::set) //
141                 .doFinally(sig -> logger.info("Finally " + sig)) //
142                 .subscribe();
143         }
144
145         private void set(DataFromTopic receivedKafkaOutput) {
146             this.receivedKafkaOutput = receivedKafkaOutput;
147             this.count++;
148             logger.debug("*** received {}, {}", topic, receivedKafkaOutput);
149         }
150
151         public synchronized String lastKey() {
152             return this.receivedKafkaOutput.key;
153         }
154
155         public synchronized String lastValue() {
156             return this.receivedKafkaOutput.value;
157         }
158
159         public void reset() {
160             count = 0;
161             this.receivedKafkaOutput = new DataFromTopic("", "");
162         }
163     }
164
165     static class FileCollectorMock extends FileCollector {
166         final AppConfig appConfig;
167
168         public FileCollectorMock(AppConfig appConfig) {
169             super(appConfig, new Counters());
170             this.appConfig = appConfig;
171         }
172
173         @Override // (override fetchFile to disable the actual file fetching)
174         public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
175             FileCollector fc = new FileCollector(this.appConfig, new Counters());
176             FilePublishInformation i = fc.createFilePublishInformation(fileData);
177
178             try {
179                 Path from = Path.of("config/application.yaml");
180                 Path to = Path.of(appConfig.getCollectedFilesPath(), fileData.name());
181                 Files.createDirectories(to.getParent());
182                 Files.copy(from, to, StandardCopyOption.REPLACE_EXISTING);
183             } catch (Exception e) {
184                 logger.error("Could not copy file {}", e.getMessage());
185             }
186             return Mono.just(i);
187         }
188     }
189
190     static class CollectAndReportFilesMock extends CollectAndReportFiles {
191         final AppConfig appConfig;
192
193         public CollectAndReportFilesMock(AppConfig appConfig) {
194             super(appConfig);
195             this.appConfig = appConfig;
196         }
197
198         @Override // (override fetchFile to disable the actual file fetching)
199         protected FileCollector createFileCollector() {
200             return new FileCollectorMock(appConfig);
201         }
202     }
203
204     @TestConfiguration
205     static class TestBeanFactory {
206
207         @Bean
208         CollectAndReportFiles collectAndReportFiles(@Autowired AppConfig appConfig) {
209             return new CollectAndReportFilesMock(appConfig);
210         }
211     }
212
213     @BeforeEach
214     void init() {
215         if (kafkaReceiver == null) {
216             kafkaReceiver = new KafkaReceiver(appConfig, appConfig.getCollectedFileTopic());
217         }
218         kafkaReceiver.reset();
219         deleteAllFiles();
220     }
221
222     @AfterEach
223     void afterEach() {
224         DataStore store = DataStore.create(this.appConfig);
225         store.deleteBucket(Bucket.FILES).block();
226         store.deleteBucket(Bucket.LOCKS).block();
227         deleteAllFiles();
228
229     }
230
231     private void deleteAllFiles() {
232         try {
233             FileSystemUtils.deleteRecursively(Path.of(appConfig.getCollectedFilesPath()));
234         } catch (IOException e) {
235         }
236     }
237
238     @Test
239     void clear() {
240
241     }
242
243     @Test
244     void testKafka() throws InterruptedException {
245         waitForKafkaListener();
246
247         this.scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", "junk").blockLast();
248
249         String fileReadyMessage = gson.toJson(fileReadyMessage());
250         this.scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", fileReadyMessage).blockLast();
251
252         await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1));
253         String rec = kafkaReceiver.lastValue();
254
255         assertThat(rec).contains("Ericsson");
256
257         FilePublishInformation recObj = gson.fromJson(rec, FilePublishInformation.class);
258
259         assertThat(recObj.getName()).isEqualTo(SOURCE_NAME + "/" + PM_FILE_NAME);
260     }
261
262     @Test
263     void testS3Concurrency() throws Exception {
264         waitForKafkaListener();
265
266         final int NO_OF_OBJECTS = 10;
267
268         Instant startTime = Instant.now();
269
270         Flux.range(1, NO_OF_OBJECTS) //
271             .map(i -> gson.toJson(fileReadyMessage("testS3Concurrency_" + i))) //
272             .flatMap(
273                 fileReadyMessage -> scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", fileReadyMessage)) //
274             .blockLast(); //
275
276         while (kafkaReceiver.count < NO_OF_OBJECTS) {
277             logger.info("sleeping {}", kafkaReceiver.count);
278             Thread.sleep(1000 * 1);
279         }
280
281         String rec = kafkaReceiver.lastValue();
282         assertThat(rec).contains("Ericsson");
283
284         final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
285         logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
286     }
287
288     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
289     private static void waitForKafkaListener() throws InterruptedException {
290         Thread.sleep(4000);
291     }
292
293     @Test
294     @SuppressWarnings("squid:S2699")
295     void runMock() throws Exception {
296         logger.warn("**************** Keeping server alive! " + this.port);
297         synchronized (this) {
298             this.wait();
299         }
300     }
301
302     FileReadyMessage.Event event(String fileName) {
303         MessageMetaData messageMetaData = MessageMetaData.builder() //
304             .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
305             .sourceName(SOURCE_NAME) //
306             .startEpochMicrosec(START_EPOCH_MICROSEC) //
307             .timeZoneOffset(TIME_ZONE_OFFSET) //
308             .changeIdentifier(CHANGE_IDENTIFIER) //
309             .eventName("Noti_RnNode-Ericsson_FileReady").build();
310
311         FileReadyMessage.FileInfo fileInfo = FileReadyMessage.FileInfo //
312             .builder() //
313             .fileFormatType(FILE_FORMAT_TYPE) //
314             .location(LOCATION) //
315             .fileFormatVersion(FILE_FORMAT_VERSION) //
316             .compression(GZIP_COMPRESSION) //
317             .build();
318
319         FileReadyMessage.ArrayOfNamedHashMap arrayOfNamedHashMap = FileReadyMessage.ArrayOfNamedHashMap //
320             .builder() //
321             .name(fileName) //
322             .hashMap(fileInfo) //
323             .build();
324
325         List<FileReadyMessage.ArrayOfNamedHashMap> arrayOfNamedHashMapList = new ArrayList<>();
326         arrayOfNamedHashMapList.add(arrayOfNamedHashMap);
327
328         FileReadyMessage.NotificationFields notificationFields = FileReadyMessage.NotificationFields //
329             .builder().notificationFieldsVersion("notificationFieldsVersion") //
330             .changeType(CHANGE_TYPE).changeIdentifier(CHANGE_IDENTIFIER) //
331             .arrayOfNamedHashMap(arrayOfNamedHashMapList) //
332             .build();
333
334         return FileReadyMessage.Event.builder() //
335             .commonEventHeader(messageMetaData) //
336             .notificationFields(notificationFields).build();
337     }
338
339     private FileReadyMessage fileReadyMessage(String fileName) {
340         FileReadyMessage message = FileReadyMessage.builder() //
341             .event(event(fileName)) //
342             .build();
343         return message;
344     }
345
346     private FileReadyMessage fileReadyMessage() {
347         return fileReadyMessage(PM_FILE_NAME);
348     }
349
350 }