From 0ce7926cadf6f0e3049ce3542bbfa38387cf1ac2 Mon Sep 17 00:00:00 2001 From: "aravind.est" Date: Wed, 10 Apr 2024 11:16:43 +0100 Subject: [PATCH] DMaapAdapter - Add some retries is send to consumer fails Change-Id: Ic8bbd6b82d005e677463ccf55012386e17ea1e9f Signed-off-by: aravind.est Issue-ID: NONRTRIC-996 --- .../java/org/oran/dmaapadapter/tasks/JobDataDistributor.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index 5e150dd..bea26b3 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -3,6 +3,7 @@ * O-RAN-SC * %% * Copyright (C) 2021 Nordix Foundation + * Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved. * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +23,7 @@ package org.oran.dmaapadapter.tasks; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.time.Duration; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; @@ -43,6 +45,8 @@ import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; /** * The class streams data from a multi cast sink and sends the data to the Job @@ -85,9 +89,11 @@ public abstract class JobDataDistributor { public void start(Flux input) { logger.debug("Starting distribution, to topic: {}", jobGroup.getId()); PmReportFilter filter = getPmReportFilter(this.jobGroup); + final RetryBackoffSpec retry = Retry.fixedDelay(3, Duration.ofSeconds(5)); if (filter == null || filter.getFilterData().getPmRopEndTime() == null) { this.subscription = filterAndBuffer(input, this.jobGroup) // - .flatMap(this::sendToClient) // + .doOnNext(filtered -> logger.debug("received data")) + .flatMap(filtered -> this.sendToClient(filtered).retryWhen(retry)) // // .onErrorResume(this::handleError) // .subscribe(this::handleSentOk, // this::handleExceptionInStream, // @@ -117,7 +123,7 @@ public abstract class JobDataDistributor { dataStore), 100) .map(jobGroup::filter) // .map(this::gzip) // - .flatMap(this::sendToClient, 1) // + .flatMap(filtered -> this.sendToClient(filtered).retryWhen(retry), 1) // .onErrorResume(this::handleCollectHistoricalDataError) // .doFinally(sig -> sendLastStoredRecord()) // .subscribe(); @@ -305,4 +311,4 @@ public abstract class JobDataDistributor { this.errorStats.handleOkFromConsumer(); } -} +} \ No newline at end of file -- 2.16.6