From 242299199382ec3fd7d514dde2eb607086a6a46e Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 19 Nov 2021 13:48:07 +0100 Subject: [PATCH] NONRTRIC - Implement DMaaP mediator producer service in Java Added regexp filtering of dmaap messages. Fixed problem in kafka listening. Fixed problems in integration tests. Added a test for creating a kafka job, just to improve coverage. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-597 Change-Id: Id969a94dbcd2f52d6c3487f03c7e22b9c6582580 --- dmaap-adaptor-java/api/api.json | 4 ++ dmaap-adaptor-java/api/api.yaml | 6 +++ .../oran/dmaapadapter/clients/AsyncRestClient.java | 3 +- .../controllers/ProducerCallbacksController.java | 9 +++- .../org/oran/dmaapadapter/repository/MultiMap.java | 5 ++ .../dmaapadapter/tasks/DmaapTopicConsumer.java | 42 +++------------ .../dmaapadapter/tasks/KafkaJobDataConsumer.java | 7 ++- .../dmaapadapter/tasks/KafkaTopicConsumers.java | 32 ++++++----- .../dmaapadapter/tasks/KafkaTopicListener.java | 1 - .../tasks/ProducerRegstrationTask.java | 9 +++- .../src/main/resources/typeSchemaKafka.json | 3 +- .../org/oran/dmaapadapter/ApplicationTest.java | 36 +++++++++---- .../org/oran/dmaapadapter/IntegrationWithEcs.java | 62 +++++++++++++++------- .../oran/dmaapadapter/IntegrationWithKafka.java | 61 +++++++++++---------- .../resources/test_application_configuration.json | 7 ++- .../test_application_configuration_kafka.json | 9 ---- 16 files changed, 173 insertions(+), 123 deletions(-) delete mode 100644 dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json diff --git a/dmaap-adaptor-java/api/api.json b/dmaap-adaptor-java/api/api.json index 39056e91..6cd3525b 100644 --- a/dmaap-adaptor-java/api/api.json +++ b/dmaap-adaptor-java/api/api.json @@ -121,6 +121,10 @@ "description": "OK", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}} }, + "400": { + "description": "Other error in the request", + "content": {"application/json": {"schema": {"$ref": "#/components/schemas/error_information"}}} + }, "404": { "description": "Information type is not found", "content": {"application/json": {"schema": {"$ref": "#/components/schemas/error_information"}}} diff --git a/dmaap-adaptor-java/api/api.yaml b/dmaap-adaptor-java/api/api.yaml index 3c9fb599..b3acfdaf 100644 --- a/dmaap-adaptor-java/api/api.yaml +++ b/dmaap-adaptor-java/api/api.yaml @@ -51,6 +51,12 @@ paths: application/json: schema: $ref: '#/components/schemas/void' + 400: + description: Other error in the request + content: + application/json: + schema: + $ref: '#/components/schemas/error_information' 404: description: Information type is not found content: diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java index 8b3efed5..d54ac44c 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java @@ -47,6 +47,7 @@ import reactor.netty.transport.ProxyProvider; /** * Generic reactive REST client. */ +@SuppressWarnings("java:S4449") // @Add Nullable to third party api public class AsyncRestClient { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -83,7 +84,7 @@ public class AsyncRestClient { } public Mono postWithAuthHeader(String uri, String body, String username, String password, - MediaType mediaType) { + @Nullable MediaType mediaType) { Object traceTag = createTraceTag(); logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} POST body: {}", traceTag, body); diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java index 07f5aa72..094ead7d 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java @@ -34,6 +34,7 @@ import io.swagger.v3.oas.annotations.tags.Tag; import java.util.ArrayList; import java.util.Collection; +import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.r1.ProducerJobInfo; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; @@ -77,6 +78,8 @@ public class ProducerCallbacksController { content = @Content(schema = @Schema(implementation = VoidResponse.class))), // @ApiResponse(responseCode = "404", description = "Information type is not found", // content = @Content(schema = @Schema(implementation = ErrorResponse.ErrorInfo.class))), // + @ApiResponse(responseCode = "400", description = "Other error in the request", // + content = @Content(schema = @Schema(implementation = ErrorResponse.ErrorInfo.class))) // }) public ResponseEntity jobCreatedCallback( // @RequestBody String body) { @@ -86,8 +89,12 @@ public class ProducerCallbacksController { this.jobs.addJob(request.id, request.targetUri, types.getType(request.typeId), request.owner, request.lastUpdated, toJobParameters(request.jobData)); return new ResponseEntity<>(HttpStatus.OK); - } catch (Exception e) { + } catch (ServiceException e) { + logger.warn("jobCreatedCallback failed: {}", e.getMessage()); return ErrorResponse.create(e, HttpStatus.NOT_FOUND); + } catch (Exception e) { + logger.warn("jobCreatedCallback failed: {}", e.getMessage()); + return ErrorResponse.create(e, HttpStatus.BAD_REQUEST); } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java index 38f3d175..e2538af1 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.Vector; /** @@ -58,6 +59,10 @@ public class MultiMap { return new Vector<>(innerMap.values()); } + public Set keySet() { + return this.map.keySet(); + } + public void clear() { this.map.clear(); } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java index 217a0723..fe7ec8b7 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java @@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; /** @@ -44,42 +43,10 @@ public class DmaapTopicConsumer { private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class); private final AsyncRestClient dmaapRestClient; - private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); protected final ApplicationConfig applicationConfig; protected final InfoType type; protected final Jobs jobs; - /** Submits new elements until stopped */ - private static class InfiniteFlux { - private FluxSink sink; - private int counter = 0; - - public synchronized Flux start() { - stop(); - return Flux.create(this::next).doOnRequest(this::onRequest); - } - - public synchronized void stop() { - if (this.sink != null) { - this.sink.complete(); - this.sink = null; - } - } - - void onRequest(long no) { - logger.debug("InfiniteFlux.onRequest {}", no); - for (long i = 0; i < no; ++i) { - sink.next(counter++); - } - } - - void next(FluxSink sink) { - logger.debug("InfiniteFlux.next"); - this.sink = sink; - sink.next(counter++); - } - } - public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) { AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy(""); @@ -89,14 +56,18 @@ public class DmaapTopicConsumer { } public void start() { - infiniteSubmitter.start() // + Flux.range(0, Integer.MAX_VALUE) // .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) // .flatMap(this::pushDataToConsumers) // .subscribe(// null, // throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // - () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId())); // + this::onComplete); // + } + private void onComplete() { + logger.warn("DmaapMessageConsumer completed {}", type.getId()); + start(); } private String getDmaapUrl() { @@ -128,6 +99,7 @@ public class DmaapTopicConsumer { // Distibute the body to all jobs for this type return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // + .filter(job -> job.isFilterMatch(body)) // .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) // .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) // .onErrorResume(this::handleConsumerErrorResponse); diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java index 5550ce0e..f677502c 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java @@ -82,10 +82,15 @@ public class KafkaJobDataConsumer { .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) // .onErrorResume(this::handleError) // .subscribe(this::handleConsumerSentOk, // - t -> stop(), // + this::handleExceptionInStream, // () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId())); } + private void handleExceptionInStream(Throwable t) { + logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId()); + stop(); + } + private Mono postToClient(String body) { logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body); MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null; diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java index 0ed85c6a..29ad8c75 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java @@ -30,6 +30,7 @@ import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.repository.MultiMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -46,7 +47,7 @@ public class KafkaTopicConsumers { private final Map topicListeners = new HashMap<>(); // Key is typeId @Getter - private final Map consumers = new HashMap<>(); // Key is jobId + private final MultiMap consumers = new MultiMap<>(); // Key is typeId, jobId private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3; @@ -75,17 +76,21 @@ public class KafkaTopicConsumers { } public synchronized void addJob(Job job) { - if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) { + if (job.getType().isKafkaTopicDefined()) { + removeJob(job); logger.debug("Kafka job added {}", job.getId()); KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId()); + if (consumers.get(job.getType().getId()).isEmpty()) { + topicConsumer.start(); + } KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job); subscription.start(topicConsumer.getOutput()); - consumers.put(job.getId(), subscription); + consumers.put(job.getType().getId(), job.getId(), subscription); } } public synchronized void removeJob(Job job) { - KafkaJobDataConsumer d = consumers.remove(job.getId()); + KafkaJobDataConsumer d = consumers.remove(job.getType().getId(), job.getId()); if (d != null) { logger.debug("Kafka job removed {}", job.getId()); d.stop(); @@ -94,12 +99,13 @@ public class KafkaTopicConsumers { @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS) public synchronized void restartNonRunningTasks() { - - for (KafkaJobDataConsumer consumer : consumers.values()) { - if (!consumer.isRunning()) { - restartTopic(consumer); - } - } + this.consumers.keySet().forEach(typeId -> { + this.consumers.get(typeId).forEach(consumer -> { + if (!consumer.isRunning()) { + restartTopic(consumer); + } + }); + }); } private void restartTopic(KafkaJobDataConsumer consumer) { @@ -110,10 +116,8 @@ public class KafkaTopicConsumers { } private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) { - this.consumers.forEach((jobId, consumer) -> { - if (consumer.getJob().getType().getId().equals(type.getId())) { - consumer.start(topic.getOutput()); - } + this.consumers.get(type.getId()).forEach((consumer) -> { + consumer.start(topic.getOutput()); }); } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index d1045ee0..f3b44a34 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -53,7 +53,6 @@ public class KafkaTopicListener { public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) { this.applicationConfig = applicationConfig; this.type = type; - start(); } public Many getOutput() { diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index 8b5b6cfc..3a81f39a 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -152,12 +152,17 @@ public class ProducerRegstrationTask { // An object with no properties String schemaStr = "{" // + "\"type\": \"object\"," // - + "\"properties\": {}," // + + "\"properties\": {" // + + " \"filter\": { \"type\": \"string\" }" // + + "}," // + "\"additionalProperties\": false" // + "}"; // - return jsonObject(schemaStr); + return + + jsonObject(schemaStr); } + } private String readSchemaFile(String filePath) throws IOException, ServiceException { diff --git a/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json b/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json index 290b70ae..38e7807f 100644 --- a/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json +++ b/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json @@ -18,11 +18,12 @@ "type": "integer" } }, + "additionalProperties": false, "required": [ "maxSize", "maxTimeMiliseconds" ] } }, - "required": [] + "additionalProperties": false } \ No newline at end of file diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 287c95ec..c4c9602a 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -46,8 +46,8 @@ import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; import org.oran.dmaapadapter.r1.ConsumerJobInfo; import org.oran.dmaapadapter.r1.ProducerJobInfo; -import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -174,8 +174,7 @@ class ApplicationTest { } private ConsumerJobInfo consumerJobInfo() { - InfoType type = this.types.getAll().iterator().next(); - return consumerJobInfo(type.getId(), "EI_JOB_ID"); + return consumerJobInfo("DmaapInformationType", "EI_JOB_ID"); } private Object jsonObject() { @@ -237,7 +236,7 @@ class ApplicationTest { // Register producer, Register types await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); + assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create a job this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient()); @@ -253,30 +252,48 @@ class ApplicationTest { String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL; String jobs = restClient().get(jobUrl).block(); - assertThat(jobs).contains("ExampleInformationType"); + assertThat(jobs).contains(JOB_ID); // Delete the job this.ecsSimulatorController.deleteJob(JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - } @Test void testReRegister() throws Exception { // Wait foir register types and producer await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); + assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Clear the registration, should trigger a re-register ecsSimulatorController.testResults.reset(); await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); + assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Just clear the registerred types, should trigger a re-register ecsSimulatorController.testResults.types.clear(); await().untilAsserted( - () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1)); + () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2)); + } + + @Test + void testCreateKafkaJob() { + await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + + final String TYPE_ID = "KafkaInformationType"; + + Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1); + String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; + ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, ""); + // Create a job + this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + // Delete the job + this.ecsSimulatorController.deleteJob("JOB_ID", restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } private void testErrorCode(Mono request, HttpStatus expStatus, String responseContains) { @@ -303,5 +320,4 @@ class ApplicationTest { } return true; } - } diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java index 376d23e5..c8fcb832 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java @@ -38,8 +38,8 @@ import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; import org.oran.dmaapadapter.r1.ConsumerJobInfo; -import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; +import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; import org.oran.dmaapadapter.tasks.ProducerRegstrationTask; import org.springframework.beans.factory.annotation.Autowired; @@ -63,7 +63,8 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; }) class IntegrationWithEcs { - private static final String EI_JOB_ID = "EI_JOB_ID"; + private static final String DMAAP_JOB_ID = "DMAAP_JOB_ID"; + private static final String DMAAP_TYPE_ID = "DmaapInformationType"; @Autowired private ApplicationConfig applicationConfig; @@ -128,8 +129,7 @@ class IntegrationWithEcs { @AfterEach void reset() { this.consumerController.testResults.reset(); - this.jobs.clear(); - this.types.clear(); + assertThat(this.jobs.size()).isZero(); } private AsyncRestClient restClient(boolean useTrustValidation) { @@ -165,11 +165,11 @@ class IntegrationWithEcs { } private String jobUrl(String jobId) { - return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId; + return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId + "?typeCheck=true"; } - private void createInformationJobInEcs(String jobId) { - String body = gson.toJson(consumerJobInfo()); + private void createInformationJobInEcs(String typeId, String jobId, String filter) { + String body = gson.toJson(consumerJobInfo(typeId, filter)); try { // Delete the job if it already exists deleteInformationJobInEcs(jobId); @@ -182,13 +182,8 @@ class IntegrationWithEcs { restClient().delete(jobUrl(jobId)).block(); } - private ConsumerJobInfo consumerJobInfo() { - InfoType type = this.types.getAll().iterator().next(); - return consumerJobInfo(type.getId(), EI_JOB_ID); - } - - private Object jsonObject() { - return jsonObject("{}"); + private ConsumerJobInfo consumerJobInfo(String typeId, String filter) { + return consumerJobInfo(typeId, DMAAP_JOB_ID, filter); } private Object jsonObject(String json) { @@ -199,31 +194,60 @@ class IntegrationWithEcs { } } - private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) { + private String quote(String str) { + return "\"" + str + "\""; + } + + private String consumerUri() { + return selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL; + } + + private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId, String filter) { try { - String targetUri = selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL; - return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, ""); + + String jsonStr = "{ \"filter\" :" + quote(filter) + "}"; + return new ConsumerJobInfo(typeId, jsonObject(jsonStr), "owner", consumerUri(), ""); } catch (Exception e) { return null; } } + @Test + void testCreateKafkaJob() { + await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); + final String TYPE_ID = "KafkaInformationType"; + + Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1); + + ConsumerJobInfo jobInfo = + new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), ""); + String body = gson.toJson(jobInfo); + + restClient().putForEntity(jobUrl("KAFKA_JOB_ID"), body).block(); + + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + deleteInformationJobInEcs("KAFKA_JOB_ID"); + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + } + @Test void testWholeChain() throws Exception { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); - createInformationJobInEcs(EI_JOB_ID); + createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*"); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); DmaapSimulatorController.dmaapResponses.add("DmaapResponse1"); DmaapSimulatorController.dmaapResponses.add("DmaapResponse2"); + DmaapSimulatorController.dmaapResponses.add("Junk"); ConsumerController.TestResults results = this.consumerController.testResults; await().untilAsserted(() -> assertThat(results.receivedBodies.size()).isEqualTo(2)); assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1"); - deleteInformationJobInEcs(EI_JOB_ID); + deleteInformationJobInEcs(DMAAP_JOB_ID); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 470e114e..9cd4fdd1 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -75,10 +75,12 @@ import reactor.kafka.sender.SenderRecord; @TestPropertySource(properties = { // "server.ssl.key-store=./config/keystore.jks", // "app.webclient.trust-store=./config/truststore.jks", // - "app.configuration-filepath=./src/test/resources/test_application_configuration_kafka.json"// + "app.configuration-filepath=./src/test/resources/test_application_configuration.json"// }) class IntegrationWithKafka { + final String TYPE_ID = "KafkaInformationType"; + @Autowired private ApplicationConfig applicationConfig; @@ -97,7 +99,7 @@ class IntegrationWithKafka { @Autowired private KafkaTopicConsumers kafkaTopicConsumers; - private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); + private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class); @@ -181,14 +183,15 @@ class IntegrationWithKafka { return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort(); } - private Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, int maxConcurrency) { + private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, + int maxConcurrency) { Job.Parameters param = new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency); String str = gson.toJson(param); return jsonObject(str); } - private Object jsonObject(String json) { + private static Object jsonObject(String json) { try { return JsonParser.parseString(json).getAsJsonObject(); } catch (Exception e) { @@ -196,12 +199,10 @@ class IntegrationWithKafka { } } - private ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) { + ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) { try { - InfoType type = this.types.getAll().iterator().next(); - String typeId = type.getId(); String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; - return new ConsumerJobInfo(typeId, + return new ConsumerJobInfo(TYPE_ID, jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize, maxConcurrency), "owner", targetUri, ""); } catch (Exception e) { @@ -221,9 +222,11 @@ class IntegrationWithKafka { return SenderOptions.create(props); } - private SenderRecord senderRecord(String data, int i) { - final InfoType infoType = this.types.getAll().iterator().next(); - return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), i, data + i), i); + private SenderRecord senderRecord(String data) { + final InfoType infoType = this.types.get(TYPE_ID); + int key = 1; + int correlationMetadata = 2; + return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata); } private void sendDataToStream(Flux> dataToSend) { @@ -244,13 +247,13 @@ class IntegrationWithKafka { } @Test - void kafkaIntegrationTest() throws InterruptedException { + void kafkaIntegrationTest() throws Exception { final String JOB_ID1 = "ID1"; final String JOB_ID2 = "ID2"; // Register producer, Register types await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); + assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. One buffering and one with a filter this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1, @@ -259,23 +262,17 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); - var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc. + var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc. sendDataToStream(dataToSend); verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]"); - // Just for testing quoting - this.consumerController.testResults.reset(); - dataToSend = Flux.just(senderRecord("Message\"_", 1)); - sendDataToStream(dataToSend); - verifiedReceivedByConsumer("[\"Message\\\"_1\"]"); - // Delete the jobs this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); - await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers()).isEmpty()); + await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty()); } @Test @@ -285,29 +282,37 @@ class IntegrationWithKafka { // Register producer, Register types await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); + assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. - this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID1, restClient()); + this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1, + restClient()); this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); - var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc. + var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc. sendDataToStream(dataToSend); // this should overflow - KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().values().iterator().next(); + KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().get(TYPE_ID).iterator().next(); await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse()); this.consumerController.testResults.reset(); - kafkaTopicConsumers.restartNonRunningTasks(); this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job + kafkaTopicConsumers.restartNonRunningTasks(); Thread.sleep(1000); // Restarting the input seems to take some asynch time - dataToSend = Flux.range(1, 1).map(i -> senderRecord("Howdy_", i)); + dataToSend = Flux.just(senderRecord("Howdy\"")); sendDataToStream(dataToSend); - verifiedReceivedByConsumer("Howdy_1"); + verifiedReceivedByConsumer("[\"Howdy\\\"\"]"); + + // Delete the jobs + this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); + this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); + + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty()); } } diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json index 794eb8ec..32e6c32d 100644 --- a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json +++ b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json @@ -1,9 +1,14 @@ { "types": [ { - "id": "ExampleInformationType", + "id": "DmaapInformationType", "dmaapTopicUrl": "/dmaap-topic-1", "useHttpProxy": false + }, + { + "id": "KafkaInformationType", + "kafkaInputTopic": "TutorialTopic", + "useHttpProxy": false } ] } \ No newline at end of file diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json deleted file mode 100644 index e2ea5256..00000000 --- a/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "types": [ - { - "id": "ExampleInformationType", - "kafkaInputTopic": "TutorialTopic", - "useHttpProxy": false - } - ] -} \ No newline at end of file -- 2.16.6