2 * ========================LICENSE_START=================================
5 * Copyright (C) 2020-2023 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oran.datafile;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.awaitility.Awaitility.await;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.when;
28 import com.google.gson.Gson;
29 import com.google.gson.GsonBuilder;
31 import java.io.IOException;
32 import java.nio.file.Files;
33 import java.nio.file.Path;
34 import java.nio.file.StandardCopyOption;
35 import java.time.Duration;
36 import java.time.Instant;
37 import java.util.ArrayList;
38 import java.util.List;
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.BeforeEach;
42 import org.junit.jupiter.api.Test;
43 import org.oran.datafile.configuration.AppConfig;
44 import org.oran.datafile.datastore.DataStore;
45 import org.oran.datafile.datastore.DataStore.Bucket;
46 import org.oran.datafile.model.Counters;
47 import org.oran.datafile.model.FileData;
48 import org.oran.datafile.model.FilePublishInformation;
49 import org.oran.datafile.model.FileReadyMessage;
50 import org.oran.datafile.model.FileReadyMessage.MessageMetaData;
51 import org.oran.datafile.oauth2.SecurityContext;
52 import org.oran.datafile.tasks.CollectAndReportFiles;
53 import org.oran.datafile.tasks.FileCollector;
54 import org.oran.datafile.tasks.KafkaTopicListener;
55 import org.oran.datafile.tasks.KafkaTopicListener.DataFromTopic;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import org.springframework.beans.factory.annotation.Autowired;
59 import org.springframework.boot.test.context.SpringBootTest;
60 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
61 import org.springframework.boot.test.context.TestConfiguration;
62 import org.springframework.boot.test.web.server.LocalServerPort;
63 import org.springframework.context.annotation.Bean;
64 import org.springframework.test.context.TestPropertySource;
65 import org.springframework.util.FileSystemUtils;
67 import reactor.core.publisher.Flux;
68 import reactor.core.publisher.Mono;
// Boots the full application on a fixed port so that the test can interact
// with externally configured services (Kafka, and a MinIO/S3 endpoint at
// localhost:9000, per the properties below).
70 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
// NOTE(review): the opening of the @TestPropertySource(properties = { ... })
// annotation appears to be missing between these lines (extraction gap,
// original lines 71-72) — the string list below belongs to it; verify
// against the original file.
73 "app.ssl.key-store-password-file=./config/ftps_keystore.pass", //
74 "app.ssl.key-store=./config/ftps_keystore.p12", //
75 "app.ssl.trust-store-password-file=./config/truststore.pass", //
76 "app.ssl.trust-store=", // No trust validation
77 "app.collected-files-path=/tmp/osc_datafile/", //
78 "logging.file.name=/tmp/datafile.log", //
// Needed so TestBeanFactory below can replace the production
// CollectAndReportFiles bean with the mock variant.
79 "spring.main.allow-bean-definition-overriding=true", //
// Local MinIO credentials/buckets used as the S3-compatible file store.
80 "app.s3.endpointOverride=http://localhost:9000", //
81 "app.s3.accessKeyId=minio", //
82 "app.s3.secretAccessKey=miniostorage", //
83 "app.s3.bucket=ropfiles", //
84 "app.s3.locksBucket=locks", //
85 "app.auth-token-file=src/test/resources/jwtToken.b64", //
// NOTE(review): property key reads "use-oath-token" — presumably a
// (pre-existing) misspelling of "oauth" that the application config
// matches; do not rename here without checking AppConfig.
86 "app.kafka.use-oath-token=false"})
87 @SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock.
// ---- Constants used to build the synthetic VES FileReady events below ----
90 private static final int LAST_EPOCH_MICROSEC = 151983;
91 private static final String SOURCE_NAME = "5GRAN_DU";
92 private static final int START_EPOCH_MICROSEC = 15198378;
93 private static final String TIME_ZONE_OFFSET = "UTC+05:00";
94 private static final String PM_FILE_NAME = "PM_FILE_NAME";
96 // This can be any downloadable file on the net
97 private static final String FTPES_LOCATION =
// NOTE(review): this URL contains a space after "ftpes://" — looks like a
// pre-existing typo; the constant appears unused in the visible code.
98 "ftpes:// onap:pano@ftp-ftpes-6:2021/A20000626.2315+0200-2330+0200_GNODEB-15-4.xml.gz";
99 private static final String LOCATION =
100 "https://launchpad.net/ubuntu/+source/perf-tools-unstable/1.0+git7ffb3fd-1ubuntu1/+build/13630748/+files/perf-tools-unstable_1.0+git7ffb3fd-1ubuntu1_all.deb";
101 private static final String GZIP_COMPRESSION = "gzip";
102 private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
103 private static final String FILE_FORMAT_VERSION = "V10";
104 private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
105 private static final String CHANGE_TYPE = "FileReady";
107 private static final Logger logger = LoggerFactory.getLogger(Integration.class);
// Shared Gson instance for (de)serializing FileReadyMessage /
// FilePublishInformation in the tests.
108 private static Gson gson = new GsonBuilder() //
109 .disableHtmlEscaping() //
// NOTE(review): original lines 110-118 are missing from this view — the
// builder chain lacks its terminating .create(); the gap presumably also
// held the @LocalServerPort "port" field (read by runMock), the AppConfig
// field (read by beforeEach/afterEach), and @Autowired annotations for the
// fields below. Verify against the original file.
119 CollectAndReportFiles scheduledTask;
121 static final SecurityContext securityContext = new SecurityContext("");
123 private static KafkaReceiver kafkaReceiver;
// Test helper that subscribes to the application's output (collected-file)
// topic and remembers the most recently received record so the tests can
// assert on it.
125 private static class KafkaReceiver {
126 public final String topic;
// Last record seen on the topic; replaced on every doOnNext and by reset().
127 private DataFromTopic receivedKafkaOutput;
128 private final Logger logger = LoggerFactory.getLogger(Integration.class);
// NOTE(review): original lines 129-131 are missing from this view — the
// tests read a "kafkaReceiver.count" field that must be declared in that
// gap; verify against the original file.
132 public KafkaReceiver(AppConfig applicationConfig, String outputTopic) {
133 this.topic = outputTopic;
135 // Create a listener to the output topic. The KafkaTopicListener happens to be
136 // suitable for that,
// Spy the real config so the listener consumes from outputTopic instead of
// the application's configured input topic.
137 AppConfig config = spy(applicationConfig);
138 when(config.getKafkaClientId()).thenReturn("MockDatafile");
139 when(config.getInputTopic()).thenReturn(outputTopic);
140 KafkaTopicListener topicListener = new KafkaTopicListener(config);
142 topicListener.getFlux() //
143 .doOnNext(this::set) //
144 .doFinally(sig -> logger.info("Finally " + sig)) //
// NOTE(review): original lines 145-147 are missing — presumably the
// .subscribe() terminating this pipeline and the constructor's closing
// brace; verify against the original file.
148 private void set(DataFromTopic receivedKafkaOutput) {
149 this.receivedKafkaOutput = receivedKafkaOutput;
151 logger.debug("*** received {}, {}", topic, receivedKafkaOutput);
// Accessors are synchronized because the record is written from the
// reactive listener thread and read from the test thread.
154 public synchronized String lastKey() {
155 return this.receivedKafkaOutput.key;
158 public synchronized String lastValue() {
159 return this.receivedKafkaOutput.value;
// Clears state between tests (called from the setup code at line 221).
162 public void reset() {
164 this.receivedKafkaOutput = new DataFromTopic("", "");
// FileCollector variant that skips the real (S)FTP/HTTP download: instead of
// fetching the remote file it copies a local file (config/application.yaml)
// into the collected-files directory under the expected name.
168 static class FileCollectorMock extends FileCollector {
169 final AppConfig appConfig;
171 public FileCollectorMock(AppConfig appConfig) {
172 super(securityContext, appConfig, new Counters());
173 this.appConfig = appConfig;
176 @Override // (override fetchFile to disable the actual file fetching)
177 public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
// A real FileCollector is still used to build the publish information, so
// the metadata path stays identical to production.
178 FileCollector fc = new FileCollector(securityContext, this.appConfig, new Counters());
179 FilePublishInformation i = fc.createFilePublishInformation(fileData);
// NOTE(review): original lines 180-181 are missing — presumably the
// opening of the try block that the catch at line 186 closes; verify.
182 Path from = Path.of("config/application.yaml");
183 Path to = Path.of(appConfig.getCollectedFilesPath(), fileData.name());
184 Files.createDirectories(to.getParent());
185 Files.copy(from, to, StandardCopyOption.REPLACE_EXISTING);
186 } catch (Exception e) {
// Best-effort: a failed copy is only logged, the Mono presumably still
// completes with the publish info (return statement is in the missing
// lines 188-192 — verify against the original file).
187 logger.error("Could not copy file {}", e.getMessage());
// CollectAndReportFiles variant whose only change is to hand out the
// FileCollectorMock above, so the whole collect-and-report pipeline runs
// without any real file transfer.
193 static class CollectAndReportFilesMock extends CollectAndReportFiles {
194 final AppConfig appConfig;
196 public CollectAndReportFilesMock(AppConfig appConfig) {
197 super(securityContext, appConfig);
198 this.appConfig = appConfig;
201 @Override // (override fetchFile to disable the actual file fetching)
202 protected FileCollector createFileCollector() {
203 return new FileCollectorMock(appConfig);
// Replaces the production CollectAndReportFiles bean with the mock variant
// (enabled by "spring.main.allow-bean-definition-overriding=true" above).
// NOTE(review): the @TestConfiguration annotation (line ~207) and the @Bean
// annotation (lines 209-210) appear to be in extraction gaps — verify.
208 static class TestBeanFactory {
211 CollectAndReportFiles collectAndReportFiles(@Autowired AppConfig appConfig) {
212 return new CollectAndReportFilesMock(appConfig);
// NOTE(review): the following lines are fragments of two lifecycle methods
// whose signatures fall in extraction gaps (likely a @BeforeEach around
// lines 213-217 and an @AfterEach around lines 222-226) — verify against
// the original file.
// Setup fragment: lazily create the single shared receiver on the output
// topic, then clear its last-received record before each test.
218 if (kafkaReceiver == null) {
219 kafkaReceiver = new KafkaReceiver(appConfig, appConfig.getCollectedFileTopic());
221 kafkaReceiver.reset();
// Teardown fragment: drop both S3 buckets so each test starts clean.
227 DataStore store = DataStore.create(this.appConfig);
228 store.deleteBucket(Bucket.FILES).block();
229 store.deleteBucket(Bucket.LOCKS).block();
// Removes the locally collected files directory, best-effort.
// NOTE(review): line 235 (presumably "try {") and the catch body / closing
// braces after line 237 are missing from this view — verify.
234 private void deleteAllFiles() {
236 FileSystemUtils.deleteRecursively(Path.of(appConfig.getCollectedFilesPath()));
237 } catch (IOException e) {
// End-to-end happy path: publish one junk record (must be tolerated) and one
// valid FileReady message on the input topic, then assert that exactly one
// FilePublishInformation record appears on the output topic.
// NOTE(review): the @Test annotation (line ~246) falls in an extraction gap.
247 void testKafka() throws InterruptedException {
248 waitForKafkaListener();
// Junk input must not crash the pipeline nor produce output.
250 this.scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", "junk").blockLast();
252 String fileReadyMessage = gson.toJson(fileReadyMessage());
253 this.scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", fileReadyMessage).blockLast();
// "count" is a KafkaReceiver field declared in a missing span (orig lines
// 129-131) — see note on that class.
255 await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1));
256 String rec = kafkaReceiver.lastValue();
258 assertThat(rec).contains("Ericsson");
260 FilePublishInformation recObj = gson.fromJson(rec, FilePublishInformation.class);
262 assertThat(recObj.getName()).isEqualTo(SOURCE_NAME + "/" + PM_FILE_NAME);
// Throughput check: push NO_OF_OBJECTS FileReady messages concurrently and
// wait until the same number of records arrives on the output topic, logging
// the achieved objects/second rate.
// NOTE(review): the @Test annotation (line ~265) falls in an extraction gap.
266 void testS3Concurrency() throws Exception {
267 waitForKafkaListener();
269 final int NO_OF_OBJECTS = 10;
271 Instant startTime = Instant.now();
273 Flux.range(1, NO_OF_OBJECTS) //
274 .map(i -> gson.toJson(fileReadyMessage("testS3Concurrency_" + i))) //
// NOTE(review): line 275 (presumably the .flatMap( opening for the lambda
// below) and lines 277-278 (pipeline termination) are missing — verify.
276 fileReadyMessage -> scheduledTask.sendDataToStream(appConfig.getInputTopic(), "key", fileReadyMessage)) //
// Busy-wait (1 s poll) until all output records have been observed.
279 while (kafkaReceiver.count < NO_OF_OBJECTS) {
280 logger.info("sleeping {}", kafkaReceiver.count);
281 Thread.sleep(1000 * 1);
284 String rec = kafkaReceiver.lastValue();
285 assertThat(rec).contains("Ericsson");
287 final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
288 logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
// Gives the Kafka listener time to attach before a test publishes messages.
// NOTE(review): the method body (original lines 293-296, presumably a
// Thread.sleep — hence the S2925 suppression) is missing from this view.
291 @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
292 private static void waitForKafkaListener() throws InterruptedException {
// Manual-testing helper, not a real test (see the class-level S3577
// suppression): keeps the server running indefinitely so it can be poked
// from outside. "port" is a field declared in a missing span (orig lines
// 110-118) — presumably the @LocalServerPort.
// NOTE(review): the @Test annotation and the wait inside the synchronized
// block (lines after 300) fall in extraction gaps — verify.
297 @SuppressWarnings("squid:S2699")
298 void runMock() throws Exception {
299 logger.warn("**************** Keeping server alive! " + this.port);
300 synchronized (this) {
// Builds a synthetic VES FileReady event for the given file name, using the
// constants declared at the top of the class.
// NOTE(review): several builder-chain lines fall in extraction gaps (orig
// 313, 315, 320-321, 323-327, 330, 335-336) — e.g. FileInfo's .builder()/
// .build(), the ArrayOfNamedHashMap .name(fileName), and NotificationFields'
// .build(); verify against the original file.
305 FileReadyMessage.Event event(String fileName) {
306 MessageMetaData messageMetaData = MessageMetaData.builder() //
307 .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
308 .sourceName(SOURCE_NAME) //
309 .startEpochMicrosec(START_EPOCH_MICROSEC) //
310 .timeZoneOffset(TIME_ZONE_OFFSET) //
311 .changeIdentifier(CHANGE_IDENTIFIER) //
312 .eventName("Noti_RnNode-Ericsson_FileReady").build();
314 FileReadyMessage.FileInfo fileInfo = FileReadyMessage.FileInfo //
316 .fileFormatType(FILE_FORMAT_TYPE) //
317 .location(LOCATION) //
318 .fileFormatVersion(FILE_FORMAT_VERSION) //
319 .compression(GZIP_COMPRESSION) //
322 FileReadyMessage.ArrayOfNamedHashMap arrayOfNamedHashMap = FileReadyMessage.ArrayOfNamedHashMap //
325 .hashMap(fileInfo) //
328 List<FileReadyMessage.ArrayOfNamedHashMap> arrayOfNamedHashMapList = new ArrayList<>();
329 arrayOfNamedHashMapList.add(arrayOfNamedHashMap);
331 FileReadyMessage.NotificationFields notificationFields = FileReadyMessage.NotificationFields //
332 .builder().notificationFieldsVersion("notificationFieldsVersion") //
333 .changeType(CHANGE_TYPE).changeIdentifier(CHANGE_IDENTIFIER) //
334 .arrayOfNamedHashMap(arrayOfNamedHashMapList) //
337 return FileReadyMessage.Event.builder() //
338 .commonEventHeader(messageMetaData) //
339 .notificationFields(notificationFields).build();
// Wraps event(fileName) into a complete FileReadyMessage.
// NOTE(review): lines 345-348 are missing — presumably the .build(), the
// return of "message", and the closing brace; verify against the original.
342 private FileReadyMessage fileReadyMessage(String fileName) {
343 FileReadyMessage message = FileReadyMessage.builder() //
344 .event(event(fileName)) //
// Convenience overload using the default PM file name constant.
349 private FileReadyMessage fileReadyMessage() {
350 return fileReadyMessage(PM_FILE_NAME);