- this::handleErrorInStream, //
- () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
- job.getType().getId()));
+ this::handleExceptionInStream, //
+ () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+ }
+
+ private void handleExceptionInStream(Throwable t) {
+ logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+ stop();
+ }
+
+ private Mono<String> postToClient(String body) {
+ logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
+ MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
+ return job.getConsumerRestClient().post("", body, contentType);