NONRTRIC - Implement DMaaP mediator producer service in Java 90/7090/2
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 19 Nov 2021 12:48:07 +0000 (13:48 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Mon, 22 Nov 2021 09:27:17 +0000 (10:27 +0100)
Added regexp filtering of dmaap messages.

Fixed problem in kafka listening.

Fixed problems in integration tests.

Added a test for creating a kafka job, just to improve coverage.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: Id969a94dbcd2f52d6c3487f03c7e22b9c6582580

16 files changed:
dmaap-adaptor-java/api/api.json
dmaap-adaptor-java/api/api.yaml
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
dmaap-adaptor-java/src/test/resources/test_application_configuration.json
dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json [deleted file]

index 39056e9..6cd3525 100644 (file)
                         "description": "OK",
                         "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
                     },
+                    "400": {
+                        "description": "Other error in the request",
+                        "content": {"application/json": {"schema": {"$ref": "#/components/schemas/error_information"}}}
+                    },
                     "404": {
                         "description": "Information type is not found",
                         "content": {"application/json": {"schema": {"$ref": "#/components/schemas/error_information"}}}
index 3c9fb59..b3acfda 100644 (file)
@@ -51,6 +51,12 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/void'
+        400:
+          description: Other error in the request
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/error_information'
         404:
           description: Information type is not found
           content:
index 8b3efed..d54ac44 100644 (file)
@@ -47,6 +47,7 @@ import reactor.netty.transport.ProxyProvider;
 /**
  * Generic reactive REST client.
  */
+@SuppressWarnings("java:S4449") // @Add Nullable to third party api
 public class AsyncRestClient {
 
     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -83,7 +84,7 @@ public class AsyncRestClient {
     }
 
     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password,
-            MediaType mediaType) {
+            @Nullable MediaType mediaType) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
index 07f5aa7..094ead7 100644 (file)
@@ -34,6 +34,7 @@ import io.swagger.v3.oas.annotations.tags.Tag;
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.r1.ProducerJobInfo;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
@@ -77,6 +78,8 @@ public class ProducerCallbacksController {
                     content = @Content(schema = @Schema(implementation = VoidResponse.class))), //
             @ApiResponse(responseCode = "404", description = "Information type is not found", //
                     content = @Content(schema = @Schema(implementation = ErrorResponse.ErrorInfo.class))), //
+            @ApiResponse(responseCode = "400", description = "Other error in the request", //
+                    content = @Content(schema = @Schema(implementation = ErrorResponse.ErrorInfo.class))) //
     })
     public ResponseEntity<Object> jobCreatedCallback( //
             @RequestBody String body) {
@@ -86,8 +89,12 @@ public class ProducerCallbacksController {
             this.jobs.addJob(request.id, request.targetUri, types.getType(request.typeId), request.owner,
                     request.lastUpdated, toJobParameters(request.jobData));
             return new ResponseEntity<>(HttpStatus.OK);
-        } catch (Exception e) {
+        } catch (ServiceException e) {
+            logger.warn("jobCreatedCallback failed: {}", e.getMessage());
             return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
+        } catch (Exception e) {
+            logger.warn("jobCreatedCallback failed: {}", e.getMessage());
+            return ErrorResponse.create(e, HttpStatus.BAD_REQUEST);
         }
     }
 
index 38f3d17..e2538af 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.Vector;
 
 /**
@@ -58,6 +59,10 @@ public class MultiMap<T> {
         return new Vector<>(innerMap.values());
     }
 
+    public Set<String> keySet() {
+        return this.map.keySet();
+    }
+
     public void clear() {
         this.map.clear();
     }
index 217a072..fe7ec8b 100644 (file)
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
 
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 
 /**
@@ -44,42 +43,10 @@ public class DmaapTopicConsumer {
     private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class);
 
     private final AsyncRestClient dmaapRestClient;
-    private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
     protected final ApplicationConfig applicationConfig;
     protected final InfoType type;
     protected final Jobs jobs;
 
-    /** Submits new elements until stopped */
-    private static class InfiniteFlux {
-        private FluxSink<Integer> sink;
-        private int counter = 0;
-
-        public synchronized Flux<Integer> start() {
-            stop();
-            return Flux.create(this::next).doOnRequest(this::onRequest);
-        }
-
-        public synchronized void stop() {
-            if (this.sink != null) {
-                this.sink.complete();
-                this.sink = null;
-            }
-        }
-
-        void onRequest(long no) {
-            logger.debug("InfiniteFlux.onRequest {}", no);
-            for (long i = 0; i < no; ++i) {
-                sink.next(counter++);
-            }
-        }
-
-        void next(FluxSink<Integer> sink) {
-            logger.debug("InfiniteFlux.next");
-            this.sink = sink;
-            sink.next(counter++);
-        }
-    }
-
     public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
         AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
         this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
@@ -89,14 +56,18 @@ public class DmaapTopicConsumer {
     }
 
     public void start() {
-        infiniteSubmitter.start() //
+        Flux.range(0, Integer.MAX_VALUE) //
                 .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
                 .flatMap(this::pushDataToConsumers) //
                 .subscribe(//
                         null, //
                         throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
-                        () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId())); //
+                        this::onComplete); //
+    }
 
+    private void onComplete() {
+        logger.warn("DmaapMessageConsumer completed {}", type.getId());
+        start();
     }
 
     private String getDmaapUrl() {
@@ -128,6 +99,7 @@ public class DmaapTopicConsumer {
 
         // Distibute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
+                .filter(job -> job.isFilterMatch(body)) //
                 .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
                 .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse);
index 5550ce0..f677502 100644 (file)
@@ -82,10 +82,15 @@ public class KafkaJobDataConsumer {
                 .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
-                        t -> stop(), //
+                        this::handleExceptionInStream, //
                         () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
     }
 
+    private void handleExceptionInStream(Throwable t) {
+        logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+        stop();
+    }
+
     private Mono<String> postToClient(String body) {
         logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
         MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
index 0ed85c6..29ad8c7 100644 (file)
@@ -30,6 +30,7 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.MultiMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -46,7 +47,7 @@ public class KafkaTopicConsumers {
     private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
 
     @Getter
-    private final Map<String, KafkaJobDataConsumer> consumers = new HashMap<>(); // Key is jobId
+    private final MultiMap<KafkaJobDataConsumer> consumers = new MultiMap<>(); // Key is typeId, jobId
 
     private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
 
@@ -75,17 +76,21 @@ public class KafkaTopicConsumers {
     }
 
     public synchronized void addJob(Job job) {
-        if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+        if (job.getType().isKafkaTopicDefined()) {
+            removeJob(job);
             logger.debug("Kafka job added {}", job.getId());
             KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
+            if (consumers.get(job.getType().getId()).isEmpty()) {
+                topicConsumer.start();
+            }
             KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
             subscription.start(topicConsumer.getOutput());
-            consumers.put(job.getId(), subscription);
+            consumers.put(job.getType().getId(), job.getId(), subscription);
         }
     }
 
     public synchronized void removeJob(Job job) {
-        KafkaJobDataConsumer d = consumers.remove(job.getId());
+        KafkaJobDataConsumer d = consumers.remove(job.getType().getId(), job.getId());
         if (d != null) {
             logger.debug("Kafka job removed {}", job.getId());
             d.stop();
@@ -94,12 +99,13 @@ public class KafkaTopicConsumers {
 
     @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
     public synchronized void restartNonRunningTasks() {
-
-        for (KafkaJobDataConsumer consumer : consumers.values()) {
-            if (!consumer.isRunning()) {
-                restartTopic(consumer);
-            }
-        }
+        this.consumers.keySet().forEach(typeId -> {
+            this.consumers.get(typeId).forEach(consumer -> {
+                if (!consumer.isRunning()) {
+                    restartTopic(consumer);
+                }
+            });
+        });
     }
 
     private void restartTopic(KafkaJobDataConsumer consumer) {
@@ -110,10 +116,8 @@ public class KafkaTopicConsumers {
     }
 
     private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
-        this.consumers.forEach((jobId, consumer) -> {
-            if (consumer.getJob().getType().getId().equals(type.getId())) {
-                consumer.start(topic.getOutput());
-            }
+        this.consumers.get(type.getId()).forEach((consumer) -> {
+            consumer.start(topic.getOutput());
         });
     }
 }
index d1045ee..f3b44a3 100644 (file)
@@ -53,7 +53,6 @@ public class KafkaTopicListener {
     public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
         this.type = type;
-        start();
     }
 
     public Many<String> getOutput() {
index 8b5b6cf..3a81f39 100644 (file)
@@ -152,12 +152,17 @@ public class ProducerRegstrationTask {
             // An object with no properties
             String schemaStr = "{" //
                     + "\"type\": \"object\"," //
-                    + "\"properties\": {}," //
+                    + "\"properties\": {" //
+                    + "   \"filter\": { \"type\": \"string\" }" //
+                    + "}," //
                     + "\"additionalProperties\": false" //
                     + "}"; //
 
-            return jsonObject(schemaStr);
+            return
+
+            jsonObject(schemaStr);
         }
+
     }
 
     private String readSchemaFile(String filePath) throws IOException, ServiceException {
index 290b70a..38e7807 100644 (file)
           "type": "integer"
         }
       },
+      "additionalProperties": false,
       "required": [
         "maxSize",
         "maxTimeMiliseconds"
       ]
     }
   },
-  "required": []
+  "additionalProperties": false
 }
\ No newline at end of file
index 287c95e..c4c9602 100644 (file)
@@ -46,8 +46,8 @@ import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
 import org.oran.dmaapadapter.r1.ConsumerJobInfo;
 import org.oran.dmaapadapter.r1.ProducerJobInfo;
-import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
@@ -174,8 +174,7 @@ class ApplicationTest {
     }
 
     private ConsumerJobInfo consumerJobInfo() {
-        InfoType type = this.types.getAll().iterator().next();
-        return consumerJobInfo(type.getId(), "EI_JOB_ID");
+        return consumerJobInfo("DmaapInformationType", "EI_JOB_ID");
     }
 
     private Object jsonObject() {
@@ -237,7 +236,7 @@ class ApplicationTest {
 
         // Register producer, Register types
         await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create a job
         this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
@@ -253,30 +252,48 @@ class ApplicationTest {
 
         String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
         String jobs = restClient().get(jobUrl).block();
-        assertThat(jobs).contains("ExampleInformationType");
+        assertThat(jobs).contains(JOB_ID);
 
         // Delete the job
         this.ecsSimulatorController.deleteJob(JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-
     }
 
     @Test
     void testReRegister() throws Exception {
         // Wait foir register types and producer
         await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Clear the registration, should trigger a re-register
         ecsSimulatorController.testResults.reset();
         await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Just clear the registerred types, should trigger a re-register
         ecsSimulatorController.testResults.types.clear();
         await().untilAsserted(
-                () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1));
+                () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
+    }
+
+    @Test
+    void testCreateKafkaJob() {
+        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+        final String TYPE_ID = "KafkaInformationType";
+
+        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+        String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+        ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
 
+        // Create a job
+        this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        // Delete the job
+        this.ecsSimulatorController.deleteJob("JOB_ID", restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
     }
 
     private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
@@ -303,5 +320,4 @@ class ApplicationTest {
         }
         return true;
     }
-
 }
index 376d23e..c8fcb83 100644 (file)
@@ -38,8 +38,8 @@ import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.dmaapadapter.r1.ConsumerJobInfo;
-import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -63,7 +63,8 @@ import org.springframework.test.context.junit.jupiter.SpringExtension;
 })
 class IntegrationWithEcs {
 
-    private static final String EI_JOB_ID = "EI_JOB_ID";
+    private static final String DMAAP_JOB_ID = "DMAAP_JOB_ID";
+    private static final String DMAAP_TYPE_ID = "DmaapInformationType";
 
     @Autowired
     private ApplicationConfig applicationConfig;
@@ -128,8 +129,7 @@ class IntegrationWithEcs {
     @AfterEach
     void reset() {
         this.consumerController.testResults.reset();
-        this.jobs.clear();
-        this.types.clear();
+        assertThat(this.jobs.size()).isZero();
     }
 
     private AsyncRestClient restClient(boolean useTrustValidation) {
@@ -165,11 +165,11 @@ class IntegrationWithEcs {
     }
 
     private String jobUrl(String jobId) {
-        return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId;
+        return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId + "?typeCheck=true";
     }
 
-    private void createInformationJobInEcs(String jobId) {
-        String body = gson.toJson(consumerJobInfo());
+    private void createInformationJobInEcs(String typeId, String jobId, String filter) {
+        String body = gson.toJson(consumerJobInfo(typeId, filter));
         try {
             // Delete the job if it already exists
             deleteInformationJobInEcs(jobId);
@@ -182,13 +182,8 @@ class IntegrationWithEcs {
         restClient().delete(jobUrl(jobId)).block();
     }
 
-    private ConsumerJobInfo consumerJobInfo() {
-        InfoType type = this.types.getAll().iterator().next();
-        return consumerJobInfo(type.getId(), EI_JOB_ID);
-    }
-
-    private Object jsonObject() {
-        return jsonObject("{}");
+    private ConsumerJobInfo consumerJobInfo(String typeId, String filter) {
+        return consumerJobInfo(typeId, DMAAP_JOB_ID, filter);
     }
 
     private Object jsonObject(String json) {
@@ -199,31 +194,60 @@ class IntegrationWithEcs {
         }
     }
 
-    private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) {
+    private String quote(String str) {
+        return "\"" + str + "\"";
+    }
+
+    private String consumerUri() {
+        return selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+    }
+
+    private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId, String filter) {
         try {
-            String targetUri = selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL;
-            return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, "");
+
+            String jsonStr = "{ \"filter\" :" + quote(filter) + "}";
+            return new ConsumerJobInfo(typeId, jsonObject(jsonStr), "owner", consumerUri(), "");
         } catch (Exception e) {
             return null;
         }
     }
 
+    @Test
+    void testCreateKafkaJob() {
+        await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+        final String TYPE_ID = "KafkaInformationType";
+
+        Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+
+        ConsumerJobInfo jobInfo =
+                new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
+        String body = gson.toJson(jobInfo);
+
+        restClient().putForEntity(jobUrl("KAFKA_JOB_ID"), body).block();
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        deleteInformationJobInEcs("KAFKA_JOB_ID");
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+    }
+
     @Test
     void testWholeChain() throws Exception {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
 
-        createInformationJobInEcs(EI_JOB_ID);
+        createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*");
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
 
         DmaapSimulatorController.dmaapResponses.add("DmaapResponse1");
         DmaapSimulatorController.dmaapResponses.add("DmaapResponse2");
+        DmaapSimulatorController.dmaapResponses.add("Junk");
 
         ConsumerController.TestResults results = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(results.receivedBodies.size()).isEqualTo(2));
         assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
 
-        deleteInformationJobInEcs(EI_JOB_ID);
+        deleteInformationJobInEcs(DMAAP_JOB_ID);
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
 
index 470e114..9cd4fdd 100644 (file)
@@ -75,10 +75,12 @@ import reactor.kafka.sender.SenderRecord;
 @TestPropertySource(properties = { //
         "server.ssl.key-store=./config/keystore.jks", //
         "app.webclient.trust-store=./config/truststore.jks", //
-        "app.configuration-filepath=./src/test/resources/test_application_configuration_kafka.json"//
+        "app.configuration-filepath=./src/test/resources/test_application_configuration.json"//
 })
 class IntegrationWithKafka {
 
+    final String TYPE_ID = "KafkaInformationType";
+
     @Autowired
     private ApplicationConfig applicationConfig;
 
@@ -97,7 +99,7 @@ class IntegrationWithKafka {
     @Autowired
     private KafkaTopicConsumers kafkaTopicConsumers;
 
-    private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+    private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
 
     private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
 
@@ -181,14 +183,15 @@ class IntegrationWithKafka {
         return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
     }
 
-    private Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, int maxConcurrency) {
+    private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
+            int maxConcurrency) {
         Job.Parameters param =
                 new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency);
         String str = gson.toJson(param);
         return jsonObject(str);
     }
 
-    private Object jsonObject(String json) {
+    private static Object jsonObject(String json) {
         try {
             return JsonParser.parseString(json).getAsJsonObject();
         } catch (Exception e) {
@@ -196,12 +199,10 @@ class IntegrationWithKafka {
         }
     }
 
-    private ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) {
+    ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) {
         try {
-            InfoType type = this.types.getAll().iterator().next();
-            String typeId = type.getId();
             String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
-            return new ConsumerJobInfo(typeId,
+            return new ConsumerJobInfo(TYPE_ID,
                     jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize, maxConcurrency), "owner", targetUri,
                     "");
         } catch (Exception e) {
@@ -221,9 +222,11 @@ class IntegrationWithKafka {
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<Integer, String, Integer> senderRecord(String data, int i) {
-        final InfoType infoType = this.types.getAll().iterator().next();
-        return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), i, data + i), i);
+    private SenderRecord<Integer, String, Integer> senderRecord(String data) {
+        final InfoType infoType = this.types.get(TYPE_ID);
+        int key = 1;
+        int correlationMetadata = 2;
+        return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata);
     }
 
     private void sendDataToStream(Flux<SenderRecord<Integer, String, Integer>> dataToSend) {
@@ -244,13 +247,13 @@ class IntegrationWithKafka {
     }
 
     @Test
-    void kafkaIntegrationTest() throws InterruptedException {
+    void kafkaIntegrationTest() throws Exception {
         final String JOB_ID1 = "ID1";
         final String JOB_ID2 = "ID2";
 
         // Register producer, Register types
         await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs. One buffering and one with a filter
         this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
@@ -259,23 +262,17 @@ class IntegrationWithKafka {
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
-        var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
+        var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
         sendDataToStream(dataToSend);
 
         verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
 
-        // Just for testing quoting
-        this.consumerController.testResults.reset();
-        dataToSend = Flux.just(senderRecord("Message\"_", 1));
-        sendDataToStream(dataToSend);
-        verifiedReceivedByConsumer("[\"Message\\\"_1\"]");
-
         // Delete the jobs
         this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
         this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers()).isEmpty());
+        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
     }
 
     @Test
@@ -285,29 +282,37 @@ class IntegrationWithKafka {
 
         // Register producer, Register types
         await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
 
         // Create two jobs.
-        this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID1, restClient());
+        this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
+                restClient());
         this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
-        var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
+        var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
         sendDataToStream(dataToSend); // this should overflow
 
-        KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().values().iterator().next();
+        KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().get(TYPE_ID).iterator().next();
         await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
         this.consumerController.testResults.reset();
 
-        kafkaTopicConsumers.restartNonRunningTasks();
         this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+        kafkaTopicConsumers.restartNonRunningTasks();
         Thread.sleep(1000); // Restarting the input seems to take some asynch time
 
-        dataToSend = Flux.range(1, 1).map(i -> senderRecord("Howdy_", i));
+        dataToSend = Flux.just(senderRecord("Howdy\""));
         sendDataToStream(dataToSend);
 
-        verifiedReceivedByConsumer("Howdy_1");
+        verifiedReceivedByConsumer("[\"Howdy\\\"\"]");
+
+        // Delete the jobs
+        this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
+        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
     }
 
 }
index 794eb8e..32e6c32 100644 (file)
@@ -1,9 +1,14 @@
 {
    "types": [
       {
-         "id": "ExampleInformationType",
+         "id": "DmaapInformationType",
          "dmaapTopicUrl": "/dmaap-topic-1",
          "useHttpProxy": false
+      },
+      {
+         "id": "KafkaInformationType",
+         "kafkaInputTopic": "TutorialTopic",
+         "useHttpProxy": false
       }
    ]
 }
\ No newline at end of file
diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json
deleted file mode 100644 (file)
index e2ea525..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-{
-   "types": [
-      {
-         "id": "ExampleInformationType",
-         "kafkaInputTopic": "TutorialTopic",
-         "useHttpProxy": false
-      }
-   ]
-}
\ No newline at end of file