}
public void handleDmaapMsg(String msg) {
- try {
- this.createTask(msg) //
- .subscribe(x -> logger.debug("handleDmaapMsg: " + x), //
- throwable -> logger.warn("handleDmaapMsg failure ", throwable), //
- () -> logger.debug("handleDmaapMsg complete"));
- } catch (Exception e) {
- logger.warn("Received unparsable message from DMAAP: {}", msg);
- }
+ this.createTask(msg) //
+ .subscribe(x -> logger.debug("handleDmaapMsg: " + x), //
+ throwable -> logger.warn("handleDmaapMsg failure ", throwable), //
+ () -> logger.debug("handleDmaapMsg complete"));
}
Mono<String> createTask(String msg) {
private Mono<String> sendToDmaap(String body) {
try {
+ logger.debug("sendToDmaap: {} ", body);
dmaapClient.send(body);
+ dmaapClient.sendBatchWithResponse();
return Mono.just("OK");
} catch (IOException e) {
return Mono.error(e);