d240129ea4ee0703a77a0de9b2e7edd5cb977b10
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / KafkaJobDataConsumer.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2021 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.dmaapadapter.tasks;
22
23 import org.oran.dmaapadapter.repository.Job;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import org.springframework.web.reactive.function.client.WebClientResponseException;
27
28 import reactor.core.Disposable;
29 import reactor.core.publisher.Flux;
30 import reactor.core.publisher.Mono;
31 import reactor.core.publisher.Sinks.Many;
32
33 /**
34  * The class streams data from a multi cast sink and sends the data to the Job
35  * owner via REST calls.
36  */
37 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
38
39 public class KafkaJobDataConsumer {
40     private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
41     private final Many<String> input;
42     private final Job job;
43     private Disposable subscription;
44     private int errorCounter = 0;
45
46     KafkaJobDataConsumer(Many<String> input, Job job) {
47         this.input = input;
48         this.job = job;
49     }
50
51     public synchronized void start() {
52         stop();
53         this.subscription = getMessagesFromKafka(job) //
54                 .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
55                 .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
56                 .onErrorResume(this::handleError) //
57                 .subscribe(this::handleConsumerSentOk, //
58                         this::handleErrorInStream, //
59                         () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
60                                 job.getType().getId()));
61     }
62
63     public synchronized void stop() {
64         if (this.subscription != null) {
65             subscription.dispose();
66             subscription = null;
67         }
68     }
69
70     public synchronized boolean isRunning() {
71         return this.subscription != null;
72     }
73
74     private Flux<String> getMessagesFromKafka(Job job) {
75         Flux<String> result = input.asFlux() //
76                 .filter(job::isFilterMatch);
77
78         if (job.isBuffered()) {
79             result = result.bufferTimeout( //
80                     job.getParameters().getBufferTimeout().getMaxSize(), //
81                     job.getParameters().getBufferTimeout().getMaxTime()) //
82                     .map(Object::toString);
83         }
84         return result;
85     }
86
87     private Mono<String> handleError(Throwable t) {
88         logger.warn("exception: {} job: {}", t.getMessage(), job);
89
90         final int STOP_AFTER_ERRORS = 5;
91         if (t instanceof WebClientResponseException) {
92             if (++this.errorCounter > STOP_AFTER_ERRORS) {
93                 logger.error("Stopping job {}", job);
94                 return Mono.error(t);
95             } else {
96                 return Mono.empty(); // Discard
97             }
98         } else {
99             // This can happen if there is an overflow.
100             return Mono.empty();
101         }
102     }
103
104     private void handleConsumerSentOk(String data) {
105         this.errorCounter = 0;
106     }
107
108     private void handleErrorInStream(Throwable t) {
109         logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
110         this.subscription = null;
111     }
112
113 }