endpointOverride: http://localhost:9000
accessKeyId: minio
secretAccessKey: miniostorage
+ locksBucket: ropfilelocks
+ bucket: ropfiles
@Value("${app.s3.secretAccessKey:}")
private String s3SecretAccessKey;
+ @Getter
+ @Value("${app.s3.locksBucket:}")
+ private String s3LocksBucket;
+
+ @Getter
+ @Value("${app.s3.bucket:}")
+ private String s3Bucket;
+
private WebClientConfig webClientConfig = null;
public WebClientConfig getWebClientConfig() {
import java.util.Map;
import lombok.Getter;
+import lombok.Setter;
import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
.disableHtmlEscaping() //
.create();
+ @Getter
private final FilterData filterData;
@Getter
final Collection<String> measTypes = new HashSet<>();
final Collection<String> measuredEntityDns = new ArrayList<>();
final Collection<String> measObjClass = new HashSet<>();
+
+ @Setter
+ String pmRopStartTime;
}
private static class MeasTypesIndexed extends PmReport.MeasTypes {
@Getter
private final String lastUpdated;
+ @Getter
private final Filter filter;
@Getter
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface DataStore {
+ public enum Bucket {
+ FILES, LOCKS
+ }
+
+ public Flux<String> listFiles(Bucket bucket, String prefix);
+
+ public Mono<String> readFile(Bucket bucket, String fileName);
+
+ public Mono<String> readFile(String bucket, String fileName);
+
+ public Mono<Boolean> createLock(String name);
+
+ public Mono<Boolean> deleteLock(String name);
+
+ public Mono<Boolean> deleteObject(Bucket bucket, String name);
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.springframework.http.HttpStatus;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class FileStore implements DataStore {
+
+ ApplicationConfig applicationConfig;
+
+ public FileStore(ApplicationConfig applicationConfig) {
+ this.applicationConfig = applicationConfig;
+ }
+
+ @Override
+ public Flux<String> listFiles(Bucket bucket, String prefix) {
+ Path root = Path.of(applicationConfig.getPmFilesPath(), prefix);
+ if (!root.toFile().exists()) {
+ root = root.getParent();
+ }
+
+ List<String> result = new ArrayList<>();
+ try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
+
+ stream.forEach(path -> filterListFiles(path, prefix, result));
+
+ return Flux.fromIterable(result);
+ } catch (Exception e) {
+ return Flux.error(e);
+ }
+ }
+
+ private void filterListFiles(Path path, String prefix, List<String> result) {
+ if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
+ result.add(externalName(path));
+ }
+ }
+
+ private String externalName(Path f) {
+ String fullName = f.toString();
+ return fullName.substring(applicationConfig.getPmFilesPath().length());
+ }
+
+ public Mono<String> readFile(String bucket, String fileName) {
+ return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT));
+ }
+
+ @Override
+ public Mono<String> readFile(Bucket bucket, String fileName) {
+ try {
+ String contents = Files.readString(path(fileName));
+ return Mono.just(contents);
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ @Override
+ public Mono<Boolean> createLock(String name) {
+ File file = path(name).toFile();
+ try {
+ boolean res = file.createNewFile();
+ return Mono.just(res || isAged(file));
+ } catch (Exception e) {
+ return Mono.just(file.exists() && isAged(file));
+ }
+ }
+
+ public Mono<String> copyFileTo(Path from, String to) {
+ try {
+ Path toPath = path(to);
+ Files.createDirectories(toPath);
+ Files.copy(from, path(to), StandardCopyOption.REPLACE_EXISTING);
+ return Mono.just(to);
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ private boolean isAged(File file) {
+ final long MAX_AGE_SECONDS = (long) 60 * 5;
+ return Instant.now().getEpochSecond() - file.lastModified() > MAX_AGE_SECONDS;
+
+ }
+
+ @Override
+ public Mono<Boolean> deleteLock(String name) {
+ return deleteObject(Bucket.LOCKS, name);
+ }
+
+ @Override
+ public Mono<Boolean> deleteObject(Bucket bucket, String name) {
+ try {
+ Files.delete(path(name));
+ return Mono.just(true);
+ } catch (Exception e) {
+ return Mono.just(false);
+ }
+ }
+
+ private Path path(String name) {
+ return Path.of(applicationConfig.getPmFilesPath(), name);
+ }
+
+}
package org.oran.dmaapadapter.tasks;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
public class HttpJobDataDistributor extends JobDataDistributor {
private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
- public HttpJobDataDistributor(Job job) {
- super(job);
+ public HttpJobDataDistributor(Job job, ApplicationConfig config) {
+ super(job, config);
}
@Override
package org.oran.dmaapadapter.tasks;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+
import lombok.Getter;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.filter.Filter;
+import org.oran.dmaapadapter.filter.PmReportFilter;
+import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.tasks.KafkaTopicListener.NewFileEvent;
+import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.Disposable;
private final Job job;
private Disposable subscription;
private final ErrorStats errorStats = new ErrorStats();
+ private final ApplicationConfig applConfig;
+
+ private final DataStore fileStore;
+ private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
private class ErrorStats {
private int consumerFaultCounter = 0;
}
}
- protected JobDataDistributor(Job job) {
+ protected JobDataDistributor(Job job, ApplicationConfig applConfig) {
this.job = job;
+ this.applConfig = applConfig;
+ this.fileStore = applConfig.isS3Enabled() ? new S3ObjectStore(applConfig) : new FileStore(applConfig);
}
public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
stop();
+
+ collectHistoricalData();
+
this.errorStats.resetIrrecoverableErrors();
this.subscription = filterAndBuffer(input, this.job) //
.flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
() -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
}
+ static class LockedException extends ServiceException {
+ public LockedException(String file) {
+ super(file, HttpStatus.NOT_FOUND);
+ }
+ }
+
+ private void collectHistoricalData() {
+ PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
+
+ if (filter != null) {
+ this.fileStore.createLock(collectHistoricalDataLockName()) //
+ .flatMap(isLockGranted -> isLockGranted ? Mono.just(isLockGranted)
+ : Mono.error(new LockedException(collectHistoricalDataLockName()))) //
+ .flatMapMany(b -> Flux.fromIterable(filter.getFilterData().getSourceNames()))
+ .flatMap(sourceName -> fileStore.listFiles(DataStore.Bucket.FILES, sourceName), 1) //
+ .filter(fileName -> filterStartTime(filter.getFilterData().getPmRopStartTime(), fileName)) //
+ .map(this::createFakeEvent) //
+ .flatMap(event -> filterAndBuffer(event, this.job), 1) //
+ .flatMap(this::sendToClient, 1) //
+ .onErrorResume(this::handleCollectHistoricalDataError) //
+ .collectList() //
+ .flatMap(list -> fileStore.deleteLock(collectHistoricalDataLockName())) //
+ .subscribe();
+ }
+ }
+
+ private Mono<String> handleCollectHistoricalDataError(Throwable t) {
+
+ if (t instanceof LockedException) {
+ logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId());
+ return Mono.empty(); // Ignore
+ } else {
+ return fileStore.deleteLock(collectHistoricalDataLockName()) //
+ .map(bool -> "OK") //
+ .onErrorResume(t2 -> Mono.empty());
+ }
+ }
+
+ private String collectHistoricalDataLockName() {
+ return "collectHistoricalDataLock" + this.job.getId();
+ }
+
+ private Flux<TopicListener.DataFromTopic> createFakeEvent(String fileName) {
+
+ NewFileEvent ev = new NewFileEvent(fileName, this.applConfig.getS3Bucket());
+
+ return Flux.just(new TopicListener.DataFromTopic("", gson.toJson(ev)));
+ }
+
+ private boolean filterStartTime(String startTimeStr, String fileName) {
+ // A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json
+ try {
+ String fileTimePart = fileName.substring(fileName.lastIndexOf("/") + 2);
+ fileTimePart = fileTimePart.substring(0, 18);
+
+ DateTimeFormatter formatter = new DateTimeFormatterBuilder().appendPattern("yyyyMMdd.HHmmZ").toFormatter();
+
+ OffsetDateTime fileStartTime = OffsetDateTime.parse(fileTimePart, formatter);
+ OffsetDateTime startTime = OffsetDateTime.parse(startTimeStr);
+
+ return startTime.isBefore(fileStartTime);
+ } catch (Exception e) {
+ logger.warn("Time parsing exception: {}", e.getMessage());
+ return false;
+ }
+ }
+
private void handleExceptionInStream(Throwable t) {
logger.warn("HttpDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
stop();
private Flux<Filter.FilteredData> filterAndBuffer(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
Flux<Filter.FilteredData> filtered = //
inputFlux.doOnNext(data -> job.getStatistics().received(data.value)) //
+ .flatMap(this::getDataFromFileIfNewPmFileEvent, 100) //
.map(job::filter) //
.filter(f -> !f.isEmpty()) //
.doOnNext(f -> job.getStatistics().filtered(f.value)); //
return filtered;
}
+ private Mono<DataFromTopic> getDataFromFileIfNewPmFileEvent(DataFromTopic data) {
+ if (this.job.getType().getDataType() != InfoType.DataType.PM_DATA || data.value.length() > 1000) {
+ return Mono.just(data);
+ }
+
+ try {
+ NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
+ if (ev.getObjectStoreBucket() != null) {
+ if (this.applConfig.isS3Enabled()) {
+ return fileStore.readFile(ev.getObjectStoreBucket(), ev.getFilename()) //
+ .map(str -> new DataFromTopic(data.key, str));
+ } else {
+ logger.error("S3 is not configured in application.yaml, ignoring: {}", data);
+ return Mono.empty();
+ }
+ } else {
+ if (applConfig.getPmFilesPath().isEmpty() || ev.getFilename() == null) {
+ logger.debug("Passing data {}", data);
+ return Mono.just(data);
+ } else {
+ Path path = Path.of(this.applConfig.getPmFilesPath(), ev.getFilename());
+ String pmReportJson = Files.readString(path, Charset.defaultCharset());
+ return Mono.just(new DataFromTopic(data.key, pmReportJson));
+ }
+ }
+ } catch (Exception e) {
+ return Mono.just(data);
+ }
+ }
+
private String quoteNonJson(String str, Job job) {
return job.getType().isJson() ? str : quote(str);
}
private final ApplicationConfig appConfig;
public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
- super(job);
+ super(job, appConfig);
this.appConfig = appConfig;
}
protected Mono<String> sendToClient(Filter.FilteredData data) {
Job job = this.getJob();
- logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
+ logger.trace("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
SenderRecord<String, String, Integer> senderRecord = senderRecord(data, job);
package org.oran.dmaapadapter.tasks;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import java.net.URI;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import lombok.Builder;
import lombok.Getter;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.ResponseBytes;
-import software.amazon.awssdk.core.async.AsyncResponseTransformer;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
-import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
-import software.amazon.awssdk.services.s3.model.GetObjectRequest;
-import software.amazon.awssdk.services.s3.model.GetObjectResponse;
/**
* The class streams incoming requests from a Kafka topic and sends them further
private final InfoType type;
private Flux<DataFromTopic> dataFromTopic;
- @Getter
- private static S3AsyncClient s3AsynchClient;
-
- private static Gson gson = new GsonBuilder() //
- .disableHtmlEscaping() //
- .create(); //
-
@ToString
@Builder
public static class NewFileEvent {
private String objectStoreBucket;
}
- public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
- this.applicationConfig = applicationConfig;
+ public KafkaTopicListener(ApplicationConfig applConfig, InfoType type) {
+ this.applicationConfig = applConfig;
this.type = type;
- if (applicationConfig.isS3Enabled()) {
- synchronized (KafkaTopicListener.class) {
- if (s3AsynchClient == null) {
- s3AsynchClient = getS3AsyncClientBuilder().build();
- }
- }
- }
}
@Override
return KafkaReceiver.create(kafkaInputProperties(clientId)) //
.receiveAutoAck() //
.concatMap(consumerRecord -> consumerRecord) //
- .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
+ .doOnNext(input -> logger.trace("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(),
input.value())) //
.doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
.doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
.filter(t -> !t.value().isEmpty() || !t.key().isEmpty()) //
.map(input -> new DataFromTopic(input.key(), input.value())) //
- .flatMap(this::getDataFromFileIfNewPmFileEvent, 100) //
.publish() //
.autoConnect(1);
}
- private S3AsyncClientBuilder getS3AsyncClientBuilder() {
- URI uri = URI.create(this.applicationConfig.getS3EndpointOverride());
- return S3AsyncClient.builder() //
- .region(Region.US_EAST_1) //
- .endpointOverride(uri) //
- .credentialsProvider(StaticCredentialsProvider.create( //
- AwsBasicCredentials.create(this.applicationConfig.getS3AccessKeyId(), //
- this.applicationConfig.getS3SecretAccessKey())));
-
- }
-
- private Mono<String> getDataFromS3Object(String bucket, String key) {
- if (!this.applicationConfig.isS3Enabled()) {
- logger.error("Missing S3 confinguration in application.yaml, ignoring bucket: {}, key: {}", bucket, key);
- return Mono.empty();
- }
-
- GetObjectRequest request = GetObjectRequest.builder() //
- .bucket(bucket) //
- .key(key) //
- .build();
-
- CompletableFuture<ResponseBytes<GetObjectResponse>> future = s3AsynchClient.getObject(request,
- AsyncResponseTransformer.toBytes());
-
- return Mono.fromFuture(future) //
- .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
- .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
- .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
- .onErrorResume(t -> Mono.empty());
- }
-
- private Mono<DataFromTopic> getDataFromFileIfNewPmFileEvent(DataFromTopic data) {
- if (this.type.getDataType() != InfoType.DataType.PM_DATA || data.value.length() > 1000) {
- return Mono.just(data);
- }
-
- try {
- NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
- if (ev.getObjectStoreBucket() != null) {
- if (applicationConfig.isS3Enabled()) {
- return getDataFromS3Object(ev.getObjectStoreBucket(), ev.getFilename()) //
- .map(str -> new DataFromTopic(data.key, str));
- } else {
- logger.error("S3 is not configured in application.yaml, ignoring: {}", data);
- return Mono.empty();
- }
- } else {
- if (applicationConfig.getPmFilesPath().isEmpty() || ev.filename == null) {
- logger.debug("Passing data {}", data);
- return Mono.just(data);
- } else {
- Path path = Path.of(this.applicationConfig.getPmFilesPath(), ev.getFilename());
- String pmReportJson = Files.readString(path, Charset.defaultCharset());
- return Mono.just(new DataFromTopic(data.key, pmReportJson));
- }
- }
- } catch (Exception e) {
- return Mono.just(data);
- }
- }
-
private ReceiverOptions<String, String> kafkaInputProperties(String clientId) {
Map<String, Object> consumerProps = new HashMap<>();
if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.concurrent.CompletableFuture;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class S3ObjectStore implements DataStore {
+ private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
+ private final ApplicationConfig applicationConfig;
+
+ private static S3AsyncClient s3AsynchClient;
+
+ public S3ObjectStore(ApplicationConfig applicationConfig) {
+ this.applicationConfig = applicationConfig;
+
+ getS3AsynchClient(applicationConfig);
+ }
+
+ private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
+ if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
+ s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
+ }
+ return s3AsynchClient;
+ }
+
+ private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
+ URI uri = URI.create(applicationConfig.getS3EndpointOverride());
+ return S3AsyncClient.builder() //
+ .region(Region.US_EAST_1) //
+ .endpointOverride(uri) //
+ .credentialsProvider(StaticCredentialsProvider.create( //
+ AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
+ applicationConfig.getS3SecretAccessKey())));
+
+ }
+
+ @Override
+ public Flux<String> listFiles(Bucket bucket, String prefix) {
+ return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
+ }
+
+ @Override
+ public Mono<Boolean> createLock(String name) {
+ return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) //
+ .onErrorResume(t -> createLock(name, null));
+ }
+
+ private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
+ final long MAX_AGE_SECONDS = (long) 60 * 5;
+
+ if (head == null || head.lastModified().getEpochSecond() - Instant.now().getEpochSecond() > MAX_AGE_SECONDS) {
+
+ return this.putObject(Bucket.LOCKS, name, "") //
+ .flatMap(resp -> Mono.just(true)) //
+ .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) //
+ .onErrorResume(t -> Mono.just(false));
+ } else {
+ return Mono.just(false);
+ }
+ }
+
+ @Override
+ public Mono<Boolean> deleteLock(String name) {
+ return deleteObject(Bucket.LOCKS, name);
+ }
+
+ @Override
+ public Mono<Boolean> deleteObject(Bucket bucket, String name) {
+
+ DeleteObjectRequest request = DeleteObjectRequest.builder() //
+ .bucket(bucket(bucket)) //
+ .key(name) //
+ .build();
+
+ CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
+
+ return Mono.fromFuture(future).map(resp -> true);
+ }
+
+ @Override
+ public Mono<String> readFile(Bucket bucket, String fileName) {
+ return getDataFromS3Object(bucket(bucket), fileName);
+ }
+
+ @Override
+ public Mono<String> readFile(String bucket, String fileName) {
+ return getDataFromS3Object(bucket, fileName);
+ }
+
+ public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
+ PutObjectRequest request = PutObjectRequest.builder() //
+ .bucket(bucket(bucket)) //
+ .key(fileName) //
+ .build();
+
+ AsyncRequestBody body = AsyncRequestBody.fromString(bodyString);
+
+ CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+ return Mono.fromFuture(future) //
+ .map(putObjectResponse -> fileName) //
+ .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
+ }
+
+ public Mono<String> copyFileToS3(Bucket bucket, Path fromFile, String toFile) {
+ return copyFileToS3Bucket(bucket(bucket), fromFile, toFile);
+ }
+
+ public Mono<String> createS3Bucket(Bucket bucket) {
+ return createS3Bucket(bucket(bucket));
+ }
+
+ private Mono<String> createS3Bucket(String s3Bucket) {
+
+ CreateBucketRequest request = CreateBucketRequest.builder() //
+ .bucket(s3Bucket) //
+ .build();
+
+ CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
+
+ return Mono.fromFuture(future) //
+ .map(f -> s3Bucket) //
+ .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
+ .onErrorResume(t -> Mono.just(s3Bucket));
+ }
+
+ public Mono<String> deleteBucket(Bucket bucket) {
+ return listFiles(bucket, "") //
+ .flatMap(key -> deleteObject(bucket, key)) //
+ .collectList() //
+ .flatMap(list -> deleteBucketFromS3Storage(bucket)) //
+ .map(resp -> "OK")
+ .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
+ .onErrorResume(t -> Mono.just("NOK"));
+ }
+
+ private Mono<DeleteBucketResponse> deleteBucketFromS3Storage(Bucket bucket) {
+ DeleteBucketRequest request = DeleteBucketRequest.builder() //
+ .bucket(bucket(bucket)) //
+ .build();
+
+ CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
+
+ return Mono.fromFuture(future);
+ }
+
+ private String bucket(Bucket bucket) {
+ return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
+ }
+
+ private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
+
+ PutObjectRequest request = PutObjectRequest.builder() //
+ .bucket(s3Bucket) //
+ .key(s3Key) //
+ .build();
+
+ AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
+
+ CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+ return Mono.fromFuture(future) //
+ .map(f -> s3Key) //
+ .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
+
+ }
+
+ private Mono<HeadObjectResponse> getHeadObject(String bucket, String key) {
+ HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
+
+ CompletableFuture<HeadObjectResponse> future = s3AsynchClient.headObject(request);
+ return Mono.fromFuture(future);
+ }
+
+ private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
+ ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() //
+ .bucket(bucket) //
+ .maxKeys(1100) //
+ .prefix(prefix) //
+ .build();
+ CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
+
+ return Mono.fromFuture(future) //
+ .map(ListObjectsResponse::contents) //
+ .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
+ .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
+ .flatMapMany(Flux::fromIterable) //
+ .doOnNext(obj -> logger.debug("Found object: {}", obj.key())) //
+
+ ;
+ }
+
+ private Mono<String> getDataFromS3Object(String bucket, String key) {
+
+ GetObjectRequest request = GetObjectRequest.builder() //
+ .bucket(bucket) //
+ .key(key) //
+ .build();
+
+ CompletableFuture<ResponseBytes<GetObjectResponse>> future =
+ s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
+
+ return Mono.fromFuture(future) //
+ .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
+ .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
+ .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
+ .onErrorResume(t -> Mono.empty());
+ }
+
+}
private JobDataDistributor createConsumer(Job job) {
return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
- : new HttpJobDataDistributor(job);
+ : new HttpJobDataDistributor(job, appConfig);
}
private void addConsumer(Job job, MultiMap<JobDataDistributor> distributors,
"type": "string"
}
]
+ },
+ "pmRopStartTime": {
+ "type": "string"
}
}
}
"type": "integer",
"minimum": 1
},
- "kafkaOutputTopic" : {
+ "kafkaOutputTopic": {
"type": "string"
},
"bufferTimeout": {
]
}
}
-}
+}
\ No newline at end of file
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.tasks.DataStore;
+import org.oran.dmaapadapter.tasks.DataStore.Bucket;
+import org.oran.dmaapadapter.tasks.FileStore;
import org.oran.dmaapadapter.tasks.KafkaTopicListener;
+import org.oran.dmaapadapter.tasks.S3ObjectStore;
import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
import org.slf4j.Logger;
import org.springframework.test.context.TestPropertySource;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
-import software.amazon.awssdk.core.async.AsyncRequestBody;
-import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
-import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.PutObjectResponse;
@SuppressWarnings("java:S3577") // Rename class
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
"app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
- "app.pm-files-path=./src/test/resources/"}) //
+ "app.pm-files-path=./src/test/resources/", "app.s3.locksBucket=ropfilelocks", "app.s3.bucket=ropfiles"}) //
class IntegrationWithKafka {
final String TYPE_ID = "KafkaInformationType";
private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
this.receivedKafkaOutput = receivedKafkaOutput;
this.count++;
- logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
+ logger.trace("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
}
synchronized String lastKey() {
}
kafkaReceiver.reset();
kafkaReceiver2.reset();
+
+ S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+ fileStore.deleteBucket(DataStore.Bucket.FILES).block();
+ fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
}
@AfterEach
ConsumerJobInfo consumerJobInfoKafka(String topic, PmReportFilter.FilterData filterData) {
try {
Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, 1, topic);
+
String str = gson.toJson(param);
Object parametersObj = jsonObject(str);
Thread.sleep(4000);
}
- private Mono<String> copyFileToS3Bucket(Path fileName, String s3Bucket, String s3Key) {
-
- PutObjectRequest request = PutObjectRequest.builder() //
- .bucket(s3Bucket) //
- .key(s3Key) //
- .build();
-
- AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
-
- CompletableFuture<PutObjectResponse> future = KafkaTopicListener.getS3AsynchClient().putObject(request, body);
-
- return Mono.fromFuture(future) //
- .map(f -> s3Key) //
- .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()))
- .onErrorResume(t -> Mono.empty());
- }
-
- private Mono<String> createS3Bucket(String s3Bucket) {
-
- CreateBucketRequest request = CreateBucketRequest.builder() //
- .bucket(s3Bucket) //
- .build();
-
- CompletableFuture<CreateBucketResponse> future = KafkaTopicListener.getS3AsynchClient().createBucket(request);
-
- return Mono.fromFuture(future) //
- .map(f -> s3Bucket) //
- .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
- .onErrorResume(t -> Mono.just(s3Bucket));
- }
-
@Test
void simpleCase() throws Exception {
final String JOB_ID = "ID";
}
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
- @Test
+ // @Test
void kafkaCharacteristics_pmFilter_localFile() throws Exception {
// Filter PM reports and sent to two jobs over Kafka
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
waitForKafkaListener();
- final int NO_OF_OBJECTS = 100000;
+ final int NO_OF_OBJECTS = 100;
Instant startTime = Instant.now();
printStatistics();
}
+ @Test
+ void testListFiles() {
+ FileStore fileStore = new FileStore(applicationConfig);
+
+ fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
+ "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
+
+ List<String> files = fileStore.listFiles(Bucket.FILES, "O-DU-112").collectList().block();
+ assertThat(files).hasSize(1);
+
+ files = fileStore.listFiles(Bucket.FILES, "O-DU-1122").collectList().block();
+ assertThat(files).hasSize(1);
+
+ fileStore.deleteObject(Bucket.FILES, files.get(0));
+ }
+
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@Test
void kafkaCharacteristics_pmFilter_s3() throws Exception {
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
waitForKafkaListener();
- final int NO_OF_OBJECTS = 100000;
+ final int NO_OF_OBJECTS = 100;
Instant startTime = Instant.now();
KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder() //
- .filename("pm_report.json").objectStoreBucket("ropfiles") //
+ .filename("pm_report.json").objectStoreBucket(applicationConfig.getS3Bucket()) //
.build();
- createS3Bucket("ropfiles").block();
- copyFileToS3Bucket(Path.of("./src/test/resources/pm_report.json"), "ropfiles", "pm_report.json").block();
+ S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+
+ fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
+ fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"), "pm_report.json")
+ .block();
String eventAsString = gson.toJson(event);
logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);
printStatistics();
+
+ }
+
+ @Test
+ void testHistoricalData() throws Exception {
+ // test
+ final String JOB_ID = "testHistoricalData";
+
+ S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+
+ fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
+ fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block();
+
+ fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"),
+ "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
+
+ fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json"),
+ "OTHER_SOURCENAME/test.json").block();
+
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+
+ PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
+ filterData.getSourceNames().add("O-DU-1122");
+ filterData.setPmRopStartTime("1999-12-27T10:50:44.000-08:00");
+
+ this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
+ restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1));
}
@Test
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
- var dataToSend = Flux.range(1, 100).map(i -> kafkaSenderRecord("Message_" + i, "", TYPE_ID)); // Message_1,
+ var dataToSend = Flux.range(1, 100).map(i -> kafkaSenderRecord("XMessage_" + i, "", TYPE_ID)); // Message_1,
// Message_2
// etc.
sendDataToKafka(dataToSend); // this should not overflow