X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FDmaapMessageConsumer.java;h=1a260e92eb773bd68c41eed8be05059a36792ce4;hb=e372f940d2e57562d23e08ecb797f580800dc719;hp=fb5c891c4b971d24bb07b576d1da7fcac01ad9b6;hpb=f0e49a07dad877f94f635dda4ab477b9636536c8;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java index fb5c891c..1a260e92 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java @@ -101,6 +101,7 @@ public class DmaapMessageConsumer { final int CONCURRENCY = 5; return infiniteSubmitter.start() // .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) // + .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) // .flatMap(this::handleReceivedMessage, CONCURRENCY); }