NONRTRIC - Implement DMaaP mediator producer service in Java 33/7033/2
Author:     PatrikBuhr <patrik.buhr@est.tech>
AuthorDate: Fri, 5 Nov 2021 13:56:19 +0000 (14:56 +0100)
Commit:     PatrikBuhr <patrik.buhr@est.tech>
CommitDate: Wed, 10 Nov 2021 12:59:39 +0000 (13:59 +0100)
An information type can now receive data from a Kafka stream.
The received data can be filtered (using regexp matching) and can be buffered
to minimize the number of REST calls needed to deliver it to the consumer.
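
For illustration, a consumer job could pass parameters such as the following
(values are hypothetical; the field names come from the new typeSchemaKafka.json):

    {
      "filter": "Message_1.*",
      "bufferTimeout": {
        "maxSize": 100,
        "maxTimeMiliseconds": 1000
      }
    }

Only data matching the regexp is then delivered, and up to 100 entries (or
whatever has arrived within one second) are posted to the consumer in one REST call.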

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: Ie3740898bd919908a7ec5753f7d6050c652cebe4

20 files changed:
dmaap-adaptor-java/README.md
dmaap-adaptor-java/config/application.yaml
dmaap-adaptor-java/pom.xml
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java [moved from dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java with 85% similarity]
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java [new file with mode: 0644]
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java [new file with mode: 0644]
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json [new file with mode: 0644]
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java [new file with mode: 0644]
dmaap-adaptor-java/src/test/resources/test_application_configuration.json
dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json [new file with mode: 0644]

index 0378bc7..9b35fe5 100644 (file)
@@ -15,7 +15,7 @@ The file `config/application_configuration.json` contains the configuration of j
         [
           {
              "id":  "STD_Fault_Messages",
-             "dmaapTopicUrl":  "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages",
+             "dmaapTopicUrl":  "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD-Fault-Messages_1.0.0",
              "useHttpProxy": false
           }
         ]
index 5733ea7..6a2d68a 100644 (file)
@@ -51,6 +51,9 @@ app:
   # configuration from the Consul will override the file.
   configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json
   dmaap-base-url: http://dradmin:dradmin@localhost:2222
-  # The url used to adress this component. This is used as a callback url sent to other components. 
+  # The URL used to address this component. This is used as a callback URL sent to other components.
   dmaap-adapter-base-url: https://localhost:8435
+  # Kafka bootstrap server. This is only needed if there are information types that use a kafkaInputTopic.
+  kafka:
+    bootstrap-servers: localhost:9092
 
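For reference, an information type is connected to a Kafka topic through
application_configuration.json; a minimal sketch (the topic name is an example,
mirroring the test configuration added below):

   {
      "types": [
         {
            "id": "ExampleInformationType",
            "kafkaInputTopic": "TutorialTopic",
            "useHttpProxy": false
         }
      ]
   }
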
index 1fbd83c..411b27c 100644 (file)
             <artifactId>mockwebserver</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor.kafka</groupId>
+            <artifactId>reactor-kafka</artifactId>
+            <version>1.3.7</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>31.0.1-jre</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
index c9ba93f..faf5742 100644 (file)
@@ -27,7 +27,8 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DmaapMessageConsumer;
+import org.oran.dmaapadapter.tasks.DmaapTopicConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
@@ -37,6 +38,7 @@ import org.springframework.context.annotation.Configuration;
 
 @Configuration
 public class BeanFactory {
+    private InfoTypes infoTypes;
 
     @Value("${server.http-port}")
     private int httpPort = 0;
@@ -47,16 +49,24 @@ public class BeanFactory {
     }
 
     @Bean
-    public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs) {
+    public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs,
+            @Autowired KafkaTopicConsumers kafkaConsumers) {
+        if (infoTypes != null) {
+            return infoTypes;
+        }
+
         Collection<InfoType> types = appConfig.getTypes();
 
         // Start a consumer for each type
         for (InfoType type : types) {
-            DmaapMessageConsumer topicConsumer = new DmaapMessageConsumer(appConfig, type, jobs);
-            topicConsumer.start();
+            if (type.isDmaapTopicDefined()) {
+                DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs);
+                topicConsumer.start();
+            }
         }
-
-        return new InfoTypes(types);
+        infoTypes = new InfoTypes(types);
+        kafkaConsumers.start(infoTypes);
+        return infoTypes;
     }
 
     @Bean
index e26fd46..f17a9c0 100644 (file)
@@ -88,6 +88,10 @@ public class ApplicationConfig {
     @Value("${app.dmaap-base-url}")
     private String dmaapBaseUrl;
 
+    @Getter
+    @Value("${app.kafka.bootstrap-servers:}")
+    private String kafkaBootStrapServers;
+
     private WebClientConfig webClientConfig = null;
 
     public WebClientConfig getWebClientConfig() {
index ca7c96c..e4dca5b 100644 (file)
@@ -85,7 +85,7 @@ public class ProducerCallbacksController {
 
             logger.info("Job started callback {}", request.id);
             Job job = new Job(request.id, request.targetUri, types.getType(request.typeId), request.owner,
-                    request.lastUpdated);
+                    request.lastUpdated, toJobParameters(request.jobData));
             this.jobs.put(job);
             return new ResponseEntity<>(HttpStatus.OK);
         } catch (Exception e) {
@@ -93,6 +93,11 @@ public class ProducerCallbacksController {
         }
     }
 
+    private Job.Parameters toJobParameters(Object jobData) {
+        String json = gson.toJson(jobData);
+        return gson.fromJson(json, Job.Parameters.class);
+    }
+
     @GetMapping(path = JOB_URL, produces = MediaType.APPLICATION_JSON_VALUE)
     @Operation(summary = "Get all jobs", description = "Returns all info jobs, can be used for troubleshooting")
     @ApiResponse(responseCode = "200", //
index 9dda1e6..27b527d 100644 (file)
@@ -22,6 +22,8 @@ package org.oran.dmaapadapter.repository;
 
 import lombok.Getter;
 
+import org.springframework.util.StringUtils;
+
 public class InfoType {
 
     @Getter
@@ -33,10 +35,22 @@ public class InfoType {
     @Getter
     private final boolean useHttpProxy;
 
-    public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy) {
+    @Getter
+    private final String kafkaInputTopic;
+
+    public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic) {
         this.id = id;
         this.dmaapTopicUrl = dmaapTopicUrl;
         this.useHttpProxy = useHttpProxy;
+        this.kafkaInputTopic = kafkaInputTopic;
+    }
+
+    public boolean isKafkaTopicDefined() {
+        return StringUtils.hasLength(kafkaInputTopic);
+    }
+
+    public boolean isDmaapTopicDefined() {
+        return StringUtils.hasLength(dmaapTopicUrl);
     }
 
 }
index b8677a3..558fc46 100644 (file)
@@ -35,7 +35,6 @@ public class InfoTypes {
     private Map<String, InfoType> allTypes = new HashMap<>();
 
     public InfoTypes(Collection<InfoType> types) {
-
         for (InfoType type : types) {
             put(type);
         }
index 0da94a6..d1697e9 100644 (file)
 
 package org.oran.dmaapadapter.repository;
 
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import lombok.Getter;
 
+import org.immutables.gson.Gson;
+
 public class Job {
 
+    @Gson.TypeAdapters
+    public static class Parameters {
+        public String filter;
+        public BufferTimeout bufferTimeout;
+
+        public Parameters() {
+        }
+
+        public Parameters(String filter, BufferTimeout bufferTimeout) {
+            this.filter = filter;
+            this.bufferTimeout = bufferTimeout;
+        }
+
+        public static class BufferTimeout {
+            public BufferTimeout(int maxSize, int maxTimeMiliseconds) {
+                this.maxSize = maxSize;
+                this.maxTimeMiliseconds = maxTimeMiliseconds;
+            }
+
+            public BufferTimeout() {
+            }
+
+            public int maxSize;
+            public int maxTimeMiliseconds;
+        }
+    }
+
     @Getter
     private final String id;
 
@@ -36,15 +68,39 @@ public class Job {
     @Getter
     private final String owner;
 
+    @Getter
+    private final Parameters parameters;
+
     @Getter
     private final String lastUpdated;
 
-    public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated) {
+    private final Pattern jobDataFilter;
+
+    public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters) {
         this.id = id;
         this.callbackUrl = callbackUrl;
         this.type = type;
         this.owner = owner;
         this.lastUpdated = lastUpdated;
+        this.parameters = parameters;
+        if (parameters != null && parameters.filter != null) {
+            jobDataFilter = Pattern.compile(parameters.filter);
+        } else {
+            jobDataFilter = null;
+        }
+    }
+
+    public boolean isFilterMatch(String data) {
+        if (jobDataFilter == null) {
+            return true;
+        }
+        Matcher matcher = jobDataFilter.matcher(data);
+        return matcher.find();
+    }
+
+    public boolean isBuffered() {
+        return parameters != null && parameters.bufferTimeout != null && parameters.bufferTimeout.maxSize > 0
+                && parameters.bufferTimeout.maxTimeMiliseconds > 0;
     }
 
 }
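
Worth noting: isFilterMatch() above uses Matcher.find(), so a job's regexp
matches anywhere in the received data unless it is anchored. A minimal sketch
(all values hypothetical):

    InfoType type = new InfoType("ExampleInformationType", null, false, "TutorialTopic");
    Job job = new Job("job-1", "https://consumer/callback", type, "owner", "2021-11-05",
            new Job.Parameters("Message_1.*", null));

    job.isFilterMatch("Message_1");       // true
    job.isFilterMatch("xx Message_1 xx"); // true, find() is not anchored
    job.isFilterMatch("Other");           // false
    job.isBuffered();                     // false, since no bufferTimeout was given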
index 6e2b326..8a38824 100644 (file)
@@ -26,8 +26,10 @@ import java.util.Map;
 import java.util.Vector;
 
 import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Component
@@ -36,8 +38,11 @@ public class Jobs {
 
     private Map<String, Job> allJobs = new HashMap<>();
     private MultiMap<Job> jobsByType = new MultiMap<>();
+    private final KafkaTopicConsumers kafkaConsumers;
 
-    public Jobs() {}
+    public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) {
+        this.kafkaConsumers = kafkaConsumers;
+    }
 
     public synchronized Job getJob(String id) throws ServiceException {
         Job job = allJobs.get(id);
@@ -52,9 +57,10 @@ public class Jobs {
     }
 
     public synchronized void put(Job job) {
-        logger.debug("Put service: {}", job.getId());
+        logger.debug("Put job: {}", job.getId());
         allJobs.put(job.getId(), job);
         jobsByType.put(job.getType().getId(), job.getId(), job);
+        kafkaConsumers.addJob(job);
     }
 
     public synchronized Iterable<Job> getAll() {
@@ -72,6 +78,7 @@ public class Jobs {
     public synchronized void remove(Job job) {
         this.allJobs.remove(job.getId());
         jobsByType.remove(job.getType().getId(), job.getId());
+        kafkaConsumers.removeJob(job);
     }
 
     public synchronized int size() {
@@ -39,15 +39,16 @@ import reactor.core.publisher.Mono;
  * consumers that have a job for this InformationType.
  */
 
-public class DmaapMessageConsumer {
+public class DmaapTopicConsumer {
     private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
-    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
-    private final ApplicationConfig applicationConfig;
+    private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class);
+
     private final AsyncRestClient dmaapRestClient;
-    private final AsyncRestClient consumerRestClient;
-    private final InfoType type;
-    private final Jobs jobs;
     private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
+    private final AsyncRestClient consumerRestClient;
+    protected final ApplicationConfig applicationConfig;
+    protected final InfoType type;
+    protected final Jobs jobs;
 
     /** Submits new elements until stopped */
     private static class InfiniteFlux {
@@ -80,10 +81,10 @@ public class DmaapMessageConsumer {
         }
     }
 
-    public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
-        this.applicationConfig = applicationConfig;
+    public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
         AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
         this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
+        this.applicationConfig = applicationConfig;
         this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
                 : restclientFactory.createRestClientNoHttpProxy("");
         this.type = type;
@@ -93,31 +94,24 @@ public class DmaapMessageConsumer {
     public void start() {
         infiniteSubmitter.start() //
                 .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
-                .flatMap(this::handleReceivedMessage, 5) //
+                .flatMap(this::pushDataToConsumers) //
                 .subscribe(//
-                        value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), //
+                        null, //
                         throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
-                        () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) //
-                );
+                        () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId())); //
+
     }
 
     private String getDmaapUrl() {
-
         return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl();
     }
 
     private Mono<String> handleDmaapErrorResponse(Throwable t) {
         logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl());
-        return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) //
-                .flatMap(notUsed -> Mono.empty());
-    }
-
-    private Mono<String> handleConsumerErrorResponse(Throwable t) {
-        logger.warn("error from CONSUMER {}", t.getMessage());
-        return Mono.empty();
+        return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(notUsed -> Mono.empty());
     }
 
-    protected Mono<String> getFromMessageRouter(String topicUrl) {
+    private Mono<String> getFromMessageRouter(String topicUrl) {
         logger.trace("getFromMessageRouter {}", topicUrl);
         return dmaapRestClient.get(topicUrl) //
                 .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
@@ -125,9 +119,14 @@ public class DmaapMessageConsumer {
                 .onErrorResume(this::handleDmaapErrorResponse); //
     }
 
-    protected Flux<String> handleReceivedMessage(String body) {
-        logger.debug("Received from DMAAP {}", body);
-        final int CONCURRENCY = 5;
+    private Mono<String> handleConsumerErrorResponse(Throwable t) {
+        logger.warn("error from CONSUMER {}", t.getMessage());
+        return Mono.empty();
+    }
+
+    protected Flux<String> pushDataToConsumers(String body) {
+        logger.debug("Received data {}", body);
+        final int CONCURRENCY = 50;
 
         // Distribute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
@@ -135,5 +134,4 @@ public class DmaapMessageConsumer {
                 .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse);
     }
-
 }
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java
new file mode 100644 (file)
index 0000000..6079edf
--- /dev/null
@@ -0,0 +1,130 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Many;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverOptions;
+
+/**
+ * The class fetches incoming data from a Kafka topic and sends it further to
+ * the consumers that have a job for this InformationType.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class KafkaTopicConsumer {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class);
+    private final AsyncRestClient consumerRestClient;
+    private final ApplicationConfig applicationConfig;
+    private final InfoType type;
+    private final Many<String> consumerDistributor;
+
+    public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) {
+        this.applicationConfig = applicationConfig;
+
+        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 10;
+        this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+
+        AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
+        this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
+                : restclientFactory.createRestClientNoHttpProxy("");
+        this.type = type;
+        startKafkaTopicReceiver();
+    }
+
+    private Disposable startKafkaTopicReceiver() {
+        return KafkaReceiver.create(kafkaInputProperties()) //
+                .receive() //
+                .flatMap(this::onReceivedData) //
+                .subscribe(null, //
+                        throwable -> logger.error("KafkaTopicConsumer error: {}", throwable.getMessage()), //
+                        () -> logger.warn("KafkaTopicConsumer stopped"));
+    }
+
+    private Flux<String> onReceivedData(ConsumerRecord<Integer, String> input) {
+        logger.debug("Received from Kafka topic {}: {}", this.type.getKafkaInputTopic(), input.value());
+        consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
+        return consumerDistributor.asFlux();
+    }
+
+    public Disposable startDistributeToConsumer(Job job) {
+        return getMessagesFromKafka(job) //
+                .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
+                .flatMap(body -> consumerRestClient.post(job.getCallbackUrl(), body)) //
+                .onErrorResume(this::handleConsumerErrorResponse) //
+                .subscribe(null, //
+                        throwable -> logger.error("KafkaTopicConsumer error: {}", throwable.getMessage()), //
+                        () -> logger.warn("KafkaTopicConsumer stopped {}", job.getType().getId()));
+    }
+
+    private Flux<String> getMessagesFromKafka(Job job) {
+        if (job.isBuffered()) {
+            return consumerDistributor.asFlux() //
+                    .filter(job::isFilterMatch) //
+                    .bufferTimeout(job.getParameters().bufferTimeout.maxSize,
+                            Duration.ofMillis(job.getParameters().bufferTimeout.maxTimeMiliseconds)) //
+                    .flatMap(o -> Flux.just(o.toString()));
+        } else {
+            return consumerDistributor.asFlux() //
+                    .filter(job::isFilterMatch);
+        }
+    }
+
+    private Mono<String> handleConsumerErrorResponse(Throwable t) {
+        logger.warn("error from CONSUMER {}", t.getMessage());
+        return Mono.empty();
+    }
+
+    private ReceiverOptions<Integer, String> kafkaInputProperties() {
+        Map<String, Object> consumerProps = new HashMap<>();
+        if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
+            logger.error("No Kafka bootstrap server is configured");
+        }
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        return ReceiverOptions.<Integer, String>create(consumerProps)
+                .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
+    }
+
+}
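
The buffered path above emits each buffer as List.toString(), which is the
string the consumer receives (compare the "[Message_1, Message_2, Message_3]"
assertion in the integration test below). A standalone sketch of the operator,
assuming reactor-core on the classpath:

    import java.time.Duration;
    import reactor.core.publisher.Flux;

    public class BufferTimeoutDemo {
        public static void main(String[] args) {
            // A buffer is emitted after 3 elements, or when 100 ms have passed
            Flux.just("Message_1", "Message_2", "Message_3", "Message_4")
                    .bufferTimeout(3, Duration.ofMillis(100)) // Flux<List<String>>
                    .map(Object::toString) // "[Message_1, Message_2, Message_3]", then "[Message_4]"
                    .subscribe(System.out::println);
        }
    }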
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
new file mode 100644 (file)
index 0000000..23d9da2
--- /dev/null
@@ -0,0 +1,79 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+
+/**
+ * Manages the Kafka topic consumers: one KafkaTopicConsumer is created per
+ * InfoType that has a Kafka input topic, and one data distribution
+ * subscription is started per job.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+@Component
+public class KafkaTopicConsumers {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
+
+    private final Map<String, KafkaTopicConsumer> topicConsumers = new HashMap<>();
+    private final Map<String, Disposable> activeSubscriptions = new HashMap<>();
+    private final ApplicationConfig appConfig;
+
+    public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig) {
+        this.appConfig = appConfig;
+    }
+
+    public void start(InfoTypes types) {
+        for (InfoType type : types.getAll()) {
+            if (type.isKafkaTopicDefined()) {
+                KafkaTopicConsumer topicConsumer = new KafkaTopicConsumer(appConfig, type);
+                topicConsumers.put(type.getId(), topicConsumer);
+            }
+        }
+    }
+
+    public synchronized void addJob(Job job) {
+        if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+            logger.debug("Kafka job added {}", job.getId());
+            KafkaTopicConsumer topicConsumer = topicConsumers.get(job.getType().getId());
+            Disposable subscription = topicConsumer.startDistributeToConsumer(job);
+            activeSubscriptions.put(job.getId(), subscription);
+        }
+    }
+
+    public synchronized void removeJob(Job job) {
+        Disposable d = activeSubscriptions.remove(job.getId());
+        if (d != null) {
+            logger.debug("Kafka job removed {}", job.getId());
+            d.dispose();
+        }
+    }
+
+}
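
Usage-wise, Jobs.put() and Jobs.remove() (shown above) drive this class. A
short sketch, reusing the hypothetical job from the Job.java example above:

    kafkaTopicConsumers.start(infoTypes); // one KafkaTopicConsumer per Kafka-backed type
    kafkaTopicConsumers.addJob(job);      // starts posting matching topic data to the job's callback
    kafkaTopicConsumers.removeJob(job);   // disposes the subscription; delivery stops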
index 4a68ab0..e8b236c 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import com.google.common.io.CharStreams;
 import com.google.gson.JsonParser;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
 import lombok.Getter;
 
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
 import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -111,12 +118,12 @@ public class ProducerRegstrationTask {
 
     private Mono<String> registerTypesAndProducer() {
         final int CONCURRENCY = 20;
-        final String producerUrl =
-                applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+        final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/"
+                + PRODUCER_ID;
 
         return Flux.fromIterable(this.types.getAll()) //
                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
-                .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())),
+                .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
                         CONCURRENCY) //
                 .collectList() //
                 .doOnNext(type -> logger.info("Registering producer")) //
@@ -127,18 +134,39 @@ public class ProducerRegstrationTask {
         return jsonObject("{}");
     }
 
-    private ProducerInfoTypeInfo typeRegistrationInfo() {
-        return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
+    private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) {
+        try {
+            return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject());
+        } catch (Exception e) {
+            logger.error("Fatal error {}", e.getMessage());
+            return null;
+        }
+    }
+
+    private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
+
+        if (type.isKafkaTopicDefined()) {
+            String schemaStrKafka = readSchemaFile("/typeSchemaKafka.json");
+            return jsonObject(schemaStrKafka);
+        } else {
+            // An object with no properties
+            String schemaStr = "{" //
+                    + "\"type\": \"object\"," //
+                    + "\"properties\": {}," //
+                    + "\"additionalProperties\": false" //
+                    + "}"; //
+
+            return jsonObject(schemaStr);
+        }
     }
 
-    private Object jsonSchemaObject() {
-        // An object with no properties
-        String schemaStr = "{" //
-                + "\"type\": \"object\"," //
-                + "\"properties\": {}," //
-                + "\"additionalProperties\": false" //
-                + "}"; //
-        return jsonObject(schemaStr);
+    private String readSchemaFile(String filePath) throws IOException, ServiceException {
+        InputStream in = getClass().getResourceAsStream(filePath);
+        logger.debug("Reading application schema file from: {} with: {}", filePath, in);
+        if (in == null) {
+            throw new ServiceException("Could not read file: " + filePath);
+        }
+        return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
     }
 
     private Object jsonObject(String json) {
diff --git a/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json b/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json
new file mode 100644 (file)
index 0000000..0ff7c80
--- /dev/null
@@ -0,0 +1,26 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": "object",
+  "properties": {
+    "filter": {
+      "type": "string"
+    },
+    "bufferTimeout": {
+      "type": "object",
+      "properties": {
+        "maxSize": {
+          "type": "integer"
+        },
+        "maxTimeMiliseconds": {
+          "type": "integer"
+        }
+      },
+      "required": [
+        "maxSize",
+        "maxTimeMiliseconds"
+      ]
+    }
+  },
+  "required": [
+  ]
+}
index 828b027..8d1dda6 100644 (file)
@@ -79,7 +79,6 @@ public class EcsSimulatorController {
         } else {
             return new ResponseEntity<>(HttpStatus.NOT_FOUND);
         }
-
     }
 
     @PutMapping(path = API_ROOT + "/info-producers/{infoProducerId}", //
index 1cceef0..376d23e 100644 (file)
@@ -52,6 +52,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 
+@SuppressWarnings("java:S3577") // Rename class
 @ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
 @TestPropertySource(properties = { //
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
new file mode 100644 (file)
index 0000000..31ef970
--- /dev/null
@@ -0,0 +1,257 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import com.google.gson.JsonParser;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig;
+import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
+import org.oran.dmaapadapter.configuration.WebClientConfig;
+import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.r1.ConsumerJobInfo;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import reactor.core.publisher.Flux;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderRecord;
+
+@SuppressWarnings("java:S3577") // Rename class
+@ExtendWith(SpringExtension.class)
+@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
+@TestPropertySource(properties = { //
+        "server.ssl.key-store=./config/keystore.jks", //
+        "app.webclient.trust-store=./config/truststore.jks", //
+        "app.configuration-filepath=./src/test/resources/test_application_configuration_kafka.json"//
+})
+class IntegrationWithKafka {
+
+    @Autowired
+    private ApplicationConfig applicationConfig;
+
+    @Autowired
+    private Jobs jobs;
+
+    @Autowired
+    private InfoTypes types;
+
+    @Autowired
+    private ConsumerController consumerController;
+
+    @Autowired
+    private EcsSimulatorController ecsSimulatorController;
+
+    private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+
+    private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
+
+    @LocalServerPort
+    int localServerHttpPort;
+
+    static class TestApplicationConfig extends ApplicationConfig {
+        @Override
+        public String getEcsBaseUrl() {
+            return thisProcessUrl();
+        }
+
+        @Override
+        public String getDmaapBaseUrl() {
+            return thisProcessUrl();
+        }
+
+        @Override
+        public String getSelfUrl() {
+            return thisProcessUrl();
+        }
+
+        private String thisProcessUrl() {
+            final String url = "https://localhost:" + getLocalServerHttpPort();
+            return url;
+        }
+    }
+
+    /**
+     * Overrides the BeanFactory.
+     */
+    @TestConfiguration
+    static class TestBeanFactory extends BeanFactory {
+
+        @Override
+        @Bean
+        public ServletWebServerFactory servletContainer() {
+            return new TomcatServletWebServerFactory();
+        }
+
+        @Override
+        @Bean
+        public ApplicationConfig getApplicationConfig() {
+            TestApplicationConfig cfg = new TestApplicationConfig();
+            return cfg;
+        }
+    }
+
+    @AfterEach
+    void reset() {
+        this.consumerController.testResults.reset();
+        this.ecsSimulatorController.testResults.reset();
+        this.jobs.clear();
+    }
+
+    private AsyncRestClient restClient(boolean useTrustValidation) {
+        WebClientConfig config = this.applicationConfig.getWebClientConfig();
+        HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() //
+                .httpProxyHost("") //
+                .httpProxyPort(0) //
+                .build();
+        config = ImmutableWebClientConfig.builder() //
+                .keyStoreType(config.keyStoreType()) //
+                .keyStorePassword(config.keyStorePassword()) //
+                .keyStore(config.keyStore()) //
+                .keyPassword(config.keyPassword()) //
+                .isTrustStoreUsed(useTrustValidation) //
+                .trustStore(config.trustStore()) //
+                .trustStorePassword(config.trustStorePassword()) //
+                .httpProxyConfig(httpProxyConfig).build();
+
+        AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config);
+        return restClientFactory.createRestClientNoHttpProxy(baseUrl());
+    }
+
+    private AsyncRestClient restClient() {
+        return restClient(false);
+    }
+
+    private String baseUrl() {
+        return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
+    }
+
+    private Object jobParametersAsJsonObject(String filter, int maxTimeMiliseconds, int maxSize) {
+        Job.Parameters param = new Job.Parameters(filter,
+                new Job.Parameters.BufferTimeout(maxSize, maxTimeMiliseconds));
+        String str = gson.toJson(param);
+        return jsonObject(str);
+    }
+
+    private Object jsonObject(String json) {
+        try {
+            return JsonParser.parseString(json).getAsJsonObject();
+        } catch (Exception e) {
+            throw new NullPointerException(e.toString());
+        }
+    }
+
+    private ConsumerJobInfo consumerJobInfo(String filter, int maxTimeMiliseconds, int maxSize) {
+        try {
+            InfoType type = this.types.getAll().iterator().next();
+            String typeId = type.getId();
+            String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+            return new ConsumerJobInfo(typeId, jobParametersAsJsonObject(filter, maxTimeMiliseconds, maxSize), "owner",
+                    targetUri, "");
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    private SenderOptions<Integer, String> senderOptions() {
+        String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
+
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        return SenderOptions.create(props);
+    }
+
+    private SenderRecord<Integer, String, Integer> senderRecord(String data, int i) {
+        final InfoType infoType = this.types.getAll().iterator().next();
+        return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), i, data + i), i);
+    }
+
+    @Test
+    void kafkaIntegrationTest() throws InterruptedException {
+        final String JOB_ID1 = "ID1";
+        final String JOB_ID2 = "ID2";
+
+        // Register producer, Register types
+        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+
+        // Create two jobs: one buffered, one filtered
+        this.ecsSimulatorController.addJob(consumerJobInfo(".*", 10, 1000), JOB_ID1, restClient());
+        this.ecsSimulatorController.addJob(consumerJobInfo(".*Message_1.*", 0, 0), JOB_ID2, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
+        final KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions());
+
+        var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
+
+        sender.send(dataToSend) //
+                .doOnError(e -> logger.error("Send failed", e)) //
+                .doOnNext(senderResult -> logger.debug("Sent {}", senderResult)) //
+                .doOnError(t -> logger.error("Error {}", t)) //
+                .blockLast();
+
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(2));
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("Message_1");
+        assertThat(consumer.receivedBodies.get(1)).isEqualTo("[Message_1, Message_2, Message_3]");
+
+        // Delete the jobs
+        this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
+        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+    }
+
+}
index 8d211b8..794eb8e 100644 (file)
@@ -3,7 +3,7 @@
       {
          "id": "ExampleInformationType",
          "dmaapTopicUrl": "/dmaap-topic-1",
-         "useHttpProxy": true
+         "useHttpProxy": false
       }
    ]
 }
\ No newline at end of file
diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json
new file mode 100644 (file)
index 0000000..e2ea525
--- /dev/null
@@ -0,0 +1,9 @@
+{
+   "types": [
+      {
+         "id": "ExampleInformationType",
+         "kafkaInputTopic": "TutorialTopic",
+         "useHttpProxy": false
+      }
+   ]
+}
\ No newline at end of file