* O-RAN-SC
* %%
* Copyright (C) 2021 Nordix Foundation
+ * Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+import reactor.util.retry.RetryBackoffSpec;
/**
* The class streams data from a multi cast sink and sends the data to the Job
public void start(Flux<TopicListener.DataFromTopic> input) {
logger.debug("Starting distribution, to topic: {}", jobGroup.getId());
PmReportFilter filter = getPmReportFilter(this.jobGroup);
+ final RetryBackoffSpec retry = Retry.fixedDelay(3, Duration.ofSeconds(5));
if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
this.subscription = filterAndBuffer(input, this.jobGroup) //
- .flatMap(this::sendToClient) //
+ .doOnNext(filtered -> logger.debug("received data"))
+ .flatMap(filtered -> this.sendToClient(filtered).retryWhen(retry)) // //
.onErrorResume(this::handleError) //
.subscribe(this::handleSentOk, //
this::handleExceptionInStream, //
dataStore), 100)
.map(jobGroup::filter) //
.map(this::gzip) //
- .flatMap(this::sendToClient, 1) //
+ .flatMap(filtered -> this.sendToClient(filtered).retryWhen(retry), 1) //
.onErrorResume(this::handleCollectHistoricalDataError) //
.doFinally(sig -> sendLastStoredRecord()) //
.subscribe();
this.errorStats.handleOkFromConsumer();
}
-}
+}
\ No newline at end of file