NONRTRIC - Implement DMaaP mediator producer service in Java 72/7072/11
author PatrikBuhr <patrik.buhr@est.tech>
Tue, 16 Nov 2021 09:58:25 +0000 (10:58 +0100)
committer PatrikBuhr <patrik.buhr@est.tech>
Wed, 17 Nov 2021 09:07:13 +0000 (10:07 +0100)
Improved resilience to Kafka errors. After an overflow, the stream is restarted (currently within a maximum of 3 minutes).
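
The recovery hinges on the topic listener's output being a multicast sink with a bounded backpressure buffer: restarting the topic rebuilds that sink and resubscribes the job consumers. A minimal Reactor-only sketch of that idea follows (the class name is invented for illustration; the Kafka receiver and the 3-minute @Scheduled supervision timer in KafkaTopicConsumers are omitted, so the restart is triggered by a plain method call):

    import reactor.core.publisher.Sinks;
    import reactor.core.publisher.Sinks.Many;

    public class SinkRestartSketch {
        private Many<String> output;

        // Mirrors KafkaTopicListener.start(): recreate the buffered sink.
        public void start() {
            final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
            this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
        }

        public Many<String> getOutput() {
            return this.output;
        }

        public static void main(String[] args) {
            SinkRestartSketch listener = new SinkRestartSketch();
            listener.start();
            listener.getOutput().asFlux().subscribe(msg -> System.out.println("consumer got: " + msg));
            listener.getOutput().emitNext("before overflow", Sinks.EmitFailureHandler.FAIL_FAST);

            // ...assume the stream later terminates because of an overflow; the
            // supervision task would then call start() again and resubscribe
            // each job consumer of the type:
            listener.start();
            listener.getOutput().asFlux().subscribe(msg -> System.out.println("consumer got: " + msg));
            listener.getOutput().emitNext("after restart", Sinks.EmitFailureHandler.FAIL_FAST);
        }
    }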

For Kafka messages: if buffering is enabled, each received message is quoted.
When delivered to the consumer, the REST content type is JSON if buffered; if not buffered, no content type is sent.
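
Quoting each message keeps the buffered payload a valid JSON array even when the Kafka values themselves are not JSON. A runnable Reactor-only sketch of the quote-then-buffer step (the class name, sample messages and buffer limits are illustrative, not the adapter's job parameters):

    import java.time.Duration;
    import java.util.List;
    import reactor.core.publisher.Flux;

    public class BufferQuotingSketch {

        // Same quoting as in KafkaJobDataConsumer: wrap in double quotes and
        // escape embedded quotes, so each message becomes a JSON string.
        static String quote(String str) {
            final String q = "\"";
            return q + str.replace(q, "\\\"") + q;
        }

        public static void main(String[] args) {
            List<String> bodies = Flux.just("Message_1", "Message_2", "Mes\"sage_3")
                    .map(BufferQuotingSketch::quote)
                    .bufferTimeout(10, Duration.ofMillis(100)) // maxSize, maxTime
                    .map(Object::toString) // List.toString() -> ["Message_1", "Message_2", "Mes\"sage_3"]
                    .collectList()
                    .block();
            // Each body is a JSON array, so it is POSTed with content type
            // application/json; unbuffered jobs forward the raw value with no content type.
            bodies.forEach(System.out::println);
        }
    }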

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: Id0fb2b572a491d32300b1d11a7794a97371ac074

dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
index 6939026..8b3efed 100644
@@ -62,7 +62,8 @@ public class AsyncRestClient {
         this.httpProxyConfig = httpProxyConfig;
     }
 
-    public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
+    public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body,
+            @Nullable MediaType contentType) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
@@ -71,17 +72,18 @@ public class AsyncRestClient {
         RequestHeadersSpec<?> request = getWebClient() //
                 .post() //
                 .uri(uri) //
-                .contentType(MediaType.APPLICATION_JSON) //
+                .contentType(contentType) //
                 .body(bodyProducer, String.class);
         return retrieve(traceTag, request);
     }
 
-    public Mono<String> post(String uri, @Nullable String body) {
-        return postForEntity(uri, body) //
+    public Mono<String> post(String uri, @Nullable String body, @Nullable MediaType contentType) {
+        return postForEntity(uri, body, contentType) //
                 .map(this::toBody);
     }
 
-    public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
+    public Mono<String> postWithAuthHeader(String uri, String body, String username, String password,
+            MediaType mediaType) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
@@ -90,7 +92,7 @@ public class AsyncRestClient {
                 .post() //
                 .uri(uri) //
                 .headers(headers -> headers.setBasicAuth(username, password)) //
-                .contentType(MediaType.APPLICATION_JSON) //
+                .contentType(mediaType) //
                 .bodyValue(body);
         return retrieve(traceTag, request) //
                 .map(this::toBody);
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
index 507d9b6..217a072 100644
@@ -29,6 +29,7 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
@@ -128,7 +129,7 @@ public class DmaapTopicConsumer {
         // Distibute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
                 .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
-                .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
+                .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse);
     }
 }
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
index d240129..5550ce0 100644
 
 package org.oran.dmaapadapter.tasks;
 
+import lombok.Getter;
+
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.Disposable;
@@ -35,29 +38,58 @@ import reactor.core.publisher.Sinks.Many;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-
 public class KafkaJobDataConsumer {
     private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
-    private final Many<String> input;
+    @Getter
     private final Job job;
     private Disposable subscription;
-    private int errorCounter = 0;
+    private final ErrorStats errorStats = new ErrorStats();
+
+    private class ErrorStats {
+        private int consumerFaultCounter = 0;
+        private boolean kafkaError = false; // eg. overflow
+
+        public void handleOkFromConsumer() {
+            this.consumerFaultCounter = 0;
+        }
+
+        public void handleException(Throwable t) {
+            if (t instanceof WebClientResponseException) {
+                ++this.consumerFaultCounter;
+            } else {
+                kafkaError = true;
+            }
+        }
 
-    KafkaJobDataConsumer(Many<String> input, Job job) {
-        this.input = input;
+        public boolean isItHopeless() {
+            final int STOP_AFTER_ERRORS = 5;
+            return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
+        }
+
+        public void resetKafkaErrors() {
+            kafkaError = false;
+        }
+    }
+
+    public KafkaJobDataConsumer(Job job) {
         this.job = job;
     }
 
-    public synchronized void start() {
+    public synchronized void start(Many<String> input) {
         stop();
-        this.subscription = getMessagesFromKafka(job) //
-                .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
-                .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
+        this.errorStats.resetKafkaErrors();
+        this.subscription = getMessagesFromKafka(input, job) //
+                .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
-                        this::handleErrorInStream, //
-                        () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
-                                job.getType().getId()));
+                        t -> stop(), //
+                        () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+    }
+
+    private Mono<String> postToClient(String body) {
+        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
+        MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
+        return job.getConsumerRestClient().post("", body, contentType);
     }
 
     public synchronized void stop() {
@@ -71,43 +103,37 @@ public class KafkaJobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> getMessagesFromKafka(Job job) {
+    private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
         Flux<String> result = input.asFlux() //
                 .filter(job::isFilterMatch);
 
         if (job.isBuffered()) {
-            result = result.bufferTimeout( //
-                    job.getParameters().getBufferTimeout().getMaxSize(), //
-                    job.getParameters().getBufferTimeout().getMaxTime()) //
+            result = result.map(this::quote) //
+                    .bufferTimeout( //
+                            job.getParameters().getBufferTimeout().getMaxSize(), //
+                            job.getParameters().getBufferTimeout().getMaxTime()) //
                     .map(Object::toString);
         }
         return result;
     }
 
-    private Mono<String> handleError(Throwable t) {
-        logger.warn("exception: {} job: {}", t.getMessage(), job);
+    private String quote(String str) {
+        final String q = "\"";
+        return q + str.replace(q, "\\\"") + q;
+    }
 
-        final int STOP_AFTER_ERRORS = 5;
-        if (t instanceof WebClientResponseException) {
-            if (++this.errorCounter > STOP_AFTER_ERRORS) {
-                logger.error("Stopping job {}", job);
-                return Mono.error(t);
-            } else {
-                return Mono.empty(); // Discard
-            }
+    private Mono<String> handleError(Throwable t) {
+        logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+        this.errorStats.handleException(t);
+        if (this.errorStats.isItHopeless()) {
+            return Mono.error(t);
         } else {
-            // This can happen if there is an overflow.
-            return Mono.empty();
+            return Mono.empty(); // Ignore
         }
     }
 
     private void handleConsumerSentOk(String data) {
-        this.errorCounter = 0;
-    }
-
-    private void handleErrorInStream(Throwable t) {
-        logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
-        this.subscription = null;
+        this.errorStats.handleOkFromConsumer();
     }
 
 }
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
index 785f98b..0ed85c6 100644
@@ -43,9 +43,10 @@ import org.springframework.stereotype.Component;
 public class KafkaTopicConsumers {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
 
-    private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>();
+    private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
+
     @Getter
-    private final Map<String, KafkaJobDataConsumer> activeSubscriptions = new HashMap<>();
+    private final Map<String, KafkaJobDataConsumer> consumers = new HashMap<>(); // Key is jobId
 
     private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
 
@@ -74,17 +75,17 @@ public class KafkaTopicConsumers {
     }
 
     public synchronized void addJob(Job job) {
-        if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+        if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
             logger.debug("Kafka job added {}", job.getId());
             KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
-            KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job);
-            subscription.start();
-            activeSubscriptions.put(job.getId(), subscription);
+            KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
+            subscription.start(topicConsumer.getOutput());
+            consumers.put(job.getId(), subscription);
         }
     }
 
     public synchronized void removeJob(Job job) {
-        KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId());
+        KafkaJobDataConsumer d = consumers.remove(job.getId());
         if (d != null) {
             logger.debug("Kafka job removed {}", job.getId());
             d.stop();
@@ -93,11 +94,26 @@ public class KafkaTopicConsumers {
 
     @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
     public synchronized void restartNonRunningTasks() {
-        for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) {
+
+        for (KafkaJobDataConsumer consumer : consumers.values()) {
             if (!consumer.isRunning()) {
-                consumer.start();
+                restartTopic(consumer);
             }
         }
     }
 
+    private void restartTopic(KafkaJobDataConsumer consumer) {
+        InfoType type = consumer.getJob().getType();
+        KafkaTopicListener topic = this.topicListeners.get(type.getId());
+        topic.start();
+        restartConsumersOfType(topic, type);
+    }
+
+    private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
+        this.consumers.forEach((jobId, consumer) -> {
+            if (consumer.getJob().getType().getId().equals(type.getId())) {
+                consumer.start(topic.getOutput());
+            }
+        });
+    }
 }
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index 0452b88..d1045ee 100644
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -48,24 +47,25 @@ public class KafkaTopicListener {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
-    private final Many<String> output;
+    private Many<String> output;
+    private Disposable topicReceiverTask;
 
     public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
-
-        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
-        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
-
         this.type = type;
-        startKafkaTopicReceiver();
+        start();
     }
 
     public Many<String> getOutput() {
         return this.output;
     }
 
-    private Disposable startKafkaTopicReceiver() {
-        return KafkaReceiver.create(kafkaInputProperties()) //
+    public void start() {
+        stop();
+        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
+        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+        logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
+        topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) //
                 .receive() //
                 .doOnNext(this::onReceivedData) //
                 .subscribe(null, //
@@ -73,7 +73,14 @@ public class KafkaTopicListener {
                         () -> logger.warn("KafkaTopicReceiver stopped"));
     }
 
-    private void onReceivedData(ConsumerRecord<Integer, String> input) {
+    private void stop() {
+        if (topicReceiverTask != null) {
+            topicReceiverTask.dispose();
+            topicReceiverTask = null;
+        }
+    }
+
+    private void onReceivedData(ConsumerRecord<String, String> input) {
         logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
         output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
     }
@@ -82,17 +89,17 @@ public class KafkaTopicListener {
         logger.error("KafkaTopicReceiver error: {}", t.getMessage());
     }
 
-    private ReceiverOptions<Integer, String> kafkaInputProperties() {
+    private ReceiverOptions<String, String> kafkaInputProperties() {
         Map<String, Object> consumerProps = new HashMap<>();
         if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
             logger.error("No kafka boostrap server is setup");
         }
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
-        return ReceiverOptions.<Integer, String>create(consumerProps)
+        return ReceiverOptions.<String, String>create(consumerProps)
                 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
     }
 
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
index 7b719e3..8b5b6cf 100644
@@ -87,7 +87,6 @@ public class ProducerRegstrationTask {
     }
 
     private void handleRegistrationCompleted() {
-        logger.debug("Registering types and producer completed");
         isRegisteredInEcs = true;
     }
 
@@ -106,7 +105,7 @@ public class ProducerRegstrationTask {
     private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
         ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
         if (isEqual(producerRegistrationInfo(), registerredInfo)) {
-            logger.trace("Already registered");
+            logger.trace("Already registered in ECS");
             return Mono.just(Boolean.TRUE);
         } else {
             return Mono.just(Boolean.FALSE);
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
index 1ca4fac..287c95e 100644
@@ -227,7 +227,8 @@ class ApplicationTest {
 
         ProducerJobInfo info = new ProducerJobInfo(null, "id", "typeId", "targetUri", "owner", "lastUpdated");
         String body = gson.toJson(info);
-        testErrorCode(restClient().post(jobUrl, body), HttpStatus.NOT_FOUND, "Could not find type");
+        testErrorCode(restClient().post(jobUrl, body, MediaType.APPLICATION_JSON), HttpStatus.NOT_FOUND,
+                "Could not find type");
     }
 
     @Test
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
index 8d1dda6..1cf8903 100644
@@ -105,7 +105,7 @@ public class EcsSimulatorController {
                 new ProducerJobInfo(job.jobDefinition, jobId, job.infoTypeId, job.jobResultUri, job.owner, "TIMESTAMP");
         String body = gson.toJson(request);
         logger.info("ECS Simulator PUT job: {}", body);
-        restClient.post(url, body).block();
+        restClient.post(url, body, MediaType.APPLICATION_JSON).block();
     }
 
     public void deleteJob(String jobId, AsyncRestClient restClient) {
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index a0db58a..470e114 100644
@@ -262,20 +262,24 @@ class IntegrationWithKafka {
         var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
         sendDataToStream(dataToSend);
 
-        verifiedReceivedByConsumer("Message_1", "[Message_1, Message_2, Message_3]");
+        verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+
+        // Just for testing quoting
+        this.consumerController.testResults.reset();
+        dataToSend = Flux.just(senderRecord("Message\"_", 1));
+        sendDataToStream(dataToSend);
+        verifiedReceivedByConsumer("[\"Message\\\"_1\"]");
 
         // Delete the jobs
         this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
         this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty());
+        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers()).isEmpty());
     }
 
     @Test
     void kafkaIOverflow() throws InterruptedException {
-        // This does not work. After an overflow, the kafka stream does not seem to work
-        //
         final String JOB_ID1 = "ID1";
         final String JOB_ID2 = "ID2";
 
@@ -290,18 +294,20 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
         var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
-        sendDataToStream(dataToSend); // this will overflow
+        sendDataToStream(dataToSend); // this should overflow
 
-        KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next();
+        KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().values().iterator().next();
         await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
         this.consumerController.testResults.reset();
 
         kafkaTopicConsumers.restartNonRunningTasks();
+        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+        Thread.sleep(1000); // Restarting the input seems to take some asynch time
 
-        dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message_1
+        dataToSend = Flux.range(1, 1).map(i -> senderRecord("Howdy_", i));
         sendDataToStream(dataToSend);
 
-        verifiedReceivedByConsumer("Message__1", "Message__1");
+        verifiedReceivedByConsumer("Howdy_1");
     }
 
 }