}
public void handleDmaapMsg(String msg) {
- this.createTask(msg) //
- .subscribe(message -> logger.debug("handleDmaapMsg: {}", message), //
- throwable -> logger.warn("handleDmaapMsg failure {}", throwable.getMessage()), //
- () -> logger.debug("handleDmaapMsg complete"));
+ try {
+ String result = this.createTask(msg).block();
+ logger.debug("handleDmaapMsg: {}", result);
+ } catch (Exception throwable) {
+ logger.warn("handleDmaapMsg failure {}", throwable.getMessage());
+ }
}
Mono<String> createTask(String msg) {
.flatMap(
response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
} catch (Exception e) {
- logger.warn("Received unparsable message from DMAAP: {}", msg);
- return Mono.error(e); // Cannot make any response
+ String errorMsg = "Received unparsable message from DMAAP: \"" + msg + "\", reason: " + e.getMessage();
+ return Mono.error(new ServiceException(errorMsg)); // Cannot make any response
}
}