2 * ========================LICENSE_START=================================
5 * Copyright (C) 2021 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oran.dmaapadapter.tasks;
25 import org.oran.dmaapadapter.repository.Job;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.springframework.http.MediaType;
29 import org.springframework.web.reactive.function.client.WebClientResponseException;
31 import reactor.core.Disposable;
32 import reactor.core.publisher.Flux;
33 import reactor.core.publisher.Mono;
34 import reactor.core.publisher.Sinks.Many;
37 * The class streams data from a multi cast sink and sends the data to the Job
38 * owner via REST calls.
40 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
41 public class KafkaJobDataConsumer {
42 private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
44 private final Job job;
45 private Disposable subscription;
46 private final ErrorStats errorStats = new ErrorStats();
48 private class ErrorStats {
49 private int consumerFaultCounter = 0;
50 private boolean kafkaError = false; // eg. overflow
52 public void handleOkFromConsumer() {
53 this.consumerFaultCounter = 0;
56 public void handleException(Throwable t) {
57 if (t instanceof WebClientResponseException) {
58 ++this.consumerFaultCounter;
64 public boolean isItHopeless() {
65 final int STOP_AFTER_ERRORS = 5;
66 return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
69 public void resetKafkaErrors() {
74 public KafkaJobDataConsumer(Job job) {
78 public synchronized void start(Many<String> input) {
80 this.errorStats.resetKafkaErrors();
81 this.subscription = getMessagesFromKafka(input, job) //
82 .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
83 .onErrorResume(this::handleError) //
84 .subscribe(this::handleConsumerSentOk, //
86 () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
89 private Mono<String> postToClient(String body) {
90 logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
91 MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
92 return job.getConsumerRestClient().post("", body, contentType);
95 public synchronized void stop() {
96 if (this.subscription != null) {
97 subscription.dispose();
102 public synchronized boolean isRunning() {
103 return this.subscription != null;
106 private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
107 Flux<String> result = input.asFlux() //
108 .filter(job::isFilterMatch);
110 if (job.isBuffered()) {
111 result = result.map(this::quote) //
113 job.getParameters().getBufferTimeout().getMaxSize(), //
114 job.getParameters().getBufferTimeout().getMaxTime()) //
115 .map(Object::toString);
120 private String quote(String str) {
121 final String q = "\"";
122 return q + str.replace(q, "\\\"") + q;
125 private Mono<String> handleError(Throwable t) {
126 logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
127 this.errorStats.handleException(t);
128 if (this.errorStats.isItHopeless()) {
129 return Mono.error(t);
131 return Mono.empty(); // Ignore
135 private void handleConsumerSentOk(String data) {
136 this.errorStats.handleOkFromConsumer();