Improved resilience to Kafka errors. After an overflow, the stream is restarted by the supervision task (currently within a maximum of 3 minutes).
In Kafka messages: when buffering is enabled, each received message is quoted (escaped as a JSON string) before being added to the buffer.
When delivered to the consumer, the REST content type is JSON if buffered. If not buffered, no content type is sent.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: Id0fb2b572a491d32300b1d11a7794a97371ac074
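As an illustration of the delivery behaviour above (values and batch size are
made up): three Kafka values a, b and c that end up in the same buffer are
quoted and delivered as a single JSON array body with content type
application/json:

    ["a", "b", "c"]

An unbuffered value is forwarded verbatim as the request body, with no
Content-Type header set.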
this.httpProxyConfig = httpProxyConfig;
}
- public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
+ public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body,
+ @Nullable MediaType contentType) {
Object traceTag = createTraceTag();
logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
RequestHeadersSpec<?> request = getWebClient() //
.post() //
.uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
+ .contentType(contentType) //
.body(bodyProducer, String.class);
return retrieve(traceTag, request);
}
- public Mono<String> post(String uri, @Nullable String body) {
- return postForEntity(uri, body) //
+ public Mono<String> post(String uri, @Nullable String body, @Nullable MediaType contentType) {
+ return postForEntity(uri, body, contentType) //
.map(this::toBody);
}
- public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
+ public Mono<String> postWithAuthHeader(String uri, String body, String username, String password,
+ MediaType mediaType) {
Object traceTag = createTraceTag();
logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
RequestHeadersSpec<?> request = getWebClient() //
        .post() //
.uri(uri) //
.headers(headers -> headers.setBasicAuth(username, password)) //
- .contentType(MediaType.APPLICATION_JSON) //
+ .contentType(mediaType) //
.bodyValue(body);
return retrieve(traceTag, request) //
.map(this::toBody);
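A minimal usage sketch of the extended signatures; the base URL, the null
proxy argument and the /data path are assumptions for illustration:

    AsyncRestClient client = new AsyncRestClient("https://consumer", null);
    // JSON batch: advertise application/json explicitly
    client.post("/data", "[\"a\",\"b\"]", MediaType.APPLICATION_JSON).block();
    // Raw pass-through: a null content type means no Content-Type header is sent
    client.post("/data", "raw-payload", null).block();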
import org.oran.dmaapadapter.repository.Jobs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
// Distribute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
.doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
- .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
+ .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse);
}
}
package org.oran.dmaapadapter.tasks;
+import lombok.Getter;
+
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.Disposable;
* owner via REST calls.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-
public class KafkaJobDataConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
- private final Many<String> input;
+ @Getter
private final Job job;
private Disposable subscription;
- private int errorCounter = 0;
+ private final ErrorStats errorStats = new ErrorStats();
+
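+ // Distinguishes recoverable consumer faults (HTTP errors, counted and reset
+ // on the next success) from Kafka stream errors such as overflow, which stop
+ // the consumer until the supervision task restarts it.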
+ private class ErrorStats {
+ private int consumerFaultCounter = 0;
+ private boolean kafkaError = false; // eg. overflow
+
+ public void handleOkFromConsumer() {
+ this.consumerFaultCounter = 0;
+ }
+
+ public void handleException(Throwable t) {
+ if (t instanceof WebClientResponseException) {
+ ++this.consumerFaultCounter;
+ } else {
+ kafkaError = true;
+ }
+ }
- KafkaJobDataConsumer(Many<String> input, Job job) {
- this.input = input;
+ public boolean isItHopeless() {
+ final int STOP_AFTER_ERRORS = 5;
+ return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
+ }
+
+ public void resetKafkaErrors() {
+ kafkaError = false;
+ }
+ }
+
+ public KafkaJobDataConsumer(Job job) {
this.job = job;
}
- public synchronized void start() {
+ public synchronized void start(Many<String> input) {
stop();
- this.subscription = getMessagesFromKafka(job) //
- .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
- .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
+ this.errorStats.resetKafkaErrors();
+ this.subscription = getMessagesFromKafka(input, job) //
+ .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleConsumerSentOk, //
- this::handleErrorInStream, //
- () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
- job.getType().getId()));
+ t -> stop(), //
+ () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+ }
+
+ private Mono<String> postToClient(String body) {
+ logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
+ MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
+ return job.getConsumerRestClient().post("", body, contentType);
}
public synchronized void stop() {
    if (this.subscription != null) {
        this.subscription.dispose();
        this.subscription = null;
    }
}

public synchronized boolean isRunning() {
    return this.subscription != null;
}
- private Flux<String> getMessagesFromKafka(Job job) {
+ private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
Flux<String> result = input.asFlux() //
.filter(job::isFilterMatch);
if (job.isBuffered()) {
- result = result.bufferTimeout( //
- job.getParameters().getBufferTimeout().getMaxSize(), //
- job.getParameters().getBufferTimeout().getMaxTime()) //
+ result = result.map(this::quote) //
+ .bufferTimeout( //
+ job.getParameters().getBufferTimeout().getMaxSize(), //
+ job.getParameters().getBufferTimeout().getMaxTime()) //
.map(Object::toString);
}
return result;
}
- private Mono<String> handleError(Throwable t) {
- logger.warn("exception: {} job: {}", t.getMessage(), job);
+ private String quote(String str) {
+ final String q = "\"";
+ return q + str.replace(q, "\\\"") + q;
+ }
- final int STOP_AFTER_ERRORS = 5;
- if (t instanceof WebClientResponseException) {
- if (++this.errorCounter > STOP_AFTER_ERRORS) {
- logger.error("Stopping job {}", job);
- return Mono.error(t);
- } else {
- return Mono.empty(); // Discard
- }
+ private Mono<String> handleError(Throwable t) {
+ logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+ this.errorStats.handleException(t);
+ if (this.errorStats.isItHopeless()) {
+ return Mono.error(t);
} else {
- // This can happen if there is an overflow.
- return Mono.empty();
+ return Mono.empty(); // Ignore
}
}
private void handleConsumerSentOk(String data) {
- this.errorCounter = 0;
- }
-
- private void handleErrorInStream(Throwable t) {
- logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
- this.subscription = null;
+ this.errorStats.handleOkFromConsumer();
}
}
public class KafkaTopicConsumers {
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
- private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>();
+ private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
+
@Getter
- private final Map<String, KafkaJobDataConsumer> activeSubscriptions = new HashMap<>();
+ private final Map<String, KafkaJobDataConsumer> consumers = new HashMap<>(); // Key is jobId
private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
}
public synchronized void addJob(Job job) {
- if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+ if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
logger.debug("Kafka job added {}", job.getId());
KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
- KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job);
- subscription.start();
- activeSubscriptions.put(job.getId(), subscription);
+ KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
+ subscription.start(topicConsumer.getOutput());
+ consumers.put(job.getId(), subscription);
}
}
public synchronized void removeJob(Job job) {
- KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId());
+ KafkaJobDataConsumer d = consumers.remove(job.getId());
if (d != null) {
logger.debug("Kafka job removed {}", job.getId());
d.stop();
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
public synchronized void restartNonRunningTasks() {
- for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) {
+
+ for (KafkaJobDataConsumer consumer : consumers.values()) {
if (!consumer.isRunning()) {
- consumer.start();
+ restartTopic(consumer);
}
}
}
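+ // A Kafka error terminates the topic listener's shared sink. Restarting one
+ // stopped consumer therefore means restarting the listener (which creates a
+ // new sink) and resubscribing every consumer of that type to the new output.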
+ private void restartTopic(KafkaJobDataConsumer consumer) {
+ InfoType type = consumer.getJob().getType();
+ KafkaTopicListener topic = this.topicListeners.get(type.getId());
+ topic.start();
+ restartConsumersOfType(topic, type);
+ }
+
+ private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
+ this.consumers.forEach((jobId, consumer) -> {
+ if (consumer.getJob().getType().getId().equals(type.getId())) {
+ consumer.start(topic.getOutput());
+ }
+ });
+ }
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ApplicationConfig applicationConfig;
private final InfoType type;
- private final Many<String> output;
+ private Many<String> output;
+ private Disposable topicReceiverTask;
public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
this.applicationConfig = applicationConfig;
-
- final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
- this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
-
this.type = type;
- startKafkaTopicReceiver();
+ start();
}
public Many<String> getOutput() {
return this.output;
}
- private Disposable startKafkaTopicReceiver() {
- return KafkaReceiver.create(kafkaInputProperties()) //
+ public void start() {
+ stop();
+ final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
+ this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+ logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
+ topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) //
.receive() //
.doOnNext(this::onReceivedData) //
.subscribe(null, //
        this::onReceivedError, //
        () -> logger.warn("KafkaTopicReceiver stopped"));
}
- private void onReceivedData(ConsumerRecord<Integer, String> input) {
+ private void stop() {
+ if (topicReceiverTask != null) {
+ topicReceiverTask.dispose();
+ topicReceiverTask = null;
+ }
+ }
+
+ private void onReceivedData(ConsumerRecord<String, String> input) {
logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
}
logger.error("KafkaTopicReceiver error: {}", t.getMessage());
}
- private ReceiverOptions<Integer, String> kafkaInputProperties() {
+ private ReceiverOptions<String, String> kafkaInputProperties() {
Map<String, Object> consumerProps = new HashMap<>();
if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
logger.error("No kafka boostrap server is setup");
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return ReceiverOptions.<Integer, String>create(consumerProps)
+ return ReceiverOptions.<String, String>create(consumerProps)
.subscription(Collections.singleton(this.type.getKafkaInputTopic()));
}
}
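The restart path exists because a bounded multicast sink fails permanently
once its buffer overflows. A standalone sketch of that Reactor behaviour
(buffer size and values are illustrative):

    import reactor.core.publisher.Sinks;

    public class OverflowSketch {
        public static void main(String[] args) {
            // Bounded multicast sink, as used by KafkaTopicListener
            Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(2);

            sink.tryEmitNext("a"); // buffered, no subscriber demand yet
            sink.tryEmitNext("b"); // buffered

            // Once the buffer is full, further emissions fail. With
            // EmitFailureHandler.FAIL_FAST (used in onReceivedData above),
            // emitNext would throw instead of returning this result.
            Sinks.EmitResult result = sink.tryEmitNext("c");
            System.out.println(result); // expected: FAIL_OVERFLOW
        }
    }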
private void handleRegistrationCompleted() {
- logger.debug("Registering types and producer completed");
isRegisteredInEcs = true;
}
private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
if (isEqual(producerRegistrationInfo(), registerredInfo)) {
- logger.trace("Already registered");
+ logger.trace("Already registered in ECS");
return Mono.just(Boolean.TRUE);
} else {
return Mono.just(Boolean.FALSE);
ProducerJobInfo info = new ProducerJobInfo(null, "id", "typeId", "targetUri", "owner", "lastUpdated");
String body = gson.toJson(info);
- testErrorCode(restClient().post(jobUrl, body), HttpStatus.NOT_FOUND, "Could not find type");
+ testErrorCode(restClient().post(jobUrl, body, MediaType.APPLICATION_JSON), HttpStatus.NOT_FOUND,
+ "Could not find type");
}
@Test
new ProducerJobInfo(job.jobDefinition, jobId, job.infoTypeId, job.jobResultUri, job.owner, "TIMESTAMP");
String body = gson.toJson(request);
logger.info("ECS Simulator PUT job: {}", body);
- restClient.post(url, body).block();
+ restClient.post(url, body, MediaType.APPLICATION_JSON).block();
}
public void deleteJob(String jobId, AsyncRestClient restClient) {
var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
sendDataToStream(dataToSend);
- verifiedReceivedByConsumer("Message_1", "[Message_1, Message_2, Message_3]");
+ verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+
+ // Just for testing quoting
+ this.consumerController.testResults.reset();
+ dataToSend = Flux.just(senderRecord("Message\"_", 1));
+ sendDataToStream(dataToSend);
+ verifiedReceivedByConsumer("[\"Message\\\"_1\"]");
// Delete the jobs
this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers()).isEmpty());
}
@Test
void kafkaIOverflow() throws InterruptedException {
- // This does not work. After an overflow, the kafka stream does not seem to work
- //
final String JOB_ID1 = "ID1";
final String JOB_ID2 = "ID2";
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
- sendDataToStream(dataToSend); // this will overflow
+ sendDataToStream(dataToSend); // this should overflow
- KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next();
+ KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().values().iterator().next();
await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
this.consumerController.testResults.reset();
kafkaTopicConsumers.restartNonRunningTasks();
+ this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+ Thread.sleep(1000); // Restarting the input seems to take some async time
- dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message_1
+ dataToSend = Flux.range(1, 1).map(i -> senderRecord("Howdy_", i));
sendDataToStream(dataToSend);
- verifiedReceivedByConsumer("Message__1", "Message__1");
+ verifiedReceivedByConsumer("Howdy_1");
}
}