Infinite number of files in S3 handled (paginated object listing).
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: Ie2fe4c853ce0c9ab4ca49c7fbc8a60ba44231bfe
public Mono<Boolean> createLock(String name) {
File file = path(name).toFile();
try {
+ Files.createDirectories(path(name).getParent());
boolean res = file.createNewFile();
return Mono.just(res);
} catch (Exception e) {
- return Mono.just(file.exists());
+ logger.warn("Could not create lock file: {}, reason: {}", file.getPath(), e.getMessage());
+ return Mono.just(!file.exists());
}
}
return Mono.fromFuture(future);
}
- private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
- ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() //
+ private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
+ ListObjectsResponse prevResponse) {
+ ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
.bucket(bucket) //
- .maxKeys(1100) //
- .prefix(prefix) //
- .build();
+ .maxKeys(1000) //
+ .prefix(prefix);
+
+ if (prevResponse != null) {
+ if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
+ builder.marker(prevResponse.nextMarker());
+ } else {
+ return Mono.empty();
+ }
+ }
+
+ ListObjectsRequest listObjectsRequest = builder.build();
CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
+ return Mono.fromFuture(future);
+ }
- return Mono.fromFuture(future) //
+ private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
+
+ return listObjectsRequest(bucket, prefix, null) //
+ .expand(response -> listObjectsRequest(bucket, prefix, response)) //
.map(ListObjectsResponse::contents) //
.doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
.doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
- .flatMapMany(Flux::fromIterable) //
- .doOnNext(obj -> logger.debug("Found object: {}", obj.key())) //
-
- ;
+ .flatMap(Flux::fromIterable) //
+ .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
}
private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
private final ApplicationConfig applConfig;
private class ErrorStats {
+ @Getter
private int consumerFaultCounter = 0;
- private boolean irrecoverableError = false; // eg. overflow
public void handleOkFromConsumer() {
this.consumerFaultCounter = 0;
}
public void handleException(Throwable t) {
- if (t instanceof WebClientResponseException) {
- ++this.consumerFaultCounter;
- } else {
- irrecoverableError = true;
- }
- }
-
- public boolean isItHopeless() {
- final int STOP_AFTER_ERRORS = 5;
- return irrecoverableError || consumerFaultCounter > STOP_AFTER_ERRORS;
- }
-
- public void resetIrrecoverableErrors() {
- irrecoverableError = false;
+ ++this.consumerFaultCounter;
}
}
this.dataStore = DataStore.create(applConfig);
this.dataStore.create(DataStore.Bucket.FILES).subscribe();
this.dataStore.create(DataStore.Bucket.LOCKS).subscribe();
-
- this.errorStats.resetIrrecoverableErrors();
}
public void start(Flux<TopicListener.DataFromTopic> input) {
}
private TopicListener.DataFromTopic createFakeEvent(String fileName) {
-
NewFileEvent ev = new NewFileEvent(fileName);
-
return new TopicListener.DataFromTopic(null, gson.toJson(ev).getBytes(), false);
}
private Mono<String> handleError(Throwable t) {
logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
this.errorStats.handleException(t);
- if (this.errorStats.isItHopeless()) {
- logger.error("Giving up: {} job: {}", t.getMessage(), job.getId());
- return Mono.error(t);
- } else {
- return Mono.empty(); // Ignore
- }
+ return Mono.empty(); // Ignore
}
private void handleSentOk(String data) {
filterData.getMeasTypes().add("pmCounterNumber1");
filterData.getMeasObjClass().add("NRCellCU");
- this.applicationConfig.setZipOutput(true);
- final int NO_OF_JOBS = 150;
+ this.applicationConfig.setZipOutput(false);
+ final int NO_OF_JOBS = 100;
ArrayList<KafkaReceiver> receivers = new ArrayList<>();
for (int i = 0; i < NO_OF_JOBS; ++i) {
final String outputTopic = "manyJobs_" + i;
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(NO_OF_JOBS));
waitForKafkaListener();
- final int NO_OF_OBJECTS = 1000;
+ final int NO_OF_OBJECTS = 10000;
Instant startTime = Instant.now();