2a16f47598785d5890a66e216481c573c60e14c6
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaJobDataConsumer.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2021 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.dmaapadapter.tasks;
22
23 import lombok.Getter;
24
25 import org.oran.dmaapadapter.repository.Job;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.springframework.http.MediaType;
29 import org.springframework.web.reactive.function.client.WebClientResponseException;
30
31 import reactor.core.Disposable;
32 import reactor.core.publisher.Flux;
33 import reactor.core.publisher.Mono;
34
35 /**
36  * The class streams data from a multi cast sink and sends the data to the Job
37  * owner via REST calls.
38  */
39 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
40 public class KafkaJobDataConsumer {
41     private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
42     @Getter
43     private final Job job;
44     private Disposable subscription;
45     private final ErrorStats errorStats = new ErrorStats();
46
47     private class ErrorStats {
48         private int consumerFaultCounter = 0;
49         private boolean kafkaError = false; // eg. overflow
50
51         public void handleOkFromConsumer() {
52             this.consumerFaultCounter = 0;
53         }
54
55         public void handleException(Throwable t) {
56             if (t instanceof WebClientResponseException) {
57                 ++this.consumerFaultCounter;
58             } else {
59                 kafkaError = true;
60             }
61         }
62
63         public boolean isItHopeless() {
64             final int STOP_AFTER_ERRORS = 5;
65             return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
66         }
67
68         public void resetKafkaErrors() {
69             kafkaError = false;
70         }
71     }
72
73     public KafkaJobDataConsumer(Job job) {
74         this.job = job;
75     }
76
77     public synchronized void start(Flux<String> input) {
78         stop();
79         this.errorStats.resetKafkaErrors();
80         this.subscription = getMessagesFromKafka(input, job) //
81                 .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
82                 .onErrorResume(this::handleError) //
83                 .subscribe(this::handleConsumerSentOk, //
84                         this::handleExceptionInStream, //
85                         () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
86     }
87
88     private void handleExceptionInStream(Throwable t) {
89         logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
90         stop();
91     }
92
93     private Mono<String> postToClient(String body) {
94         logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
95         MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
96         return job.getConsumerRestClient().post("", body, contentType);
97     }
98
99     public synchronized void stop() {
100         if (this.subscription != null) {
101             this.subscription.dispose();
102             this.subscription = null;
103         }
104     }
105
106     public synchronized boolean isRunning() {
107         return this.subscription != null;
108     }
109
110     private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
111         Flux<String> result = input.filter(job::isFilterMatch);
112
113         if (job.isBuffered()) {
114             result = result.map(this::quote) //
115                     .bufferTimeout( //
116                             job.getParameters().getBufferTimeout().getMaxSize(), //
117                             job.getParameters().getBufferTimeout().getMaxTime()) //
118                     .map(Object::toString);
119         }
120         return result;
121     }
122
123     private String quote(String str) {
124         final String q = "\"";
125         return q + str.replace(q, "\\\"") + q;
126     }
127
128     private Mono<String> handleError(Throwable t) {
129         logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
130         this.errorStats.handleException(t);
131         if (this.errorStats.isItHopeless()) {
132             return Mono.error(t);
133         } else {
134             return Mono.empty(); // Ignore
135         }
136     }
137
138     private void handleConsumerSentOk(String data) {
139         this.errorStats.handleOkFromConsumer();
140     }
141
142 }