From b3896f4ad7912be9e12c05e7d4770fa39752d797 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Mon, 15 Nov 2021 12:26:01 +0100 Subject: [PATCH] NONRTRIC - Implement DMaaP mediator producer service in Java Added a possibility fora consumer to choose how many concurrent REST sessions it can handle. If correct message sequence it to be maintained, this must be set to 1. Taking a rest if the consumer is unavialable. Trying to restart strems at overflow, which does not work. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-597 Change-Id: I05ffe17dd39fb7a20aeb394b258c2b0cb0f4c638 --- .../java/org/oran/dmaapadapter/BeanFactory.java | 23 +---- .../java/org/oran/dmaapadapter/repository/Job.java | 13 ++- .../org/oran/dmaapadapter/repository/Jobs.java | 37 +++++-- .../dmaapadapter/tasks/DmaapTopicConsumer.java | 1 - .../dmaapadapter/tasks/DmaapTopicConsumers.java | 43 ++++++++ .../dmaapadapter/tasks/KafkaJobDataConsumer.java | 113 +++++++++++++++++++++ .../dmaapadapter/tasks/KafkaTopicConsumers.java | 60 +++++++---- ...aTopicConsumer.java => KafkaTopicListener.java} | 56 +++------- .../src/main/resources/typeSchemaKafka.json | 8 +- .../org/oran/dmaapadapter/ConsumerController.java | 9 ++ .../oran/dmaapadapter/IntegrationWithKafka.java | 93 +++++++++++++---- 11 files changed, 338 insertions(+), 118 deletions(-) create mode 100644 dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumers.java create mode 100644 dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java rename dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/{KafkaTopicConsumer.java => KafkaTopicListener.java} (60%) diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java index faf57426..d98a8c3b 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java @@ -26,9 +26,6 @@ import org.apache.catalina.connector.Connector; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; -import org.oran.dmaapadapter.repository.Jobs; -import org.oran.dmaapadapter.tasks.DmaapTopicConsumer; -import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory; @@ -38,7 +35,6 @@ import org.springframework.context.annotation.Configuration; @Configuration public class BeanFactory { - private InfoTypes infoTypes; @Value("${server.http-port}") private int httpPort = 0; @@ -49,24 +45,9 @@ public class BeanFactory { } @Bean - public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs, - @Autowired KafkaTopicConsumers kafkaConsumers) { - if (infoTypes != null) { - return infoTypes; - } - + public InfoTypes types(@Autowired ApplicationConfig appConfig) { Collection types = appConfig.getTypes(); - - // Start a consumer for each type - for (InfoType type : types) { - if (type.isDmaapTopicDefined()) { - DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs); - topicConsumer.start(); - } - } - infoTypes = new InfoTypes(types); - kafkaConsumers.start(infoTypes); - return infoTypes; + return new InfoTypes(types); } @Bean diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java index fbeb9cbc..5f7521c3 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -38,17 +38,24 @@ public class Job { @Getter private BufferTimeout bufferTimeout; + private int maxConcurrency; + public Parameters() {} - public Parameters(String filter, BufferTimeout bufferTimeout) { + public Parameters(String filter, BufferTimeout bufferTimeout, int maxConcurrency) { this.filter = filter; this.bufferTimeout = bufferTimeout; + this.maxConcurrency = maxConcurrency; + } + + public int getMaxConcurrency() { + return maxConcurrency == 0 ? 1 : maxConcurrency; } } @Gson.TypeAdapters public static class BufferTimeout { - public BufferTimeout(int maxSize, int maxTimeMiliseconds) { + public BufferTimeout(int maxSize, long maxTimeMiliseconds) { this.maxSize = maxSize; this.maxTimeMiliseconds = maxTimeMiliseconds; } @@ -58,7 +65,7 @@ public class Job { @Getter private int maxSize; - private int maxTimeMiliseconds; + private long maxTimeMiliseconds; public Duration getMaxTime() { return Duration.ofMillis(maxTimeMiliseconds); diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index e3bc61e8..0e7743d4 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -20,8 +20,10 @@ package org.oran.dmaapadapter.repository; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Vector; @@ -30,7 +32,6 @@ import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.repository.Job.Parameters; -import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -38,16 +39,20 @@ import org.springframework.stereotype.Component; @Component public class Jobs { + public interface Observer { + void onJobbAdded(Job job); + + void onJobRemoved(Job job); + } + private static final Logger logger = LoggerFactory.getLogger(Jobs.class); private Map allJobs = new HashMap<>(); private MultiMap jobsByType = new MultiMap<>(); - private final KafkaTopicConsumers kafkaConsumers; private final AsyncRestClientFactory restclientFactory; + private final List observers = new ArrayList<>(); - public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers, @Autowired ApplicationConfig applicationConfig) { - this.kafkaConsumers = kafkaConsumers; - + public Jobs(@Autowired ApplicationConfig applicationConfig) { restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); } @@ -70,13 +75,21 @@ public class Jobs { : restclientFactory.createRestClientNoHttpProxy(callbackUrl); Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient); this.put(job); + synchronized (observers) { + this.observers.forEach(obs -> obs.onJobbAdded(job)); + } + } + + public void addObserver(Observer obs) { + synchronized (observers) { + this.observers.add(obs); + } } private synchronized void put(Job job) { logger.debug("Put job: {}", job.getId()); allJobs.put(job.getId(), job); jobsByType.put(job.getType().getId(), job.getId(), job); - kafkaConsumers.addJob(job); } public synchronized Iterable getAll() { @@ -91,10 +104,14 @@ public class Jobs { return job; } - public synchronized void remove(Job job) { - this.allJobs.remove(job.getId()); - jobsByType.remove(job.getType().getId(), job.getId()); - kafkaConsumers.removeJob(job); + public void remove(Job job) { + synchronized (this) { + this.allJobs.remove(job.getId()); + jobsByType.remove(job.getType().getId(), job.getId()); + } + synchronized (observers) { + this.observers.forEach(obs -> obs.onJobRemoved(job)); + } } public synchronized int size() { diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java index 55a02abf..507d9b6b 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java @@ -38,7 +38,6 @@ import reactor.core.publisher.Mono; * The class fetches incoming requests from DMAAP and sends them further to the * consumers that has a job for this InformationType. */ - public class DmaapTopicConsumer { private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10); private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class); diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumers.java new file mode 100644 index 00000000..9447c3ab --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumers.java @@ -0,0 +1,43 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.repository.InfoType; +import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Jobs; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class DmaapTopicConsumers { + + DmaapTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs) { + // Start a consumer for each type + for (InfoType type : types.getAll()) { + if (type.isDmaapTopicDefined()) { + DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs); + topicConsumer.start(); + } + } + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java new file mode 100644 index 00000000..d240129e --- /dev/null +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java @@ -0,0 +1,113 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import org.oran.dmaapadapter.repository.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.reactive.function.client.WebClientResponseException; + +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks.Many; + +/** + * The class streams data from a multi cast sink and sends the data to the Job + * owner via REST calls. + */ +@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally + +public class KafkaJobDataConsumer { + private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class); + private final Many input; + private final Job job; + private Disposable subscription; + private int errorCounter = 0; + + KafkaJobDataConsumer(Many input, Job job) { + this.input = input; + this.job = job; + } + + public synchronized void start() { + stop(); + this.subscription = getMessagesFromKafka(job) // + .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data)) + .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) // + .onErrorResume(this::handleError) // + .subscribe(this::handleConsumerSentOk, // + this::handleErrorInStream, // + () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(), + job.getType().getId())); + } + + public synchronized void stop() { + if (this.subscription != null) { + subscription.dispose(); + subscription = null; + } + } + + public synchronized boolean isRunning() { + return this.subscription != null; + } + + private Flux getMessagesFromKafka(Job job) { + Flux result = input.asFlux() // + .filter(job::isFilterMatch); + + if (job.isBuffered()) { + result = result.bufferTimeout( // + job.getParameters().getBufferTimeout().getMaxSize(), // + job.getParameters().getBufferTimeout().getMaxTime()) // + .map(Object::toString); + } + return result; + } + + private Mono handleError(Throwable t) { + logger.warn("exception: {} job: {}", t.getMessage(), job); + + final int STOP_AFTER_ERRORS = 5; + if (t instanceof WebClientResponseException) { + if (++this.errorCounter > STOP_AFTER_ERRORS) { + logger.error("Stopping job {}", job); + return Mono.error(t); + } else { + return Mono.empty(); // Discard + } + } else { + // This can happen if there is an overflow. + return Mono.empty(); + } + } + + private void handleConsumerSentOk(String data) { + this.errorCounter = 0; + } + + private void handleErrorInStream(Throwable t) { + logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage()); + this.subscription = null; + } + +} diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java index 23d9da2c..785f98bf 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java @@ -23,56 +23,80 @@ package org.oran.dmaapadapter.tasks; import java.util.HashMap; import java.util.Map; +import lombok.Getter; + import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; +import org.oran.dmaapadapter.repository.Jobs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import reactor.core.Disposable; -/** - * The class fetches incoming requests from DMAAP and sends them further to the - * consumers that has a job for this InformationType. - */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally @Component +@EnableScheduling public class KafkaTopicConsumers { private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class); - private final Map topicConsumers = new HashMap<>(); - private final Map activeSubscriptions = new HashMap<>(); - private final ApplicationConfig appConfig; + private final Map topicListeners = new HashMap<>(); + @Getter + private final Map activeSubscriptions = new HashMap<>(); - public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig) { - this.appConfig = appConfig; - } + private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3; + + public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, + @Autowired Jobs jobs) { - public void start(InfoTypes types) { for (InfoType type : types.getAll()) { if (type.isKafkaTopicDefined()) { - KafkaTopicConsumer topicConsumer = new KafkaTopicConsumer(appConfig, type); - topicConsumers.put(type.getId(), topicConsumer); + KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type); + topicListeners.put(type.getId(), topicConsumer); } } + + jobs.addObserver(new Jobs.Observer() { + @Override + public void onJobbAdded(Job job) { + addJob(job); + } + + @Override + public void onJobRemoved(Job job) { + removeJob(job); + } + + }); } public synchronized void addJob(Job job) { if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) { logger.debug("Kafka job added {}", job.getId()); - KafkaTopicConsumer topicConsumer = topicConsumers.get(job.getType().getId()); - Disposable subscription = topicConsumer.startDistributeToConsumer(job); + KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId()); + KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job); + subscription.start(); activeSubscriptions.put(job.getId(), subscription); } } public synchronized void removeJob(Job job) { - Disposable d = activeSubscriptions.remove(job.getId()); + KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId()); if (d != null) { logger.debug("Kafka job removed {}", job.getId()); - d.dispose(); + d.stop(); + } + } + + @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS) + public synchronized void restartNonRunningTasks() { + for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) { + if (!consumer.isRunning()) { + consumer.start(); + } } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java similarity index 60% rename from dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java rename to dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 55900224..0452b88c 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -30,82 +30,56 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; -import org.oran.dmaapadapter.repository.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Many; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; /** - * The class fetches incoming requests from DMAAP and sends them further to the - * consumers that has a job for this InformationType. + * The class streams incoming requests from a Kafka topic and sends them further + * to a multi cast sink, which several other streams can connect to. */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally -public class KafkaTopicConsumer { - private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class); +public class KafkaTopicListener { + private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class); private final ApplicationConfig applicationConfig; private final InfoType type; - private final Many consumerDistributor; + private final Many output; - public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) { + public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) { this.applicationConfig = applicationConfig; final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10; - this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE); + this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE); this.type = type; startKafkaTopicReceiver(); } + public Many getOutput() { + return this.output; + } + private Disposable startKafkaTopicReceiver() { return KafkaReceiver.create(kafkaInputProperties()) // .receive() // .doOnNext(this::onReceivedData) // .subscribe(null, // - throwable -> logger.error("KafkaTopicReceiver error: {}", throwable.getMessage()), // + this::onReceivedError, // () -> logger.warn("KafkaTopicReceiver stopped")); } private void onReceivedData(ConsumerRecord input) { logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value()); - consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST); - } - - public Disposable startDistributeToConsumer(Job job) { - final int CONCURRENCY = 10; // Has to be 1 to guarantee correct order. - - return getMessagesFromKafka(job) // - .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data)) - .flatMap(body -> job.getConsumerRestClient().post("", body), CONCURRENCY) // - .onErrorResume(this::handleConsumerErrorResponse) // - .subscribe(null, // - throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), // - () -> logger.warn("KafkaMessageConsumer stopped {}", job.getType().getId())); - } - - private Flux getMessagesFromKafka(Job job) { - if (job.isBuffered()) { - return consumerDistributor.asFlux() // - .filter(job::isFilterMatch) // - .bufferTimeout( // - job.getParameters().getBufferTimeout().getMaxSize(), // - job.getParameters().getBufferTimeout().getMaxTime()) // - .map(Object::toString); - } else { - return consumerDistributor.asFlux() // - .filter(job::isFilterMatch); - } + output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST); } - private Mono handleConsumerErrorResponse(Throwable t) { - logger.warn("error from CONSUMER {}", t.getMessage()); - return Mono.empty(); + private void onReceivedError(Throwable t) { + logger.error("KafkaTopicReceiver error: {}", t.getMessage()); } private ReceiverOptions kafkaInputProperties() { diff --git a/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json b/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json index 0ff7c80e..290b70ae 100644 --- a/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json +++ b/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json @@ -5,6 +5,9 @@ "filter": { "type": "string" }, + "maxConcurrency": { + "type": "integer" + }, "bufferTimeout": { "type": "object", "properties": { @@ -21,6 +24,5 @@ ] } }, - "required": [ - ] -} + "required": [] +} \ No newline at end of file diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java index 4b6d9010..70e89d6b 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java @@ -56,6 +56,15 @@ public class ConsumerController { public TestResults() {} + public boolean hasReceived(String str) { + for (String received : receivedBodies) { + if (received.equals(str)) { + return true; + } + } + return false; + } + public void reset() { receivedBodies.clear(); } diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 5ee34524..a0db58a0 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -22,9 +22,11 @@ package org.oran.dmaapadapter; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.gson.JsonParser; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -47,6 +49,8 @@ import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer; +import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -90,6 +94,9 @@ class IntegrationWithKafka { @Autowired private EcsSimulatorController ecsSimulatorController; + @Autowired + private KafkaTopicConsumers kafkaTopicConsumers; + private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class); @@ -174,8 +181,9 @@ class IntegrationWithKafka { return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort(); } - private Object jobParametersAsJsonObject(String filter, int maxTimeMiliseconds, int maxSize) { - Job.Parameters param = new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds)); + private Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, int maxConcurrency) { + Job.Parameters param = + new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency); String str = gson.toJson(param); return jsonObject(str); } @@ -188,13 +196,14 @@ class IntegrationWithKafka { } } - private ConsumerJobInfo consumerJobInfo(String filter, int maxTimeMiliseconds, int maxSize) { + private ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) { try { InfoType type = this.types.getAll().iterator().next(); String typeId = type.getId(); String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; - return new ConsumerJobInfo(typeId, jobParametersAsJsonObject(filter, maxTimeMiliseconds, maxSize), "owner", - targetUri, ""); + return new ConsumerJobInfo(typeId, + jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize, maxConcurrency), "owner", targetUri, + ""); } catch (Exception e) { return null; } @@ -217,6 +226,23 @@ class IntegrationWithKafka { return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), i, data + i), i); } + private void sendDataToStream(Flux> dataToSend) { + final KafkaSender sender = KafkaSender.create(senderOptions()); + + sender.send(dataToSend) // + .doOnError(e -> logger.error("Send failed", e)) // + .blockLast(); + + } + + private void verifiedReceivedByConsumer(String... strings) { + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(strings.length)); + for (String s : strings) { + assertTrue(consumer.hasReceived(s)); + } + } + @Test void kafkaIntegrationTest() throws InterruptedException { final String JOB_ID1 = "ID1"; @@ -226,31 +252,56 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); - // Create a job - this.ecsSimulatorController.addJob(consumerJobInfo(null, 10, 1000), JOB_ID1, restClient()); - this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", 0, 0), JOB_ID2, restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); + // Create two jobs. One buffering and one with a filter + this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1, + restClient()); + this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient()); - final KafkaSender sender = KafkaSender.create(senderOptions()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc. + sendDataToStream(dataToSend); - sender.send(dataToSend) // - .doOnError(e -> logger.error("Send failed", e)) // - .doOnNext(senderResult -> logger.debug("Sent {}", senderResult)) // - .doOnError(t -> logger.error("Error {}", t)) // - .blockLast(); - - ConsumerController.TestResults consumer = this.consumerController.testResults; - await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(2)); - assertThat(consumer.receivedBodies.get(0)).isEqualTo("Message_1"); - assertThat(consumer.receivedBodies.get(1)).isEqualTo("[Message_1, Message_2, Message_3]"); + verifiedReceivedByConsumer("Message_1", "[Message_1, Message_2, Message_3]"); - // Delete the job + // Delete the jobs this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty()); + } + + @Test + void kafkaIOverflow() throws InterruptedException { + // This does not work. After an overflow, the kafka stream does not seem to work + // + final String JOB_ID1 = "ID1"; + final String JOB_ID2 = "ID2"; + + // Register producer, Register types + await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); + + // Create two jobs. + this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID1, restClient()); + this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); + + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); + + var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc. + sendDataToStream(dataToSend); // this will overflow + + KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next(); + await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse()); + this.consumerController.testResults.reset(); + + kafkaTopicConsumers.restartNonRunningTasks(); + + dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message_1 + sendDataToStream(dataToSend); + + verifiedReceivedByConsumer("Message__1", "Message__1"); } } -- 2.16.6