final int CONCURRENCY = 5;
return infiniteSubmitter.start() //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
+ .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
.doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
.flatMap(this::handleReceivedMessage, CONCURRENCY);
}