X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FDmaapTopicConsumer.java;h=fe7ec8b7da7388897df1416c84b65652bb80c8f7;hb=09e21f39a3ffcfc2063110bcad028014b0056398;hp=55a02abfd8d5c4177502af222e33492f28224151;hpb=d0c7f9207203ce9a502fc15c09f9938eebfd44f7;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java index 55a02abf..fe7ec8b7 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java @@ -29,57 +29,24 @@ import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.Jobs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; /** * The class fetches incoming requests from DMAAP and sends them further to the * consumers that has a job for this InformationType. */ - public class DmaapTopicConsumer { private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10); private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class); private final AsyncRestClient dmaapRestClient; - private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); protected final ApplicationConfig applicationConfig; protected final InfoType type; protected final Jobs jobs; - /** Submits new elements until stopped */ - private static class InfiniteFlux { - private FluxSink sink; - private int counter = 0; - - public synchronized Flux start() { - stop(); - return Flux.create(this::next).doOnRequest(this::onRequest); - } - - public synchronized void stop() { - if (this.sink != null) { - this.sink.complete(); - this.sink = null; - } - } - - void onRequest(long no) { - logger.debug("InfiniteFlux.onRequest {}", no); - for (long i = 0; i < no; ++i) { - sink.next(counter++); - } - } - - void next(FluxSink sink) { - logger.debug("InfiniteFlux.next"); - this.sink = sink; - sink.next(counter++); - } - } - public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) { AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy(""); @@ -89,14 +56,18 @@ public class DmaapTopicConsumer { } public void start() { - infiniteSubmitter.start() // + Flux.range(0, Integer.MAX_VALUE) // .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) // .flatMap(this::pushDataToConsumers) // .subscribe(// null, // throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // - () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId())); // + this::onComplete); // + } + private void onComplete() { + logger.warn("DmaapMessageConsumer completed {}", type.getId()); + start(); } private String getDmaapUrl() { @@ -128,8 +99,9 @@ public class DmaapTopicConsumer { // Distibute the body to all jobs for this type return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // + .filter(job -> job.isFilterMatch(body)) // .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) // - .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) // + .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) // .onErrorResume(this::handleConsumerErrorResponse); } }