NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaJobDataConsumer.java
index 5550ce0..f677502 100644 (file)
@@ -82,10 +82,15 @@ public class KafkaJobDataConsumer {
                 .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
-                        t -> stop(), //
+                        this::handleExceptionInStream, //
                         () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
     }
 
+    private void handleExceptionInStream(Throwable t) {
+        logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+        stop();
+    }
+
     private Mono<String> postToClient(String body) {
         logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
         MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;