Merge "Improve concurrency of message sending"
author Henrik Andersson <henrik.b.andersson@est.tech>
Wed, 17 Nov 2021 10:17:09 +0000 (10:17 +0000)
committer Gerrit Code Review <gerrit@o-ran-sc.org>
Wed, 17 Nov 2021 10:17:09 +0000 (10:17 +0000)
31 files changed:
dmaap-adaptor-java/README.md
dmaap-adaptor-java/config/application.yaml
dmaap-adaptor-java/pom.xml
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java [moved from dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java with 80% similarity]
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumers.java [new file with mode: 0644]
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java [new file with mode: 0644]
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java [new file with mode: 0644]
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java [new file with mode: 0644]
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json [new file with mode: 0644]
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java [new file with mode: 0644]
dmaap-adaptor-java/src/test/resources/test_application_configuration.json
dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json [new file with mode: 0644]
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/AsyncRestClient.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/a1e/A1eController.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerController.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1producer/ProducerCallbacks.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java

index 0378bc7..9b35fe5 100644 (file)
@@ -15,7 +15,7 @@ The file `config/application_configuration.json` contains the configuration of j
         [
           {
              "id":  "STD_Fault_Messages",
-             "dmaapTopicUrl":  "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages",
+             "dmaapTopicUrl":  "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD-Fault-Messages_1.0.0",
              "useHttpProxy": false
           }
         ]
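
With this change an information type can alternatively be fed from a Kafka topic. A minimal sketch of such a configuration entry, using the new kafkaInputTopic field added by this commit (the topic name here is hypothetical; only id, dmaapTopicUrl, useHttpProxy and kafkaInputTopic are read by InfoType):

        [
          {
             "id": "ExampleInformationTypeKafka",
             "kafkaInputTopic": "unauthenticated.SOME_KAFKA_TOPIC",
             "useHttpProxy": false
          }
        ]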
index 5733ea7..6a2d68a 100644 (file)
@@ -51,6 +51,9 @@ app:
   # configuration from the Consul will override the file.
   configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json
   dmaap-base-url: http://dradmin:dradmin@localhost:2222
-  # The URL used to address this component. This is used as a callback URL sent to other components. 
+  # The URL used to address this component. This is used as a callback URL sent to other components.
   dmaap-adapter-base-url: https://localhost:8435
+  # Kafka bootstrap server. This is only needed if there are Information Types that use a kafkaInputTopic.
+  kafka:
+    bootstrap-servers: localhost:9092
 
index 1fbd83c..411b27c 100644 (file)
             <artifactId>mockwebserver</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor.kafka</groupId>
+            <artifactId>reactor-kafka</artifactId>
+            <version>1.3.7</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>31.0.1-jre</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
index c9ba93f..d98a8c3 100644 (file)
@@ -26,8 +26,6 @@ import org.apache.catalina.connector.Connector;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
-import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DmaapMessageConsumer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
@@ -47,15 +45,8 @@ public class BeanFactory {
     }
 
     @Bean
-    public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs) {
+    public InfoTypes types(@Autowired ApplicationConfig appConfig) {
         Collection<InfoType> types = appConfig.getTypes();
-
-        // Start a consumer for each type
-        for (InfoType type : types) {
-            DmaapMessageConsumer topicConsumer = new DmaapMessageConsumer(appConfig, type, jobs);
-            topicConsumer.start();
-        }
-
         return new InfoTypes(types);
     }
 
index ec1541c..6939026 100644 (file)
@@ -67,96 +67,85 @@ public class AsyncRestClient {
         logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.post() //
-                            .uri(uri) //
-                            .contentType(MediaType.APPLICATION_JSON) //
-                            .body(bodyProducer, String.class);
-                    return retrieve(traceTag, request);
-                });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+                .post() //
+                .uri(uri) //
+                .contentType(MediaType.APPLICATION_JSON) //
+                .body(bodyProducer, String.class);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> post(String uri, @Nullable String body) {
         return postForEntity(uri, body) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.post() //
-                            .uri(uri) //
-                            .headers(headers -> headers.setBasicAuth(username, password)) //
-                            .contentType(MediaType.APPLICATION_JSON) //
-                            .bodyValue(body);
-                    return retrieve(traceTag, request) //
-                            .flatMap(this::toBody);
-                });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+                .post() //
+                .uri(uri) //
+                .headers(headers -> headers.setBasicAuth(username, password)) //
+                .contentType(MediaType.APPLICATION_JSON) //
+                .bodyValue(body);
+        return retrieve(traceTag, request) //
+                .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: {}", traceTag, body);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.put() //
-                            .uri(uri) //
-                            .contentType(MediaType.APPLICATION_JSON) //
-                            .bodyValue(body);
-                    return retrieve(traceTag, request);
-                });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+                .put() //
+                .uri(uri) //
+                .contentType(MediaType.APPLICATION_JSON) //
+                .bodyValue(body);
+        return retrieve(traceTag, request);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: <empty>", traceTag);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.put() //
-                            .uri(uri);
-                    return retrieve(traceTag, request);
-                });
+        RequestHeadersSpec<?> request = getWebClient() //
+                .put() //
+                .uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> put(String uri, String body) {
         return putForEntity(uri, body) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> getForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.get().uri(uri);
-                    return retrieve(traceTag, request);
-                });
+        RequestHeadersSpec<?> request = getWebClient().get().uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> get(String uri) {
         return getForEntity(uri) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.delete().uri(uri);
-                    return retrieve(traceTag, request);
-                });
+        RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> delete(String uri) {
         return deleteForEntity(uri) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
@@ -185,11 +174,11 @@ public class AsyncRestClient {
         }
     }
 
-    private Mono<String> toBody(ResponseEntity<String> entity) {
+    private String toBody(ResponseEntity<String> entity) {
         if (entity.getBody() == null) {
-            return Mono.just("");
+            return "";
         } else {
-            return Mono.just(entity.getBody());
+            return entity.getBody();
         }
     }
 
@@ -229,11 +218,11 @@ public class AsyncRestClient {
                 .build();
     }
 
-    private Mono<WebClient> getWebClient() {
+    private WebClient getWebClient() {
         if (this.webClient == null) {
             this.webClient = buildWebClient(baseUrl);
         }
-        return Mono.just(buildWebClient(baseUrl));
+        return this.webClient;
     }
 
 }
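
The refactor above works because toBody is now a plain synchronous function, so map replaces flatMap, and the WebClient is built once and cached (the old getWebClient assigned the field but then rebuilt a client on every call anyway). A standalone sketch of the map-versus-flatMap distinction in Reactor, with illustrative values:

    import reactor.core.publisher.Mono;

    class MapVsFlatMap {
        public static void main(String[] args) {
            // map: the mapper is synchronous and returns a plain value
            Mono<Integer> length = Mono.just("body").map(String::length);

            // flatMap: the mapper is itself asynchronous and returns a Mono;
            // using map here would produce an awkward Mono<Mono<Integer>>
            Mono<Integer> asyncLength = Mono.just("body").flatMap(s -> Mono.fromCallable(s::length));

            System.out.println(length.block());      // 4
            System.out.println(asyncLength.block()); // 4
        }
    }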
index e26fd46..f17a9c0 100644 (file)
@@ -88,6 +88,10 @@ public class ApplicationConfig {
     @Value("${app.dmaap-base-url}")
     private String dmaapBaseUrl;
 
+    @Getter
+    @Value("${app.kafka.bootstrap-servers:}")
+    private String kafkaBootStrapServers;
+
     private WebClientConfig webClientConfig = null;
 
     public WebClientConfig getWebClientConfig() {
index ca7c96c..07f5aa7 100644 (file)
@@ -82,17 +82,20 @@ public class ProducerCallbacksController {
             @RequestBody String body) {
         try {
             ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class);
-
-            logger.info("Job started callback {}", request.id);
-            Job job = new Job(request.id, request.targetUri, types.getType(request.typeId), request.owner,
-                    request.lastUpdated);
-            this.jobs.put(job);
+            logger.debug("Job started callback {}", request.id);
+            this.jobs.addJob(request.id, request.targetUri, types.getType(request.typeId), request.owner,
+                    request.lastUpdated, toJobParameters(request.jobData));
             return new ResponseEntity<>(HttpStatus.OK);
         } catch (Exception e) {
             return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
         }
     }
 
+    private Job.Parameters toJobParameters(Object jobData) {
+        String json = gson.toJson(jobData);
+        return gson.fromJson(json, Job.Parameters.class);
+    }
+
     @GetMapping(path = JOB_URL, produces = MediaType.APPLICATION_JSON_VALUE)
     @Operation(summary = "Get all jobs", description = "Returns all info jobs, can be used for trouble shooting")
     @ApiResponse(responseCode = "200", //
@@ -118,7 +121,7 @@ public class ProducerCallbacksController {
     public ResponseEntity<Object> jobDeletedCallback( //
             @PathVariable("infoJobId") String infoJobId) {
 
-        logger.info("Job deleted callback {}", infoJobId);
+        logger.debug("Job deleted callback {}", infoJobId);
         this.jobs.remove(infoJobId);
         return new ResponseEntity<>(HttpStatus.OK);
     }
index 9dda1e6..27b527d 100644 (file)
@@ -22,6 +22,8 @@ package org.oran.dmaapadapter.repository;
 
 import lombok.Getter;
 
+import org.springframework.util.StringUtils;
+
 public class InfoType {
 
     @Getter
@@ -33,10 +35,22 @@ public class InfoType {
     @Getter
     private final boolean useHttpProxy;
 
-    public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy) {
+    @Getter
+    private final String kafkaInputTopic;
+
+    public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic) {
         this.id = id;
         this.dmaapTopicUrl = dmaapTopicUrl;
         this.useHttpProxy = useHttpProxy;
+        this.kafkaInputTopic = kafkaInputTopic;
+    }
+
+    public boolean isKafkaTopicDefined() {
+        return StringUtils.hasLength(kafkaInputTopic);
+    }
+
+    public boolean isDmaapTopicDefined() {
+        return StringUtils.hasLength(dmaapTopicUrl);
     }
 
 }
index b8677a3..558fc46 100644 (file)
@@ -35,7 +35,6 @@ public class InfoTypes {
     private Map<String, InfoType> allTypes = new HashMap<>();
 
     public InfoTypes(Collection<InfoType> types) {
-
         for (InfoType type : types) {
             put(type);
         }
index 0da94a6..5f7521c 100644 (file)
 
 package org.oran.dmaapadapter.repository;
 
+import java.time.Duration;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import lombok.Getter;
 
+import org.immutables.gson.Gson;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+
 public class Job {
 
+    @Gson.TypeAdapters
+    public static class Parameters {
+        @Getter
+        private String filter;
+        @Getter
+        private BufferTimeout bufferTimeout;
+
+        private int maxConcurrency;
+
+        public Parameters() {}
+
+        public Parameters(String filter, BufferTimeout bufferTimeout, int maxConcurrency) {
+            this.filter = filter;
+            this.bufferTimeout = bufferTimeout;
+            this.maxConcurrency = maxConcurrency;
+        }
+
+        public int getMaxConcurrency() {
+            return maxConcurrency == 0 ? 1 : maxConcurrency;
+        }
+    }
+
+    @Gson.TypeAdapters
+    public static class BufferTimeout {
+        public BufferTimeout(int maxSize, long maxTimeMiliseconds) {
+            this.maxSize = maxSize;
+            this.maxTimeMiliseconds = maxTimeMiliseconds;
+        }
+
+        public BufferTimeout() {}
+
+        @Getter
+        private int maxSize;
+
+        private long maxTimeMiliseconds;
+
+        public Duration getMaxTime() {
+            return Duration.ofMillis(maxTimeMiliseconds);
+        }
+    }
+
     @Getter
     private final String id;
 
@@ -36,15 +84,44 @@ public class Job {
     @Getter
     private final String owner;
 
+    @Getter
+    private final Parameters parameters;
+
     @Getter
     private final String lastUpdated;
 
-    public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated) {
+    private final Pattern jobDataFilter;
+
+    @Getter
+    private final AsyncRestClient consumerRestClient;
+
+    public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters,
+            AsyncRestClient consumerRestClient) {
         this.id = id;
         this.callbackUrl = callbackUrl;
         this.type = type;
         this.owner = owner;
         this.lastUpdated = lastUpdated;
+        this.parameters = parameters;
+        if (parameters != null && parameters.filter != null) {
+            jobDataFilter = Pattern.compile(parameters.filter);
+        } else {
+            jobDataFilter = null;
+        }
+        this.consumerRestClient = consumerRestClient;
+    }
+
+    public boolean isFilterMatch(String data) {
+        if (jobDataFilter == null) {
+            return true;
+        }
+        Matcher matcher = jobDataFilter.matcher(data);
+        return matcher.find();
+    }
+
+    public boolean isBuffered() {
+        return parameters != null && parameters.bufferTimeout != null && parameters.bufferTimeout.maxSize > 0
+                && parameters.bufferTimeout.maxTimeMiliseconds > 0;
     }
 
 }
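
The new Parameters and BufferTimeout classes map, via Gson, to the job schema registered in typeSchemaKafka.json below. An illustrative job parameter document (values borrowed from the integration test; note that maxTimeMiliseconds is the spelling the code and schema actually use):

    {
      "filter": "^Message_1$",
      "maxConcurrency": 1,
      "bufferTimeout": {
        "maxSize": 1000,
        "maxTimeMiliseconds": 400
      }
    }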
index 6e2b326..0e7743d 100644 (file)
 
 package org.oran.dmaapadapter.repository;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.repository.Job.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Component
 public class Jobs {
+    public interface Observer {
+        void onJobbAdded(Job job);
+
+        void onJobRemoved(Job job);
+    }
+
     private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
 
     private Map<String, Job> allJobs = new HashMap<>();
     private MultiMap<Job> jobsByType = new MultiMap<>();
+    private final AsyncRestClientFactory restclientFactory;
+    private final List<Observer> observers = new ArrayList<>();
 
-    public Jobs() {}
+    public Jobs(@Autowired ApplicationConfig applicationConfig) {
+        restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
+    }
 
     public synchronized Job getJob(String id) throws ServiceException {
         Job job = allJobs.get(id);
@@ -51,8 +68,26 @@ public class Jobs {
         return allJobs.get(id);
     }
 
-    public synchronized void put(Job job) {
-        logger.debug("Put service: {}", job.getId());
+    public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
+            Parameters parameters) {
+        AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
+                ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
+                : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
+        Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
+        this.put(job);
+        synchronized (observers) {
+            this.observers.forEach(obs -> obs.onJobbAdded(job));
+        }
+    }
+
+    public void addObserver(Observer obs) {
+        synchronized (observers) {
+            this.observers.add(obs);
+        }
+    }
+
+    private synchronized void put(Job job) {
+        logger.debug("Put job: {}", job.getId());
         allJobs.put(job.getId(), job);
         jobsByType.put(job.getType().getId(), job.getId(), job);
     }
@@ -69,9 +104,14 @@ public class Jobs {
         return job;
     }
 
-    public synchronized void remove(Job job) {
-        this.allJobs.remove(job.getId());
-        jobsByType.remove(job.getType().getId(), job.getId());
+    public void remove(Job job) {
+        synchronized (this) {
+            this.allJobs.remove(job.getId());
+            jobsByType.remove(job.getType().getId(), job.getId());
+        }
+        synchronized (observers) {
+            this.observers.forEach(obs -> obs.onJobRemoved(job));
+        }
     }
 
     public synchronized int size() {
@@ -38,16 +38,15 @@ import reactor.core.publisher.Mono;
  * The class fetches incoming requests from DMAAP and sends them further to the
 * consumers that have a job for this InformationType.
  */
-
-public class DmaapMessageConsumer {
+public class DmaapTopicConsumer {
     private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
-    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
-    private final ApplicationConfig applicationConfig;
+    private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class);
+
     private final AsyncRestClient dmaapRestClient;
-    private final AsyncRestClient consumerRestClient;
-    private final InfoType type;
-    private final Jobs jobs;
     private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
+    protected final ApplicationConfig applicationConfig;
+    protected final InfoType type;
+    protected final Jobs jobs;
 
     /** Submits new elements until stopped */
     private static class InfiniteFlux {
@@ -80,12 +79,10 @@ public class DmaapMessageConsumer {
         }
     }
 
-    public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
-        this.applicationConfig = applicationConfig;
+    public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
         AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
         this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
-        this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
-                : restclientFactory.createRestClientNoHttpProxy("");
+        this.applicationConfig = applicationConfig;
         this.type = type;
         this.jobs = jobs;
     }
@@ -93,16 +90,15 @@ public class DmaapMessageConsumer {
     public void start() {
         infiniteSubmitter.start() //
                 .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
-                .flatMap(this::handleReceivedMessage, 5) //
+                .flatMap(this::pushDataToConsumers) //
                 .subscribe(//
-                        value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), //
+                        null, //
                        throwable -> logger.error("DmaapTopicConsumer error: {}", throwable.getMessage()), //
-                        () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) //
-                );
+                        () -> logger.warn("DmaapTopicConsumer stopped {}", type.getId())); //
+
     }
 
     private String getDmaapUrl() {
-
         return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl();
     }
 
@@ -112,12 +108,7 @@ public class DmaapMessageConsumer {
                 .flatMap(notUsed -> Mono.empty());
     }
 
-    private Mono<String> handleConsumerErrorResponse(Throwable t) {
-        logger.warn("error from CONSUMER {}", t.getMessage());
-        return Mono.empty();
-    }
-
-    protected Mono<String> getFromMessageRouter(String topicUrl) {
+    private Mono<String> getFromMessageRouter(String topicUrl) {
         logger.trace("getFromMessageRouter {}", topicUrl);
         return dmaapRestClient.get(topicUrl) //
                 .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
@@ -125,15 +116,19 @@ public class DmaapMessageConsumer {
                 .onErrorResume(this::handleDmaapErrorResponse); //
     }
 
-    protected Flux<String> handleReceivedMessage(String body) {
-        logger.debug("Received from DMAAP {}", body);
-        final int CONCURRENCY = 5;
+    private Mono<String> handleConsumerErrorResponse(Throwable t) {
+        logger.warn("error from CONSUMER {}", t.getMessage());
+        return Mono.empty();
+    }
+
+    protected Flux<String> pushDataToConsumers(String body) {
+        logger.debug("Received data {}", body);
+        final int CONCURRENCY = 50;
 
         // Distribute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
-                .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
-                .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
+                .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
+                .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse);
     }
-
 }
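
pushDataToConsumers raises the fan-out limit from 5 to 50 by passing it as the second argument to flatMap, which caps how many consumer POSTs are in flight at once. A self-contained sketch of that mechanism (delays and counts are illustrative, not the adapter's code):

    import java.time.Duration;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    class BoundedFanOut {
        public static void main(String[] args) {
            final int CONCURRENCY = 50;
            Flux.range(1, 200)
                    // At most CONCURRENCY simulated "posts" run concurrently
                    .flatMap(i -> Mono.just("sent " + i).delayElement(Duration.ofMillis(10)), CONCURRENCY)
                    .doOnNext(System.out::println)
                    .blockLast();
        }
    }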
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumers.java
new file mode 100644 (file)
index 0000000..9447c3a
--- /dev/null
@@ -0,0 +1,43 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Jobs;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DmaapTopicConsumers {
+
+    DmaapTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs) {
+        // Start a consumer for each type
+        for (InfoType type : types.getAll()) {
+            if (type.isDmaapTopicDefined()) {
+                DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs);
+                topicConsumer.start();
+            }
+        }
+    }
+
+}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
new file mode 100644 (file)
index 0000000..d240129
--- /dev/null
@@ -0,0 +1,113 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import org.oran.dmaapadapter.repository.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks.Many;
+
+/**
+ * The class streams data from a multicast sink and sends the data to the Job
+ * owner via REST calls.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+
+public class KafkaJobDataConsumer {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
+    private final Many<String> input;
+    private final Job job;
+    private Disposable subscription;
+    private int errorCounter = 0;
+
+    KafkaJobDataConsumer(Many<String> input, Job job) {
+        this.input = input;
+        this.job = job;
+    }
+
+    public synchronized void start() {
+        stop();
+        this.subscription = getMessagesFromKafka(job) //
+                .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
+                .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
+                .onErrorResume(this::handleError) //
+                .subscribe(this::handleConsumerSentOk, //
+                        this::handleErrorInStream, //
+                        () -> logger.debug("KafkaJobDataConsumer stopped, jobId: {}, type: {}", job.getId(),
+                                job.getType().getId()));
+    }
+
+    public synchronized void stop() {
+        if (this.subscription != null) {
+            subscription.dispose();
+            subscription = null;
+        }
+    }
+
+    public synchronized boolean isRunning() {
+        return this.subscription != null;
+    }
+
+    private Flux<String> getMessagesFromKafka(Job job) {
+        Flux<String> result = input.asFlux() //
+                .filter(job::isFilterMatch);
+
+        if (job.isBuffered()) {
+            result = result.bufferTimeout( //
+                    job.getParameters().getBufferTimeout().getMaxSize(), //
+                    job.getParameters().getBufferTimeout().getMaxTime()) //
+                    .map(Object::toString);
+        }
+        return result;
+    }
+
+    private Mono<String> handleError(Throwable t) {
+        logger.warn("exception: {} job: {}", t.getMessage(), job);
+
+        final int STOP_AFTER_ERRORS = 5;
+        if (t instanceof WebClientResponseException) {
+            if (++this.errorCounter > STOP_AFTER_ERRORS) {
+                logger.error("Stopping job {}", job);
+                return Mono.error(t);
+            } else {
+                return Mono.empty(); // Discard
+            }
+        } else {
+            // This can happen if there is an overflow.
+            return Mono.empty();
+        }
+    }
+
+    private void handleConsumerSentOk(String data) {
+        this.errorCounter = 0;
+    }
+
+    private void handleErrorInStream(Throwable t) {
+        logger.error("KafkaJobDataConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
+        this.subscription = null;
+    }
+
+}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
new file mode 100644 (file)
index 0000000..785f98b
--- /dev/null
@@ -0,0 +1,103 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.Getter;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+@Component
+@EnableScheduling
+public class KafkaTopicConsumers {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
+
+    private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>();
+    @Getter
+    private final Map<String, KafkaJobDataConsumer> activeSubscriptions = new HashMap<>();
+
+    private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
+
+    public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types,
+            @Autowired Jobs jobs) {
+
+        for (InfoType type : types.getAll()) {
+            if (type.isKafkaTopicDefined()) {
+                KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type);
+                topicListeners.put(type.getId(), topicConsumer);
+            }
+        }
+
+        jobs.addObserver(new Jobs.Observer() {
+            @Override
+            public void onJobbAdded(Job job) {
+                addJob(job);
+            }
+
+            @Override
+            public void onJobRemoved(Job job) {
+                removeJob(job);
+            }
+
+        });
+    }
+
+    public synchronized void addJob(Job job) {
+        if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+            logger.debug("Kafka job added {}", job.getId());
+            KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
+            KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job);
+            subscription.start();
+            activeSubscriptions.put(job.getId(), subscription);
+        }
+    }
+
+    public synchronized void removeJob(Job job) {
+        KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId());
+        if (d != null) {
+            logger.debug("Kafka job removed {}", job.getId());
+            d.stop();
+        }
+    }
+
+    @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
+    public synchronized void restartNonRunningTasks() {
+        for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) {
+            if (!consumer.isRunning()) {
+                consumer.start();
+            }
+        }
+    }
+
+}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
new file mode 100644 (file)
index 0000000..0452b88
--- /dev/null
@@ -0,0 +1,99 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Many;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverOptions;
+
+/**
+ * The class streams incoming requests from a Kafka topic and sends them further
+ * to a multicast sink, which several other streams can connect to.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class KafkaTopicListener {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
+    private final ApplicationConfig applicationConfig;
+    private final InfoType type;
+    private final Many<String> output;
+
+    public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
+        this.applicationConfig = applicationConfig;
+
+        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
+        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+
+        this.type = type;
+        startKafkaTopicReceiver();
+    }
+
+    public Many<String> getOutput() {
+        return this.output;
+    }
+
+    private Disposable startKafkaTopicReceiver() {
+        return KafkaReceiver.create(kafkaInputProperties()) //
+                .receive() //
+                .doOnNext(this::onReceivedData) //
+                .subscribe(null, //
+                        this::onReceivedError, //
+                        () -> logger.warn("KafkaTopicReceiver stopped"));
+    }
+
+    private void onReceivedData(ConsumerRecord<Integer, String> input) {
+        logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
+        output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
+    }
+
+    private void onReceivedError(Throwable t) {
+        logger.error("KafkaTopicReceiver error: {}", t.getMessage());
+    }
+
+    private ReceiverOptions<Integer, String> kafkaInputProperties() {
+        Map<String, Object> consumerProps = new HashMap<>();
+        if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
+            logger.error("No Kafka bootstrap server is configured");
+        }
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        return ReceiverOptions.<Integer, String>create(consumerProps)
+                .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
+    }
+
+}
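
The multicast sink is what allows a single Kafka receiver to feed any number of KafkaJobDataConsumer subscriptions. Since emitNext uses FAIL_FAST, an emission that no longer fits in the backpressure buffer fails instead of blocking, which is the overflow scenario the kafkaIOverflow test below provokes. A minimal multicast sketch (buffer size hypothetical):

    import reactor.core.publisher.Sinks;

    class MulticastSketch {
        public static void main(String[] args) {
            Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(16);

            // Several independent streams can attach to the same sink
            sink.asFlux().subscribe(s -> System.out.println("job1 got " + s));
            sink.asFlux().subscribe(s -> System.out.println("job2 got " + s));

            // FAIL_FAST: an emission that cannot be buffered fails immediately
            sink.emitNext("Message_1", Sinks.EmitFailureHandler.FAIL_FAST);
            sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
        }
    }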
index 4a68ab0..7b719e3 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import com.google.common.io.CharStreams;
 import com.google.gson.JsonParser;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
 import lombok.Getter;
 
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
 import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -88,6 +95,7 @@ public class ProducerRegstrationTask {
         logger.warn("Registration of producer failed {}", t.getMessage());
     }
 
+    // Returns TRUE if registration is correct
     private Mono<Boolean> checkRegistration() {
         final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
         return restClient.get(url) //
@@ -116,7 +124,7 @@ public class ProducerRegstrationTask {
 
         return Flux.fromIterable(this.types.getAll()) //
                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
-                .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())),
+                .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
                         CONCURRENCY) //
                 .collectList() //
                 .doOnNext(type -> logger.info("Registering producer")) //
@@ -127,18 +135,39 @@ public class ProducerRegstrationTask {
         return jsonObject("{}");
     }
 
-    private ProducerInfoTypeInfo typeRegistrationInfo() {
-        return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
+    private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) {
+        try {
+            return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject());
+        } catch (Exception e) {
+            logger.error("Fatal error {}", e.getMessage());
+            return null;
+        }
+    }
+
+    private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
+
+        if (type.isKafkaTopicDefined()) {
+            String schemaStrKafka = readSchemaFile("/typeSchemaKafka.json");
+            return jsonObject(schemaStrKafka);
+        } else {
+            // An object with no properties
+            String schemaStr = "{" //
+                    + "\"type\": \"object\"," //
+                    + "\"properties\": {}," //
+                    + "\"additionalProperties\": false" //
+                    + "}"; //
+
+            return jsonObject(schemaStr);
+        }
     }
 
-    private Object jsonSchemaObject() {
-        // An object with no properties
-        String schemaStr = "{" //
-                + "\"type\": \"object\"," //
-                + "\"properties\": {}," //
-                + "\"additionalProperties\": false" //
-                + "}"; //
-        return jsonObject(schemaStr);
+    private String readSchemaFile(String filePath) throws IOException, ServiceException {
+        InputStream in = getClass().getResourceAsStream(filePath);
+        logger.debug("Reading application schema file from: {} with: {}", filePath, in);
+        if (in == null) {
+            throw new ServiceException("Could not read file: " + filePath);
+        }
+        return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
     }
 
     private Object jsonObject(String json) {
diff --git a/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json b/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json
new file mode 100644 (file)
index 0000000..290b70a
--- /dev/null
@@ -0,0 +1,28 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": "object",
+  "properties": {
+    "filter": {
+      "type": "string"
+    },
+    "maxConcurrency": {
+      "type": "integer"
+    },
+    "bufferTimeout": {
+      "type": "object",
+      "properties": {
+        "maxSize": {
+          "type": "integer"
+        },
+        "maxTimeMiliseconds": {
+          "type": "integer"
+        }
+      },
+      "required": [
+        "maxSize",
+        "maxTimeMiliseconds"
+      ]
+    }
+  },
+  "required": []
+}
\ No newline at end of file
index 4b6d901..70e89d6 100644 (file)
@@ -56,6 +56,15 @@ public class ConsumerController {
 
         public TestResults() {}
 
+        public boolean hasReceived(String str) {
+            for (String received : receivedBodies) {
+                if (received.equals(str)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
         public void reset() {
             receivedBodies.clear();
         }
index 828b027..8d1dda6 100644 (file)
@@ -79,7 +79,6 @@ public class EcsSimulatorController {
         } else {
             return new ResponseEntity<>(HttpStatus.NOT_FOUND);
         }
-
     }
 
     @PutMapping(path = API_ROOT + "/info-producers/{infoProducerId}", //
index 1cceef0..376d23e 100644 (file)
@@ -52,6 +52,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 
+@SuppressWarnings("java:S3577") // Rename class
 @ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
 @TestPropertySource(properties = { //
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
new file mode 100644 (file)
index 0000000..a0db58a
--- /dev/null
@@ -0,0 +1,307 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.gson.JsonParser;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig;
+import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
+import org.oran.dmaapadapter.configuration.WebClientConfig;
+import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.r1.ConsumerJobInfo;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import reactor.core.publisher.Flux;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderRecord;
+
+@SuppressWarnings("java:S3577") // Rename class
+@ExtendWith(SpringExtension.class)
+@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
+@TestPropertySource(properties = { //
+        "server.ssl.key-store=./config/keystore.jks", //
+        "app.webclient.trust-store=./config/truststore.jks", //
+        "app.configuration-filepath=./src/test/resources/test_application_configuration_kafka.json"//
+})
+class IntegrationWithKafka {
+
+    @Autowired
+    private ApplicationConfig applicationConfig;
+
+    @Autowired
+    private Jobs jobs;
+
+    @Autowired
+    private InfoTypes types;
+
+    @Autowired
+    private ConsumerController consumerController;
+
+    @Autowired
+    private EcsSimulatorController ecsSimulatorController;
+
+    @Autowired
+    private KafkaTopicConsumers kafkaTopicConsumers;
+
+    private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+
+    private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
+
+    @LocalServerPort
+    int localServerHttpPort;
+
+    static class TestApplicationConfig extends ApplicationConfig {
+        @Override
+        public String getEcsBaseUrl() {
+            return thisProcessUrl();
+        }
+
+        @Override
+        public String getDmaapBaseUrl() {
+            return thisProcessUrl();
+        }
+
+        @Override
+        public String getSelfUrl() {
+            return thisProcessUrl();
+        }
+
+        private String thisProcessUrl() {
+            final String url = "https://localhost:" + getLocalServerHttpPort();
+            return url;
+        }
+    }
+
+    /**
+     * Overrides the BeanFactory.
+     */
+    @TestConfiguration
+    static class TestBeanFactory extends BeanFactory {
+
+        @Override
+        @Bean
+        public ServletWebServerFactory servletContainer() {
+            return new TomcatServletWebServerFactory();
+        }
+
+        @Override
+        @Bean
+        public ApplicationConfig getApplicationConfig() {
+            TestApplicationConfig cfg = new TestApplicationConfig();
+            return cfg;
+        }
+    }
+
+    @AfterEach
+    void reset() {
+        this.consumerController.testResults.reset();
+        this.ecsSimulatorController.testResults.reset();
+        this.jobs.clear();
+    }
+
+    private AsyncRestClient restClient(boolean useTrustValidation) {
+        WebClientConfig config = this.applicationConfig.getWebClientConfig();
+        HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() //
+                .httpProxyHost("") //
+                .httpProxyPort(0) //
+                .build();
+        config = ImmutableWebClientConfig.builder() //
+                .keyStoreType(config.keyStoreType()) //
+                .keyStorePassword(config.keyStorePassword()) //
+                .keyStore(config.keyStore()) //
+                .keyPassword(config.keyPassword()) //
+                .isTrustStoreUsed(useTrustValidation) //
+                .trustStore(config.trustStore()) //
+                .trustStorePassword(config.trustStorePassword()) //
+                .httpProxyConfig(httpProxyConfig).build();
+
+        AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config);
+        return restClientFactory.createRestClientNoHttpProxy(baseUrl());
+    }
+
+    private AsyncRestClient restClient() {
+        return restClient(false);
+    }
+
+    private String baseUrl() {
+        return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
+    }
+
+    private Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, int maxConcurrency) {
+        Job.Parameters param =
+                new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency);
+        String str = gson.toJson(param);
+        return jsonObject(str);
+    }
+
+    private Object jsonObject(String json) {
+        try {
+            return JsonParser.parseString(json).getAsJsonObject();
+        } catch (Exception e) {
+            throw new NullPointerException(e.toString());
+        }
+    }
+
+    private ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) {
+        try {
+            InfoType type = this.types.getAll().iterator().next();
+            String typeId = type.getId();
+            String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+            return new ConsumerJobInfo(typeId,
+                    jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize, maxConcurrency), "owner", targetUri,
+                    "");
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    private SenderOptions<Integer, String> senderOptions() {
+        String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
+
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        return SenderOptions.create(props);
+    }
+
+    private SenderRecord<Integer, String, Integer> senderRecord(String data, int i) {
+        final InfoType infoType = this.types.getAll().iterator().next();
+        return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), i, data + i), i);
+    }
+
+    private void sendDataToStream(Flux<SenderRecord<Integer, String, Integer>> dataToSend) {
+        final KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions());
+
+        sender.send(dataToSend) //
+                .doOnError(e -> logger.error("Send failed", e)) //
+                .blockLast();
+
+    }
+
+    private void verifiedReceivedByConsumer(String... strings) {
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(strings.length));
+        for (String s : strings) {
+            assertTrue(consumer.hasReceived(s));
+        }
+    }
+
+    @Test
+    void kafkaIntegrationTest() throws InterruptedException {
+        final String JOB_ID1 = "ID1";
+        final String JOB_ID2 = "ID2";
+
+        // Wait for the producer and its supported types to be registered
+        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+
+        // Create two jobs. One buffering and one with a filter
+        this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
+                restClient());
+        this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
+        var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
+        sendDataToStream(dataToSend);
+
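+        // The filtering job delivers "Message_1" on its own; the buffering job
+        // delivers all three messages aggregated into one body.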
+        verifyReceivedByConsumer("Message_1", "[Message_1, Message_2, Message_3]");
+
+        // Delete the jobs
+        this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
+        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty());
+    }
+
+    @Test
+    void kafkaOverflow() throws InterruptedException {
+        // After an overflow, the Kafka stream stops. This test verifies that the
+        // job data consumer is reported as not running and can be restarted.
+        final String JOB_ID1 = "ID1";
+        final String JOB_ID2 = "ID2";
+
+        // Wait for the producer and its supported types to be registered
+        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+
+        // Create two jobs.
+        this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID1, restClient());
+        this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
+        var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
+        sendDataToStream(dataToSend); // this will overflow
+
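+        // The overflow stops the job's Kafka data consumer; wait until it is
+        // reported as not running.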
+        KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next();
+        await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
+        this.consumerController.testResults.reset();
+
+        kafkaTopicConsumers.restartNonRunningTasks();
+
+        dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message__1, Message__2 etc.
+        sendDataToStream(dataToSend);
+
+        verifyReceivedByConsumer("Message__1", "Message__1");
+    }
+
+}
index 8d211b8..794eb8e 100644 (file)
@@ -3,7 +3,7 @@
       {
          "id": "ExampleInformationType",
          "dmaapTopicUrl": "/dmaap-topic-1",
-         "useHttpProxy": true
+         "useHttpProxy": false
       }
    ]
 }
\ No newline at end of file
diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json
new file mode 100644 (file)
index 0000000..e2ea525
--- /dev/null
@@ -0,0 +1,9 @@
+{
+   "types": [
+      {
+         "id": "ExampleInformationType",
+         "kafkaInputTopic": "TutorialTopic",
+         "useHttpProxy": false
+      }
+   ]
+}
\ No newline at end of file
index 1b8e064..b7f23b1 100644 (file)
@@ -67,96 +67,85 @@ public class AsyncRestClient {
         logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.post() //
-                    .uri(uri) //
-                    .contentType(MediaType.APPLICATION_JSON) //
-                    .body(bodyProducer, String.class);
-                return retrieve(traceTag, request);
-            });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+            .post() //
+            .uri(uri) //
+            .contentType(MediaType.APPLICATION_JSON) //
+            .body(bodyProducer, String.class);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> post(String uri, @Nullable String body) {
         return postForEntity(uri, body) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.post() //
-                    .uri(uri) //
-                    .headers(headers -> headers.setBasicAuth(username, password)) //
-                    .contentType(MediaType.APPLICATION_JSON) //
-                    .bodyValue(body);
-                return retrieve(traceTag, request) //
-                    .flatMap(this::toBody);
-            });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+            .post() //
+            .uri(uri) //
+            .headers(headers -> headers.setBasicAuth(username, password)) //
+            .contentType(MediaType.APPLICATION_JSON) //
+            .bodyValue(body);
+        return retrieve(traceTag, request) //
+            .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: {}", traceTag, body);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.put() //
-                    .uri(uri) //
-                    .contentType(MediaType.APPLICATION_JSON) //
-                    .bodyValue(body);
-                return retrieve(traceTag, request);
-            });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+            .put() //
+            .uri(uri) //
+            .contentType(MediaType.APPLICATION_JSON) //
+            .bodyValue(body);
+        return retrieve(traceTag, request);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: <empty>", traceTag);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.put() //
-                    .uri(uri);
-                return retrieve(traceTag, request);
-            });
+        RequestHeadersSpec<?> request = getWebClient() //
+            .put() //
+            .uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> put(String uri, String body) {
         return putForEntity(uri, body) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> getForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.get().uri(uri);
-                return retrieve(traceTag, request);
-            });
+        RequestHeadersSpec<?> request = getWebClient().get().uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> get(String uri) {
         return getForEntity(uri) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.delete().uri(uri);
-                return retrieve(traceTag, request);
-            });
+        RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> delete(String uri) {
         return deleteForEntity(uri) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
@@ -185,11 +174,11 @@ public class AsyncRestClient {
         }
     }
 
-    private Mono<String> toBody(ResponseEntity<String> entity) {
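+    // toBody is now synchronous, so callers map() instead of flatMap().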
+    private String toBody(ResponseEntity<String> entity) {
         if (entity.getBody() == null) {
-            return Mono.just("");
+            return "";
         } else {
-            return Mono.just(entity.getBody());
+            return entity.getBody();
         }
     }
 
@@ -229,11 +218,10 @@ public class AsyncRestClient {
             .build();
     }
 
-    private Mono<WebClient> getWebClient() {
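+    // Build the WebClient lazily and reuse it; the previous version created a
+    // new client on every call.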
+    private WebClient getWebClient() {
         if (this.webClient == null) {
             this.webClient = buildWebClient(baseUrl);
         }
-        return Mono.just(buildWebClient(baseUrl));
+        return this.webClient;
     }
-
 }
index 9609e27..8c056fc 100644 (file)
@@ -298,7 +298,7 @@ public class A1eController {
         return validatePutEiJob(eiJobId, eiJobObject) //
             .flatMap(this::startEiJob) //
             .doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
-            .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+            .map(newEiJob -> new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)) //
             .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.INTERNAL_SERVER_ERROR)));
     }
 
@@ -306,7 +306,7 @@ public class A1eController {
         return this.producerCallbacks.startInfoSubscriptionJob(newEiJob, infoProducers) //
             .doOnNext(noOfAcceptingProducers -> this.logger.debug(
                 "Started EI job {}, number of activated producers: {}", newEiJob.getId(), noOfAcceptingProducers)) //
-            .flatMap(noOfAcceptingProducers -> Mono.just(newEiJob));
+            .map(noOfAcceptingProducers -> newEiJob);
     }
 
     private Mono<InfoJob> validatePutEiJob(String eiJobId, A1eEiJobInfo eiJobInfo) {
index 47a4a2e..b108380 100644 (file)
@@ -308,7 +308,7 @@ public class ConsumerController {
         return validatePutInfoJob(jobId, informationJobObject, performTypeCheck) //
             .flatMap(this::startInfoSubscriptionJob) //
             .doOnNext(this.infoJobs::put) //
-            .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+            .map(newEiJob -> new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)) //
             .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
     }
 
@@ -441,7 +441,7 @@ public class ConsumerController {
         return this.producerCallbacks.startInfoSubscriptionJob(newInfoJob, infoProducers) //
             .doOnNext(noOfAcceptingProducers -> this.logger.debug("Started job {}, number of activated producers: {}",
                 newInfoJob.getId(), noOfAcceptingProducers)) //
-            .flatMap(noOfAcceptingProducers -> Mono.just(newInfoJob));
+            .map(noOfAcceptingProducers -> newInfoJob);
     }
 
     private Mono<InfoJob> validatePutInfoJob(String jobId, ConsumerJobInfo jobInfo, boolean performTypeCheck) {
index a97bdf6..558ae79 100644 (file)
@@ -84,7 +84,7 @@ public class ProducerCallbacks {
         return Flux.fromIterable(getProducersForJob(infoJob, infoProducers)) //
             .flatMap(infoProducer -> startInfoJob(infoProducer, infoJob, retrySpec)) //
             .collectList() //
-            .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
+            .map(okResponses -> Integer.valueOf(okResponses.size())); //
     }
 
     /**
index 65978e1..533199f 100644 (file)
@@ -222,8 +222,7 @@ public class InfoTypeSubscriptions {
     private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
         SubscriptionInfo subscriptionInfo) {
         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
-        return Mono.just(1) //
-            .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) //
+        return notifyFunc.apply(subscriptionInfo) //
             .retryWhen(retrySpec) //
             .onErrorResume(throwable -> {
                 logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
index db7c29b..08c5fc8 100644 (file)
@@ -80,7 +80,7 @@ public class ProducerSupervision {
             })//
             .doOnNext(response -> handleRespondingProducer(response, producer))
             .flatMap(response -> checkProducerJobs(producer)) //
-            .flatMap(responses -> Mono.just(producer));
+            .map(responses -> producer);
     }
 
     private Mono<?> checkProducerJobs(InfoProducer producer) {
index 4418429..8c8ce5f 100644 (file)
@@ -1028,7 +1028,7 @@ class ApplicationTest {
         // Test that subscriptions are removed for an unresponsive consumer
 
         // PUT a subscription with a junk callback
-        final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "JUNK", "owner");
+        final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "/JUNK", "owner");
         String body = gson.toJson(info);
         restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
         assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);