2 * ========================LICENSE_START=================================
5 * Copyright (C) 2021 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oran.dmaapadapter.tasks;
25 import org.oran.dmaapadapter.repository.Job;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.springframework.http.MediaType;
29 import org.springframework.web.reactive.function.client.WebClientResponseException;
31 import reactor.core.Disposable;
32 import reactor.core.publisher.Flux;
33 import reactor.core.publisher.Mono;
36 * The class streams data from a multicast sink and sends the data to the Job
37 * owner via REST calls.
39 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
40 public class KafkaJobDataConsumer {
41 private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
43 private final Job job;
44 private Disposable subscription;
45 private final ErrorStats errorStats = new ErrorStats();
47 private class ErrorStats {
48 private int consumerFaultCounter = 0;
49 private boolean kafkaError = false; // eg. overflow
51 public void handleOkFromConsumer() {
52 this.consumerFaultCounter = 0;
55 public void handleException(Throwable t) {
56 if (t instanceof WebClientResponseException) {
57 ++this.consumerFaultCounter;
63 public boolean isItHopeless() {
64 final int STOP_AFTER_ERRORS = 5;
65 return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
68 public void resetKafkaErrors() {
73 public KafkaJobDataConsumer(Job job) {
77 public synchronized void start(Flux<String> input) {
79 this.errorStats.resetKafkaErrors();
80 this.subscription = getMessagesFromKafka(input, job) //
81 .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
82 .onErrorResume(this::handleError) //
83 .subscribe(this::handleConsumerSentOk, //
84 this::handleExceptionInStream, //
85 () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
88 private void handleExceptionInStream(Throwable t) {
89 logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
93 private Mono<String> postToClient(String body) {
94 logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
95 MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
96 return job.getConsumerRestClient().post("", body, contentType);
99 public synchronized void stop() {
100 if (this.subscription != null) {
101 this.subscription.dispose();
102 this.subscription = null;
106 public synchronized boolean isRunning() {
107 return this.subscription != null;
110 private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
111 Flux<String> result = input.filter(job::isFilterMatch);
113 if (job.isBuffered()) {
114 result = result.map(this::quote) //
116 job.getParameters().getBufferTimeout().getMaxSize(), //
117 job.getParameters().getBufferTimeout().getMaxTime()) //
118 .map(Object::toString);
123 private String quote(String str) {
124 final String q = "\"";
125 return q + str.replace(q, "\\\"") + q;
128 private Mono<String> handleError(Throwable t) {
129 logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
130 this.errorStats.handleException(t);
131 if (this.errorStats.isItHopeless()) {
132 return Mono.error(t);
134 return Mono.empty(); // Ignore
138 private void handleConsumerSentOk(String data) {
139 this.errorStats.handleOkFromConsumer();