NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / DmaapTopicConsumer.java
index 217a072..fe7ec8b 100644 (file)
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
 
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 
 /**
@@ -44,42 +43,10 @@ public class DmaapTopicConsumer {
     private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class);
 
     private final AsyncRestClient dmaapRestClient;
-    private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
     protected final ApplicationConfig applicationConfig;
     protected final InfoType type;
     protected final Jobs jobs;
 
-    /** Submits new elements until stopped */
-    private static class InfiniteFlux {
-        private FluxSink<Integer> sink;
-        private int counter = 0;
-
-        public synchronized Flux<Integer> start() {
-            stop();
-            return Flux.create(this::next).doOnRequest(this::onRequest);
-        }
-
-        public synchronized void stop() {
-            if (this.sink != null) {
-                this.sink.complete();
-                this.sink = null;
-            }
-        }
-
-        void onRequest(long no) {
-            logger.debug("InfiniteFlux.onRequest {}", no);
-            for (long i = 0; i < no; ++i) {
-                sink.next(counter++);
-            }
-        }
-
-        void next(FluxSink<Integer> sink) {
-            logger.debug("InfiniteFlux.next");
-            this.sink = sink;
-            sink.next(counter++);
-        }
-    }
-
     public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
         AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
         this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
@@ -89,14 +56,18 @@ public class DmaapTopicConsumer {
     }
 
     public void start() {
-        infiniteSubmitter.start() //
+        Flux.range(0, Integer.MAX_VALUE) //
                 .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
                 .flatMap(this::pushDataToConsumers) //
                 .subscribe(//
                         null, //
                         throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
-                        () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId())); //
+                        this::onComplete); //
+    }
 
+    private void onComplete() {
+        logger.warn("DmaapMessageConsumer completed {}", type.getId());
+        start();
     }
 
     private String getDmaapUrl() {
@@ -128,6 +99,7 @@ public class DmaapTopicConsumer {
 
         // Distibute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
+                .filter(job -> job.isFilterMatch(body)) //
                 .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
                 .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse);