From: PatrikBuhr
Date: Tue, 16 Nov 2021 09:58:25 +0000 (+0100)
Subject: NONRTRIC - Implement DMaaP mediator producer service in Java
X-Git-Tag: 1.2.0~43^2
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=ce1d9f2d3e1d2713289dc4d2b5c246f99ec65160;p=nonrtric.git

NONRTRIC - Implement DMaaP mediator producer service in Java

Improved resilience to Kafka errors. After an overflow, the stream is
restarted (currently after at most 3 minutes).

For Kafka messages: if the job is buffered, each received message is
quoted before it is buffered. When a buffered batch is delivered to the
consumer, the REST content type is JSON. If the job is not buffered, no
content type is sent.

Signed-off-by: PatrikBuhr
Issue-ID: NONRTRIC-597
Change-Id: Id0fb2b572a491d32300b1d11a7794a97371ac074
---

diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
index 6939026d..8b3efed5 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
@@ -62,7 +62,8 @@ public class AsyncRestClient {
         this.httpProxyConfig = httpProxyConfig;
     }
 
-    public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
+    public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body,
+            @Nullable MediaType contentType) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
@@ -71,17 +72,18 @@
         RequestHeadersSpec<?> request = getWebClient() //
             .post() //
             .uri(uri) //
-            .contentType(MediaType.APPLICATION_JSON) //
+            .contentType(contentType) //
             .body(bodyProducer, String.class);
         return retrieve(traceTag, request);
     }
 
-    public Mono<String> post(String uri, @Nullable String body) {
-        return postForEntity(uri, body) //
+    public Mono<String> post(String uri, @Nullable String body, @Nullable MediaType contentType) {
+        return postForEntity(uri, body, contentType) //
             .map(this::toBody);
     }
 
-    public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
+    public Mono<String> postWithAuthHeader(String uri, String body, String username, String password,
+            MediaType mediaType) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
@@ -90,7 +92,7 @@
             .post() //
             .uri(uri) //
             .headers(headers -> headers.setBasicAuth(username, password)) //
-            .contentType(MediaType.APPLICATION_JSON) //
+            .contentType(mediaType) //
             .bodyValue(body);
         return retrieve(traceTag, request) //
             .map(this::toBody);

diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
index 507d9b6b..217a0723 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
@@ -29,6 +29,7 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
@@ -128,7 +129,7 @@ public class DmaapTopicConsumer {
         // Distribute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
             .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
-            .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
+            .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) //
             .onErrorResume(this::handleConsumerErrorResponse);
     }
 }
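Note: with this change the caller decides the content type per request, and
passing null leaves the Content-Type header unset. A minimal sketch of the two
calling styles (the client and logger variables are assumed to be in scope;
construction is elided):

    // Buffered job: the body is a JSON array, so JSON is declared.
    client.post("/data", "[\"Message_1\", \"Message_2\"]", MediaType.APPLICATION_JSON)
        .subscribe(rsp -> logger.debug("consumer answered: {}", rsp));

    // Unbuffered job: the raw Kafka value is forwarded without declaring a content type.
    client.post("/data", "Message_1", null)
        .subscribe(rsp -> logger.debug("consumer answered: {}", rsp));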
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
index d240129e..5550ce0e 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
@@ -20,9 +20,12 @@
 package org.oran.dmaapadapter.tasks;
 
+import lombok.Getter;
+
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.Disposable;
@@ -35,29 +38,58 @@ import reactor.core.publisher.Sinks.Many;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-
 public class KafkaJobDataConsumer {
     private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
-    private final Many<String> input;
+    @Getter
     private final Job job;
     private Disposable subscription;
-    private int errorCounter = 0;
+    private final ErrorStats errorStats = new ErrorStats();
+
+    private class ErrorStats {
+        private int consumerFaultCounter = 0;
+        private boolean kafkaError = false; // e.g. overflow
+
+        public void handleOkFromConsumer() {
+            this.consumerFaultCounter = 0;
+        }
+
+        public void handleException(Throwable t) {
+            if (t instanceof WebClientResponseException) {
+                ++this.consumerFaultCounter;
+            } else {
+                kafkaError = true;
+            }
+        }
 
-    KafkaJobDataConsumer(Many<String> input, Job job) {
-        this.input = input;
+        public boolean isItHopeless() {
+            final int STOP_AFTER_ERRORS = 5;
+            return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
+        }
+
+        public void resetKafkaErrors() {
+            kafkaError = false;
+        }
+    }
+
+    public KafkaJobDataConsumer(Job job) {
         this.job = job;
     }
 
-    public synchronized void start() {
+    public synchronized void start(Many<String> input) {
         stop();
-        this.subscription = getMessagesFromKafka(job) //
-            .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
-            .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
+        this.errorStats.resetKafkaErrors();
+        this.subscription = getMessagesFromKafka(input, job) //
+            .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
             .onErrorResume(this::handleError) //
             .subscribe(this::handleConsumerSentOk, //
-                this::handleErrorInStream, //
-                () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
-                    job.getType().getId()));
+                t -> stop(), //
+                () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+    }
+
+    private Mono<String> postToClient(String body) {
+        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
+        MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
+        return job.getConsumerRestClient().post("", body, contentType);
     }
 
     public synchronized void stop() {
@@ -71,43 +103,37 @@ public class KafkaJobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> getMessagesFromKafka(Job job) {
+    private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
         Flux<String> result = input.asFlux() //
             .filter(job::isFilterMatch);
 
         if (job.isBuffered()) {
-            result = result.bufferTimeout( //
-                job.getParameters().getBufferTimeout().getMaxSize(), //
-                job.getParameters().getBufferTimeout().getMaxTime()) //
+            result = result.map(this::quote) //
+                .bufferTimeout( //
+                    job.getParameters().getBufferTimeout().getMaxSize(), //
+                    job.getParameters().getBufferTimeout().getMaxTime()) //
                 .map(Object::toString);
         }
         return result;
     }
 
-    private Mono<String> handleError(Throwable t) {
-        logger.warn("exception: {} job: {}", t.getMessage(), job);
+    private String quote(String str) {
+        final String q = "\"";
+        return q + str.replace(q, "\\\"") + q;
+    }
 
-        final int STOP_AFTER_ERRORS = 5;
-        if (t instanceof WebClientResponseException) {
-            if (++this.errorCounter > STOP_AFTER_ERRORS) {
-                logger.error("Stopping job {}", job);
-                return Mono.error(t);
-            } else {
-                return Mono.empty(); // Discard
-            }
+    private Mono<String> handleError(Throwable t) {
+        logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+        this.errorStats.handleException(t);
+        if (this.errorStats.isItHopeless()) {
+            return Mono.error(t);
         } else {
-            // This can happen if there is an overflow.
-            return Mono.empty();
+            return Mono.empty(); // Ignore
         }
     }
 
     private void handleConsumerSentOk(String data) {
-        this.errorCounter = 0;
-    }
-
-    private void handleErrorInStream(Throwable t) {
-        logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
-        this.subscription = null;
+        this.errorStats.handleOkFromConsumer();
     }
 }
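Note: quoting each message before bufferTimeout is what makes the batched
delivery valid JSON. Every element becomes a JSON string literal, so the
List.toString() rendering of a batch is a parseable JSON array. A standalone
illustration of the same quoting logic (a copy for demonstration, not the
class itself):

    import java.util.List;

    public class QuoteDemo {
        // Same logic as KafkaJobDataConsumer.quote(): wrap in quotes, escape embedded quotes.
        static String quote(String str) {
            final String q = "\"";
            return q + str.replace(q, "\\\"") + q;
        }

        public static void main(String[] args) {
            List<String> batch = List.of(quote("Message_1"), quote("Message\"_1"));
            // Prints ["Message_1", "Message\"_1"] - a valid JSON array, matching
            // the expectations in the IntegrationWithKafka test below.
            System.out.println(batch);
        }
    }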
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
index 785f98bf..0ed85c6a 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
@@ -43,9 +43,10 @@ import org.springframework.stereotype.Component;
 public class KafkaTopicConsumers {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
 
-    private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>();
+    private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
+
     @Getter
-    private final Map<String, KafkaJobDataConsumer> activeSubscriptions = new HashMap<>();
+    private final Map<String, KafkaJobDataConsumer> consumers = new HashMap<>(); // Key is jobId
 
     private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
@@ -74,17 +75,17 @@
     }
 
     public synchronized void addJob(Job job) {
-        if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+        if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
             logger.debug("Kafka job added {}", job.getId());
             KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
-            KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job);
-            subscription.start();
-            activeSubscriptions.put(job.getId(), subscription);
+            KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
+            subscription.start(topicConsumer.getOutput());
+            consumers.put(job.getId(), subscription);
         }
     }
 
     public synchronized void removeJob(Job job) {
-        KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId());
+        KafkaJobDataConsumer d = consumers.remove(job.getId());
         if (d != null) {
             logger.debug("Kafka job removed {}", job.getId());
             d.stop();
@@ -93,11 +94,26 @@
     @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
     public synchronized void restartNonRunningTasks() {
+
-        for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) {
+        for (KafkaJobDataConsumer consumer : consumers.values()) {
             if (!consumer.isRunning()) {
-                consumer.start();
+                restartTopic(consumer);
             }
         }
     }
 
+    private void restartTopic(KafkaJobDataConsumer consumer) {
+        InfoType type = consumer.getJob().getType();
+        KafkaTopicListener topic = this.topicListeners.get(type.getId());
+        topic.start();
+        restartConsumersOfType(topic, type);
+    }
+
+    private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
+        this.consumers.forEach((jobId, consumer) -> {
+            if (consumer.getJob().getType().getId().equals(type.getId())) {
+                consumer.start(topic.getOutput());
+            }
+        });
+    }
 }
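Note: because restarting a KafkaTopicListener creates a brand new output sink,
it is not enough to restart only the consumer that was found stopped: every
consumer of the same type still references the old, terminated sink and must be
re-attached. Condensed, the supervision pass above does the following (a sketch
using the names from the classes in this commit; locking and logging omitted):

    for (KafkaJobDataConsumer consumer : consumers.values()) {
        if (!consumer.isRunning()) {
            InfoType type = consumer.getJob().getType();
            KafkaTopicListener topic = topicListeners.get(type.getId());
            topic.start(); // dispose the old Kafka receiver, create a fresh sink
            restartConsumersOfType(topic, type); // re-attach all consumers of this type
        }
    }

With CONSUMER_SUPERVISION_INTERVAL_MS set to 3 minutes, this supervision is the
source of the "restarted after at most 3 minutes" statement in the commit
message.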
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index 0452b88c..d1045ee0 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -48,24 +47,25 @@ public class KafkaTopicListener {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
-    private final Many<String> output;
+    private Many<String> output;
+    private Disposable topicReceiverTask;
 
     public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
-
-        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
-        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
-
         this.type = type;
-        startKafkaTopicReceiver();
+        start();
     }
 
     public Many<String> getOutput() {
         return this.output;
     }
 
-    private Disposable startKafkaTopicReceiver() {
-        return KafkaReceiver.create(kafkaInputProperties()) //
+    public void start() {
+        stop();
+        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
+        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+        logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
+        topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) //
             .receive() //
             .doOnNext(this::onReceivedData) //
             .subscribe(null, //
@@ -73,7 +73,14 @@
                 () -> logger.warn("KafkaTopicReceiver stopped"));
     }
 
-    private void onReceivedData(ConsumerRecord<Integer, String> input) {
+    private void stop() {
+        if (topicReceiverTask != null) {
+            topicReceiverTask.dispose();
+            topicReceiverTask = null;
+        }
+    }
+
+    private void onReceivedData(ConsumerRecord<String, String> input) {
         logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
         output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
     }
@@ -82,17 +89,17 @@
         logger.error("KafkaTopicReceiver error: {}", t.getMessage());
     }
 
-    private ReceiverOptions<Integer, String> kafkaInputProperties() {
+    private ReceiverOptions<String, String> kafkaInputProperties() {
         Map<String, Object> consumerProps = new HashMap<>();
         if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
             logger.error("No kafka bootstrap server is setup");
         }
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
-        return ReceiverOptions.<Integer, String>create(consumerProps)
+        return ReceiverOptions.<String, String>create(consumerProps)
             .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
     }
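Note: the listener publishes through a multicast sink with a bounded
backpressure buffer, and onReceivedData() emits with FAIL_FAST, so a consumer
that cannot keep up eventually makes the emission fail with an overflow. That
is the Kafka-side error that ErrorStats treats as hopeless and that the
supervision above recovers from. A self-contained sketch of the failure mode
(demo code, not part of this commit; the buffer is shrunk to 4 so the overflow
triggers immediately):

    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;
    import reactor.core.publisher.Sinks;

    public class SinkOverflowDemo {
        public static void main(String[] args) {
            // Same sink shape as KafkaTopicListener, but with a tiny buffer.
            Sinks.Many<String> output = Sinks.many().multicast().onBackpressureBuffer(4);

            // A deliberately slow subscriber: requests one element, then stalls.
            output.asFlux().subscribe(new BaseSubscriber<String>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1);
                }
            });

            try {
                for (int i = 1; i <= 100; i++) {
                    // FAIL_FAST turns a buffer overflow into a thrown EmissionException,
                    // analogous to what happens in onReceivedData().
                    output.emitNext("Message_" + i, Sinks.EmitFailureHandler.FAIL_FAST);
                }
            } catch (Sinks.EmissionException e) {
                System.out.println("overflow after buffer filled: " + e.getReason());
            }
        }
    }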
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
index 7b719e3c..8b5b6cfc 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
@@ -87,7 +87,6 @@ public class ProducerRegstrationTask {
     }
 
     private void handleRegistrationCompleted() {
-        logger.debug("Registering types and producer completed");
         isRegisteredInEcs = true;
     }
 
@@ -106,7 +105,7 @@ public class ProducerRegstrationTask {
     private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
         ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
         if (isEqual(producerRegistrationInfo(), registerredInfo)) {
-            logger.trace("Already registered");
+            logger.trace("Already registered in ECS");
             return Mono.just(Boolean.TRUE);
         } else {
             return Mono.just(Boolean.FALSE);

diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
index 1ca4fac2..287c95ec 100644
--- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
+++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
@@ -227,7 +227,8 @@ class ApplicationTest {
         ProducerJobInfo info = new ProducerJobInfo(null, "id", "typeId", "targetUri", "owner", "lastUpdated");
         String body = gson.toJson(info);
-        testErrorCode(restClient().post(jobUrl, body), HttpStatus.NOT_FOUND, "Could not find type");
+        testErrorCode(restClient().post(jobUrl, body, MediaType.APPLICATION_JSON), HttpStatus.NOT_FOUND,
+            "Could not find type");
     }
 
     @Test

diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
index 8d1dda66..1cf8903a 100644
--- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
+++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
@@ -105,7 +105,7 @@ public class EcsSimulatorController {
             new ProducerJobInfo(job.jobDefinition, jobId, job.infoTypeId, job.jobResultUri, job.owner, "TIMESTAMP");
         String body = gson.toJson(request);
logger.info("ECS Simulator PUT job: {}", body); - restClient.post(url, body).block(); + restClient.post(url, body, MediaType.APPLICATION_JSON).block(); } public void deleteJob(String jobId, AsyncRestClient restClient) { diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index a0db58a0..470e114e 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -262,20 +262,24 @@ class IntegrationWithKafka { var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc. sendDataToStream(dataToSend); - verifiedReceivedByConsumer("Message_1", "[Message_1, Message_2, Message_3]"); + verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]"); + + // Just for testing quoting + this.consumerController.testResults.reset(); + dataToSend = Flux.just(senderRecord("Message\"_", 1)); + sendDataToStream(dataToSend); + verifiedReceivedByConsumer("[\"Message\\\"_1\"]"); // Delete the jobs this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty()); + await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers()).isEmpty()); } @Test void kafkaIOverflow() throws InterruptedException { - // This does not work. After an overflow, the kafka stream does not seem to work - // final String JOB_ID1 = "ID1"; final String JOB_ID2 = "ID2"; @@ -290,18 +294,20 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc. - sendDataToStream(dataToSend); // this will overflow + sendDataToStream(dataToSend); // this should overflow - KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next(); + KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().values().iterator().next(); await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse()); this.consumerController.testResults.reset(); kafkaTopicConsumers.restartNonRunningTasks(); + this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job + Thread.sleep(1000); // Restarting the input seems to take some asynch time - dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message_1 + dataToSend = Flux.range(1, 1).map(i -> senderRecord("Howdy_", i)); sendDataToStream(dataToSend); - verifiedReceivedByConsumer("Message__1", "Message__1"); + verifiedReceivedByConsumer("Howdy_1"); } }