Removed the bucket from the NewFileEvenet since the S3 bucket needs to be configured anyway to query
already collected PM data.
Fixed some bug.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Idaac044bd3578cbf97f700b70b73f9ed8ac568b1
public Flux<String> listFiles(Bucket bucket, String prefix);
- public Mono<String> readFile(Bucket bucket, String fileName);
-
- public Mono<String> readFile(String bucket, String fileName);
+ public Mono<byte[]> readFile(Bucket bucket, String fileName);
public Mono<Boolean> createLock(String name);
import java.util.stream.Stream;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
-import org.oran.dmaapadapter.exceptions.ServiceException;
-import org.springframework.http.HttpStatus;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
return fullName.substring(applicationConfig.getPmFilesPath().length());
}
- public Mono<String> readFile(String bucket, String fileName) {
- return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT));
- }
-
@Override
- public Mono<String> readFile(Bucket bucket, String fileName) {
+ public Mono<byte[]> readFile(Bucket bucket, String fileName) {
try {
- String contents = Files.readString(path(fileName));
+ byte[] contents = Files.readAllBytes(path(fileName));
return Mono.just(contents);
} catch (Exception e) {
return Mono.error(e);
package org.oran.dmaapadapter.datastore;
import java.net.URI;
-import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
}
@Override
- public Mono<String> readFile(Bucket bucket, String fileName) {
+ public Mono<byte[]> readFile(Bucket bucket, String fileName) {
return getDataFromS3Object(bucket(bucket), fileName);
}
- @Override
- public Mono<String> readFile(String bucket, String fileName) {
- return getDataFromS3Object(bucket, fileName);
- }
-
public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
PutObjectRequest request = PutObjectRequest.builder() //
.bucket(bucket(bucket)) //
;
}
- private Mono<String> getDataFromS3Object(String bucket, String key) {
+ private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
GetObjectRequest request = GetObjectRequest.builder() //
.bucket(bucket) //
s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
return Mono.fromFuture(future) //
- .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
- .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
+ .map(b -> b.asByteArray()) //
+ .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket,
+ t.getMessage())) //
.doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
.onErrorResume(t -> Mono.empty());
}
+++ /dev/null
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2021 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oran.dmaapadapter.tasks;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Stream;
-
-import org.oran.dmaapadapter.configuration.ApplicationConfig;
-import org.oran.dmaapadapter.exceptions.ServiceException;
-import org.springframework.http.HttpStatus;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-public class FileStore implements DataStore {
-
- ApplicationConfig applicationConfig;
-
- public FileStore(ApplicationConfig applicationConfig) {
- this.applicationConfig = applicationConfig;
- }
-
- @Override
- public Flux<String> listFiles(Bucket bucket, String prefix) {
- Path root = Path.of(applicationConfig.getPmFilesPath(), prefix);
- if (!root.toFile().exists()) {
- root = root.getParent();
- }
-
- List<String> result = new ArrayList<>();
- try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
-
- stream.forEach(path -> filterListFiles(path, prefix, result));
-
- return Flux.fromIterable(result);
- } catch (Exception e) {
- return Flux.error(e);
- }
- }
-
- private void filterListFiles(Path path, String prefix, List<String> result) {
- if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
- result.add(externalName(path));
- }
- }
-
- private String externalName(Path f) {
- String fullName = f.toString();
- return fullName.substring(applicationConfig.getPmFilesPath().length());
- }
-
- public Mono<byte[]> readFile(String bucket, String fileName) {
- return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT));
- }
-
- @Override
- public Mono<byte[]> readFile(Bucket bucket, String fileName) {
- try {
- byte[] contents = Files.readAllBytes(path(fileName));
- return Mono.just(contents);
- } catch (Exception e) {
- return Mono.error(e);
- }
- }
-
- @Override
- public Mono<Boolean> createLock(String name) {
- File file = path(name).toFile();
- try {
- boolean res = file.createNewFile();
- return Mono.just(res || isAged(file));
- } catch (Exception e) {
- return Mono.just(file.exists() && isAged(file));
- }
- }
-
- public Mono<String> copyFileTo(Path from, String to) {
- try {
- Path toPath = path(to);
- Files.createDirectories(toPath);
- Files.copy(from, path(to), StandardCopyOption.REPLACE_EXISTING);
- return Mono.just(to);
- } catch (Exception e) {
- return Mono.error(e);
- }
- }
-
- private boolean isAged(File file) {
- final long MAX_AGE_SECONDS = (long) 60 * 5;
- return Instant.now().getEpochSecond() - file.lastModified() > MAX_AGE_SECONDS;
-
- }
-
- @Override
- public Mono<Boolean> deleteLock(String name) {
- return deleteObject(Bucket.LOCKS, name);
- }
-
- @Override
- public Mono<Boolean> deleteObject(Bucket bucket, String name) {
- try {
- Files.delete(path(name));
- return Mono.just(true);
- } catch (Exception e) {
- return Mono.just(false);
- }
- }
-
- private Path path(String name) {
- return Path.of(applicationConfig.getPmFilesPath(), name);
- }
-
-}
import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import lombok.Getter;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.datastore.DataStore;
+import org.oran.dmaapadapter.datastore.FileStore;
+import org.oran.dmaapadapter.datastore.S3ObjectStore;
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.filter.Filter;
import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.Job;
-import org.oran.dmaapadapter.tasks.KafkaTopicListener.NewFileEvent;
import org.oran.dmaapadapter.tasks.TopicListener.DataFromTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private Flux<TopicListener.DataFromTopic> createFakeEvent(String fileName) {
- NewFileEvent ev = new NewFileEvent(fileName, this.applConfig.getS3Bucket());
+ NewFileEvent ev = new NewFileEvent(fileName);
return Flux.just(new TopicListener.DataFromTopic("", gson.toJson(ev)));
}
try {
NewFileEvent ev = gson.fromJson(data.value, NewFileEvent.class);
- if (ev.getObjectStoreBucket() != null) {
- if (this.applConfig.isS3Enabled()) {
- return fileStore.readFile(ev.getObjectStoreBucket(), ev.getFilename()) //
- .map(str -> unzip(str, ev.getFilename())) //
- .map(str -> new DataFromTopic(data.key, str));
- } else {
- logger.error("S3 is not configured in application.yaml, ignoring: {}", data);
- return Mono.empty();
- }
- } else {
- if (applConfig.getPmFilesPath().isEmpty() || ev.getFilename() == null) {
- logger.debug("Passing data {}", data);
- return Mono.just(data);
- } else {
- Path path = Path.of(this.applConfig.getPmFilesPath(), ev.getFilename());
- String pmReportJson = Files.readString(path, Charset.defaultCharset());
- return Mono.just(new DataFromTopic(data.key, pmReportJson));
- }
+
+ if (ev.getFilename() == null) {
+ logger.warn("Ignoring received message: {}", data);
+ return Mono.empty();
}
+
+ return fileStore.readFile(DataStore.Bucket.FILES, ev.getFilename()) //
+ .map(bytes -> unzip(bytes, ev.getFilename())) //
+ .map(bytes -> new DataFromTopic(data.key, bytes));
+
} catch (Exception e) {
return Mono.just(data);
}
import java.util.HashMap;
import java.util.Map;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.ToString;
-
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
private final InfoType type;
private Flux<DataFromTopic> dataFromTopic;
- @ToString
- @Builder
- public static class NewFileEvent {
- @Getter
- private String filename;
-
- @Getter
- private String objectStoreBucket;
- }
-
public KafkaTopicListener(ApplicationConfig applConfig, InfoType type) {
this.applicationConfig = applConfig;
this.type = type;
package org.oran.dmaapadapter.tasks;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-public interface DataStore {
- public enum Bucket {
- FILES, LOCKS
- }
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
- public Flux<String> listFiles(Bucket bucket, String prefix);
- public Mono<byte[]> readFile(Bucket bucket, String fileName);
-
- public Mono<byte[]> readFile(String bucket, String fileName);
-
- public Mono<Boolean> createLock(String name);
-
- public Mono<Boolean> deleteLock(String name);
-
- public Mono<Boolean> deleteObject(Bucket bucket, String name);
+@ToString
+@Builder
+public class NewFileEvent {
+ @Getter
+ private String filename;
}
+++ /dev/null
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2021 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oran.dmaapadapter.tasks;
-
-import java.net.URI;
-import java.nio.file.Path;
-import java.time.Instant;
-import java.util.concurrent.CompletableFuture;
-
-import org.oran.dmaapadapter.configuration.ApplicationConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.ResponseBytes;
-import software.amazon.awssdk.core.async.AsyncRequestBody;
-import software.amazon.awssdk.core.async.AsyncResponseTransformer;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
-import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
-import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
-import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
-import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
-import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
-import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
-import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
-import software.amazon.awssdk.services.s3.model.GetObjectRequest;
-import software.amazon.awssdk.services.s3.model.GetObjectResponse;
-import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
-import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
-import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
-import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.PutObjectResponse;
-import software.amazon.awssdk.services.s3.model.S3Object;
-
-public class S3ObjectStore implements DataStore {
- private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
- private final ApplicationConfig applicationConfig;
-
- private static S3AsyncClient s3AsynchClient;
-
- public S3ObjectStore(ApplicationConfig applicationConfig) {
- this.applicationConfig = applicationConfig;
-
- getS3AsynchClient(applicationConfig);
- }
-
- private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
- if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
- s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
- }
- return s3AsynchClient;
- }
-
- private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
- URI uri = URI.create(applicationConfig.getS3EndpointOverride());
- return S3AsyncClient.builder() //
- .region(Region.US_EAST_1) //
- .endpointOverride(uri) //
- .credentialsProvider(StaticCredentialsProvider.create( //
- AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
- applicationConfig.getS3SecretAccessKey())));
-
- }
-
- @Override
- public Flux<String> listFiles(Bucket bucket, String prefix) {
- return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
- }
-
- @Override
- public Mono<Boolean> createLock(String name) {
- return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) //
- .onErrorResume(t -> createLock(name, null));
- }
-
- private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
- final long MAX_AGE_SECONDS = (long) 60 * 5;
-
- if (head == null || head.lastModified().getEpochSecond() - Instant.now().getEpochSecond() > MAX_AGE_SECONDS) {
-
- return this.putObject(Bucket.LOCKS, name, "") //
- .flatMap(resp -> Mono.just(true)) //
- .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) //
- .onErrorResume(t -> Mono.just(false));
- } else {
- return Mono.just(false);
- }
- }
-
- @Override
- public Mono<Boolean> deleteLock(String name) {
- return deleteObject(Bucket.LOCKS, name);
- }
-
- @Override
- public Mono<Boolean> deleteObject(Bucket bucket, String name) {
-
- DeleteObjectRequest request = DeleteObjectRequest.builder() //
- .bucket(bucket(bucket)) //
- .key(name) //
- .build();
-
- CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
-
- return Mono.fromFuture(future).map(resp -> true);
- }
-
- @Override
- public Mono<byte[]> readFile(Bucket bucket, String fileName) {
- return getDataFromS3Object(bucket(bucket), fileName);
- }
-
- @Override
- public Mono<byte[]> readFile(String bucket, String fileName) {
- return getDataFromS3Object(bucket, fileName);
- }
-
- public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
- PutObjectRequest request = PutObjectRequest.builder() //
- .bucket(bucket(bucket)) //
- .key(fileName) //
- .build();
-
- AsyncRequestBody body = AsyncRequestBody.fromString(bodyString);
-
- CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
-
- return Mono.fromFuture(future) //
- .map(putObjectResponse -> fileName) //
- .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
- }
-
- public Mono<String> copyFileToS3(Bucket bucket, Path fromFile, String toFile) {
- return copyFileToS3Bucket(bucket(bucket), fromFile, toFile);
- }
-
- public Mono<String> createS3Bucket(Bucket bucket) {
- return createS3Bucket(bucket(bucket));
- }
-
- private Mono<String> createS3Bucket(String s3Bucket) {
-
- CreateBucketRequest request = CreateBucketRequest.builder() //
- .bucket(s3Bucket) //
- .build();
-
- CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
-
- return Mono.fromFuture(future) //
- .map(f -> s3Bucket) //
- .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
- .onErrorResume(t -> Mono.just(s3Bucket));
- }
-
- public Mono<String> deleteBucket(Bucket bucket) {
- return listFiles(bucket, "") //
- .flatMap(key -> deleteObject(bucket, key)) //
- .collectList() //
- .flatMap(list -> deleteBucketFromS3Storage(bucket)) //
- .map(resp -> "OK")
- .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
- .onErrorResume(t -> Mono.just("NOK"));
- }
-
- private Mono<DeleteBucketResponse> deleteBucketFromS3Storage(Bucket bucket) {
- DeleteBucketRequest request = DeleteBucketRequest.builder() //
- .bucket(bucket(bucket)) //
- .build();
-
- CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
-
- return Mono.fromFuture(future);
- }
-
- private String bucket(Bucket bucket) {
- return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
- }
-
- private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
-
- PutObjectRequest request = PutObjectRequest.builder() //
- .bucket(s3Bucket) //
- .key(s3Key) //
- .build();
-
- AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
-
- CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
-
- return Mono.fromFuture(future) //
- .map(f -> s3Key) //
- .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
-
- }
-
- private Mono<HeadObjectResponse> getHeadObject(String bucket, String key) {
- HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
-
- CompletableFuture<HeadObjectResponse> future = s3AsynchClient.headObject(request);
- return Mono.fromFuture(future);
- }
-
- private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
- ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() //
- .bucket(bucket) //
- .maxKeys(1100) //
- .prefix(prefix) //
- .build();
- CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
-
- return Mono.fromFuture(future) //
- .map(ListObjectsResponse::contents) //
- .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
- .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
- .flatMapMany(Flux::fromIterable) //
- .doOnNext(obj -> logger.debug("Found object: {}", obj.key())) //
-
- ;
- }
-
- private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
-
- GetObjectRequest request = GetObjectRequest.builder() //
- .bucket(bucket) //
- .key(key) //
- .build();
-
- CompletableFuture<ResponseBytes<GetObjectResponse>> future =
- s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
-
- return Mono.fromFuture(future) //
- .map(b -> b.asByteArray()) //
- .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
- .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
- .onErrorResume(t -> Mono.empty());
- }
-
-}
@Getter
@Setter
+ @ToString.Exclude
private PmReport cachedPmReport;
public DataFromTopic(String key, String value) {
import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.datastore.FileStore;
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.filter.PmReport;
import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
import org.oran.dmaapadapter.tasks.JobDataDistributor;
+import org.oran.dmaapadapter.tasks.NewFileEvent;
import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
"app.webclient.trust-store=./config/truststore.jks", //
"app.webclient.trust-store-used=true", //
"app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
- "app.s3.endpointOverride="})
+ "app.pm-files-path=/tmp", "app.s3.endpointOverride="})
class ApplicationTest {
@Autowired
// Return one messagefrom DMAAP and verify that the job (consumer) receives a
// filtered PM message
- String path = "./src/test/resources/pm_report.json";
- String pmReportJson = Files.readString(Path.of(path), Charset.defaultCharset());
+ String path = "./src/test/resources/pm_report.json.gz";
+ FileStore fs = new FileStore(this.applicationConfig);
+ fs.copyFileTo(Path.of(path), "pm_report.json.gz");
+
+ NewFileEvent event = NewFileEvent.builder().filename("pm_report.json.gz").build();
+
DmaapSimulatorController.addPmResponse("{}"); // This should just be discarded
- DmaapSimulatorController.addPmResponse(pmReportJson);
+ DmaapSimulatorController.addPmResponse(gson.toJson(event));
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
import org.oran.dmaapadapter.tasks.KafkaTopicListener;
+import org.oran.dmaapadapter.tasks.NewFileEvent;
import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
import org.slf4j.Logger;
kafkaReceiver2.reset();
S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
- fileStore.deleteBucket(DataStore.Bucket.FILES).block();
- fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
+ fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
+ fileStore.createS3Bucket(DataStore.Bucket.LOCKS).block();
+
}
@AfterEach
this.consumerController.testResults.reset();
this.icsSimulatorController.testResults.reset();
+
+ S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
+ fileStore.deleteBucket(DataStore.Bucket.FILES).block();
+ fileStore.deleteBucket(DataStore.Bucket.LOCKS).block();
}
private AsyncRestClient restClient(boolean useTrustValidation) {
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
waitForKafkaListener();
- final int NO_OF_OBJECTS = 100;
+ final int NO_OF_OBJECTS = 50000;
Instant startTime = Instant.now();
- KafkaTopicListener.NewFileEvent event = KafkaTopicListener.NewFileEvent.builder() //
- .filename("pm_report.json.gz").objectStoreBucket(applicationConfig.getS3Bucket()) //
+ final String FILE_NAME = "pm_report.json.gz";
+
+ NewFileEvent event = NewFileEvent.builder() //
+ .filename(FILE_NAME) //
.build();
S3ObjectStore fileStore = new S3ObjectStore(applicationConfig);
fileStore.createS3Bucket(DataStore.Bucket.FILES).block();
- fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/pm_report.json.gz"),
- "pm_report.json.gz").block();
+ fileStore.copyFileToS3(DataStore.Bucket.FILES, Path.of("./src/test/resources/" + FILE_NAME), FILE_NAME).block();
String eventAsString = gson.toJson(event);