DMaapAdapter - Add some retries is send to consumer fails 31/12631/2
authoraravind.est <aravindhan.a@est.tech>
Wed, 10 Apr 2024 10:16:43 +0000 (11:16 +0100)
committeraravind.est <aravindhan.a@est.tech>
Wed, 10 Apr 2024 10:16:43 +0000 (11:16 +0100)
Change-Id: Ic8bbd6b82d005e677463ccf55012386e17ea1e9f
Signed-off-by: aravind.est <aravindhan.a@est.tech>
Issue-ID: NONRTRIC-996

src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java

index 5e150dd..bea26b3 100644 (file)
@@ -3,6 +3,7 @@
  * O-RAN-SC
  * %%
  * Copyright (C) 2021 Nordix Foundation
+ * Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
  * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,6 +23,7 @@ package org.oran.dmaapadapter.tasks;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.time.Duration;
 import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
@@ -43,6 +45,8 @@ import org.slf4j.LoggerFactory;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+import reactor.util.retry.RetryBackoffSpec;
 
 /**
  * The class streams data from a multi cast sink and sends the data to the Job
@@ -85,9 +89,11 @@ public abstract class JobDataDistributor {
     public void start(Flux<TopicListener.DataFromTopic> input) {
         logger.debug("Starting distribution, to topic: {}", jobGroup.getId());
         PmReportFilter filter = getPmReportFilter(this.jobGroup);
+        final RetryBackoffSpec retry = Retry.fixedDelay(3, Duration.ofSeconds(5));
         if (filter == null || filter.getFilterData().getPmRopEndTime() == null) {
             this.subscription = filterAndBuffer(input, this.jobGroup) //
-                    .flatMap(this::sendToClient) //
+                    .doOnNext(filtered -> logger.debug("received data"))
+                    .flatMap(filtered -> this.sendToClient(filtered).retryWhen(retry)) //                                                    //
                     .onErrorResume(this::handleError) //
                     .subscribe(this::handleSentOk, //
                             this::handleExceptionInStream, //
@@ -117,7 +123,7 @@ public abstract class JobDataDistributor {
                             dataStore), 100)
                     .map(jobGroup::filter) //
                     .map(this::gzip) //
-                    .flatMap(this::sendToClient, 1) //
+                    .flatMap(filtered -> this.sendToClient(filtered).retryWhen(retry), 1) //
                     .onErrorResume(this::handleCollectHistoricalDataError) //
                     .doFinally(sig -> sendLastStoredRecord()) //
                     .subscribe();
@@ -305,4 +311,4 @@ public abstract class JobDataDistributor {
         this.errorStats.handleOkFromConsumer();
     }
 
-}
+}
\ No newline at end of file