From: Henrik Andersson Date: Fri, 26 Nov 2021 06:31:11 +0000 (+0000) Subject: Merge "Add output schema for ODU slice assurance usecase" X-Git-Tag: 1.2.0~25 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=2389606af9e77879c76e2f87822b5b7d68920d19;hp=14fb60fc82d5bd0baab5d0caafc6a5759bfab08b;p=nonrtric.git Merge "Add output schema for ODU slice assurance usecase" --- diff --git a/dmaap-adaptor-java/config/application.yaml b/dmaap-adaptor-java/config/application.yaml index bd260eab..c3476ac6 100644 --- a/dmaap-adaptor-java/config/application.yaml +++ b/dmaap-adaptor-java/config/application.yaml @@ -46,7 +46,7 @@ app: # The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s http.proxy-host: http.proxy-port: 0 - ecs-base-url: https://localhost:8434 + ics-base-url: https://localhost:8434 # Location of the component configuration file. The file will only be used if the Consul database is not used; # configuration from the Consul will override the file. configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java index b40c606a..3ea64e71 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java @@ -79,8 +79,8 @@ public class ApplicationConfig { private int localServerHttpPort; @Getter - @Value("${app.ecs-base-url}") - private String ecsBaseUrl; + @Value("${app.ics-base-url}") + private String icsBaseUrl; @Getter @Value("${app.dmaap-adapter-base-url}") diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java index e2538af1..f7cc14e9 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/MultiMap.java @@ -51,6 +51,14 @@ public class MultiMap { return null; } + public T get(String key1, String key2) { + Map innerMap = this.map.get(key1); + if (innerMap == null) { + return null; + } + return innerMap.get(key2); + } + public Collection get(String key) { Map innerMap = this.map.get(key); if (innerMap == null) { diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java index f677502c..2a16f475 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java @@ -31,7 +31,6 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks.Many; /** * The class streams data from a multi cast sink and sends the data to the Job @@ -75,7 +74,7 @@ public class KafkaJobDataConsumer { this.job = job; } - public synchronized void start(Many input) { + public synchronized void start(Flux input) { stop(); this.errorStats.resetKafkaErrors(); this.subscription = getMessagesFromKafka(input, job) // @@ -99,8 +98,8 @@ public class KafkaJobDataConsumer { public synchronized void 
stop() { if (this.subscription != null) { - subscription.dispose(); - subscription = null; + this.subscription.dispose(); + this.subscription = null; } } @@ -108,9 +107,8 @@ public class KafkaJobDataConsumer { return this.subscription != null; } - private Flux getMessagesFromKafka(Many input, Job job) { - Flux result = input.asFlux() // - .filter(job::isFilterMatch); + private Flux getMessagesFromKafka(Flux input, Job job) { + Flux result = input.filter(job::isFilterMatch); if (job.isBuffered()) { result = result.map(this::quote) // diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java index 29ad8c75..48090170 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java @@ -71,7 +71,6 @@ public class KafkaTopicConsumers { public void onJobRemoved(Job job) { removeJob(job); } - }); } @@ -84,7 +83,7 @@ public class KafkaTopicConsumers { topicConsumer.start(); } KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job); - subscription.start(topicConsumer.getOutput()); + subscription.start(topicConsumer.getOutput().asFlux()); consumers.put(job.getType().getId(), job.getId(), subscription); } } @@ -98,14 +97,12 @@ public class KafkaTopicConsumers { } @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS) - public synchronized void restartNonRunningTasks() { - this.consumers.keySet().forEach(typeId -> { - this.consumers.get(typeId).forEach(consumer -> { - if (!consumer.isRunning()) { - restartTopic(consumer); - } - }); - }); + public synchronized void restartNonRunningTopics() { + for (String typeId : this.consumers.keySet()) { + for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) { + restartTopic(consumer); + } + } } private void restartTopic(KafkaJobDataConsumer consumer) { @@ -116,8 +113,6 @@ public class KafkaTopicConsumers { } private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) { - this.consumers.get(type.getId()).forEach((consumer) -> { - consumer.start(topic.getOutput()); - }); + this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux())); } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index 306cc6b4..c9284b5b 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -50,7 +50,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Registers the types and this producer in ECS. This is done when needed. + * Registers the types and this producer in Innformation Coordinator Service. + * This is done when needed. 
*/ @Component @EnableScheduling @@ -65,7 +66,7 @@ public class ProducerRegstrationTask { private static final String PRODUCER_ID = "DmaapGenericInfoProducer"; @Getter - private boolean isRegisteredInEcs = false; + private boolean isRegisteredInIcs = false; private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5; public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) { @@ -78,7 +79,7 @@ public class ProducerRegstrationTask { @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) public void supervisionTask() { checkRegistration() // - .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) // + .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) // .flatMap(isRegisterred -> registerTypesAndProducer()) // .subscribe( // null, // @@ -87,7 +88,7 @@ public class ProducerRegstrationTask { } private void handleRegistrationCompleted() { - isRegisteredInEcs = true; + isRegisteredInIcs = true; } private void handleRegistrationFailure(Throwable t) { @@ -96,7 +97,7 @@ public class ProducerRegstrationTask { // Returns TRUE if registration is correct private Mono checkRegistration() { - final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return restClient.get(url) // .flatMap(this::isRegisterredInfoCorrect) // .onErrorResume(t -> Mono.just(Boolean.FALSE)); @@ -105,7 +106,7 @@ public class ProducerRegstrationTask { private Mono isRegisterredInfoCorrect(String registerredInfoStr) { ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class); if (isEqual(producerRegistrationInfo(), registerredInfo)) { - logger.trace("Already registered in ECS"); + logger.trace("Already registered in ICS"); return Mono.just(Boolean.TRUE); } else { return Mono.just(Boolean.FALSE); @@ -113,13 +114,13 @@ public class ProducerRegstrationTask { } private String registerTypeUrl(InfoType type) { - return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); + return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); } private Mono registerTypesAndProducer() { final int CONCURRENCY = 20; final String producerUrl = - applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return Flux.fromIterable(this.types.getAll()) // .doOnNext(type -> logger.info("Registering type {}", type.getId())) // diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index e78313c0..0ea00564 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -50,6 +50,8 @@ import org.oran.dmaapadapter.r1.ProducerJobInfo; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; +import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer; +import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import 
org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -65,6 +67,7 @@ import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -90,7 +93,10 @@ class ApplicationTest { private ConsumerController consumerController; @Autowired - private EcsSimulatorController ecsSimulatorController; + private IcsSimulatorController icsSimulatorController; + + @Autowired + KafkaTopicConsumers kafkaTopicConsumers; private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); @@ -99,7 +105,7 @@ class ApplicationTest { static class TestApplicationConfig extends ApplicationConfig { @Override - public String getEcsBaseUrl() { + public String getIcsBaseUrl() { return thisProcessUrl(); } @@ -147,7 +153,7 @@ class ApplicationTest { @AfterEach void reset() { this.consumerController.testResults.reset(); - this.ecsSimulatorController.testResults.reset(); + this.icsSimulatorController.testResults.reset(); this.jobs.clear(); } @@ -237,15 +243,54 @@ class ApplicationTest { } @Test - void testWholeChain() throws Exception { + void testReceiveAndPostDataFromKafka() { + final String JOB_ID = "ID"; + final String TYPE_ID = "KafkaInformationType"; + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + + // Create a job + Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1); + String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; + ConsumerJobInfo kafkaJobInfo = + new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, ""); + + this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID); + + // Handle received data from Kafka, check that it has been posted to the + // consumer + kafkaConsumer.start(Flux.just("data")); + + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(1)); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]"); + + // Test send an exception + kafkaConsumer.start(Flux.error(new NullPointerException())); + + // Test regular restart of stopped + kafkaConsumer.stop(); + this.kafkaTopicConsumers.restartNonRunningTopics(); + await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue()); + + // Delete the job + this.icsSimulatorController.deleteJob(JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + } + + @Test + void testReceiveAndPostDataFromDmaap() throws Exception { final String JOB_ID = "ID"; // Register producer, Register types - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + 
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create a job - this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); // Return two messages from DMAAP and verify that these are sent to the owner of @@ -261,45 +306,25 @@ class ApplicationTest { assertThat(jobs).contains(JOB_ID); // Delete the job - this.ecsSimulatorController.deleteJob(JOB_ID, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); } @Test void testReRegister() throws Exception { // Wait foir register types and producer - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Clear the registration, should trigger a re-register - ecsSimulatorController.testResults.reset(); - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + icsSimulatorController.testResults.reset(); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Just clear the registerred types, should trigger a re-register - ecsSimulatorController.testResults.types.clear(); + icsSimulatorController.testResults.types.clear(); await().untilAsserted( - () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2)); - } - - @Test - void testCreateKafkaJob() { - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - - final String TYPE_ID = "KafkaInformationType"; - - Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1); - String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL; - ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, ""); - - // Create a job - this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); - - // Delete the job - this.ecsSimulatorController.deleteJob("JOB_ID", restClient()); - await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); + () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2)); } private void testErrorCode(Mono request, HttpStatus expStatus, String responseContains) { diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java similarity index 99% rename from dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java rename to 
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java index 1cf8903a..bff2634f 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java @@ -47,7 +47,7 @@ import org.springframework.web.bind.annotation.RestController; @RestController("IcsSimulatorController") @Tag(name = "Information Coordinator Service Simulator (exists only in test)") -public class EcsSimulatorController { +public class IcsSimulatorController { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final static Gson gson = new GsonBuilder().create(); diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java similarity index 98% rename from dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java rename to dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index c8fcb832..12696edc 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -86,7 +86,7 @@ class IntegrationWithEcs { static class TestApplicationConfig extends ApplicationConfig { @Override - public String getEcsBaseUrl() { + public String getIcsBaseUrl() { return "https://localhost:8434"; } @@ -161,7 +161,7 @@ class IntegrationWithEcs { } private String ecsBaseUrl() { - return applicationConfig.getEcsBaseUrl(); + return applicationConfig.getIcsBaseUrl(); } private String jobUrl(String jobId) { @@ -214,7 +214,7 @@ class IntegrationWithEcs { @Test void testCreateKafkaJob() { - await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); + await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); final String TYPE_ID = "KafkaInformationType"; Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1); @@ -233,7 +233,7 @@ class IntegrationWithEcs { @Test void testWholeChain() throws Exception { - await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue()); + await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*"); diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 9cd4fdd1..c38af8a9 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -94,7 +94,7 @@ class IntegrationWithKafka { private ConsumerController consumerController; @Autowired - private EcsSimulatorController ecsSimulatorController; + private IcsSimulatorController icsSimulatorController; @Autowired private KafkaTopicConsumers kafkaTopicConsumers; @@ -108,7 +108,7 @@ class IntegrationWithKafka { static class TestApplicationConfig extends ApplicationConfig { @Override - public String getEcsBaseUrl() { + public String getIcsBaseUrl() { return thisProcessUrl(); } @@ -151,7 +151,7 @@ class IntegrationWithKafka { @AfterEach void reset() { this.consumerController.testResults.reset(); - 
this.ecsSimulatorController.testResults.reset(); + this.icsSimulatorController.testResults.reset(); this.jobs.clear(); } @@ -252,13 +252,13 @@ class IntegrationWithKafka { final String JOB_ID2 = "ID2"; // Register producer, Register types - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. One buffering and one with a filter - this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1, + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1, restClient()); - this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); @@ -268,8 +268,8 @@ class IntegrationWithKafka { verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]"); // Delete the jobs - this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); - this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID1, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty()); @@ -281,13 +281,13 @@ class IntegrationWithKafka { final String JOB_ID2 = "ID2"; // Register producer, Register types - await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); + await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); + assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); // Create two jobs. 
- this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1, + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1, restClient()); - this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); + this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); @@ -298,8 +298,8 @@ class IntegrationWithKafka { await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse()); this.consumerController.testResults.reset(); - this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job - kafkaTopicConsumers.restartNonRunningTasks(); + this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job + kafkaTopicConsumers.restartNonRunningTopics(); Thread.sleep(1000); // Restarting the input seems to take some asynch time dataToSend = Flux.just(senderRecord("Howdy\"")); @@ -308,8 +308,8 @@ class IntegrationWithKafka { verifiedReceivedByConsumer("[\"Howdy\\\"\"]"); // Delete the jobs - this.ecsSimulatorController.deleteJob(JOB_ID1, restClient()); - this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID1, restClient()); + this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isZero()); await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty()); diff --git a/dmaap-mediator-producer/README.md b/dmaap-mediator-producer/README.md index 2fd7194f..9a0903bb 100644 --- a/dmaap-mediator-producer/README.md +++ b/dmaap-mediator-producer/README.md @@ -38,19 +38,23 @@ At start up the producer will register the configured job types in ICS and also Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer until it is available again. +The producer provides a REST API to control the log level. The available levels are the same as the ones used in the configuration above. + + PUT https://mrproducer:8085/admin/log?level= + ## Development To make it easy to test during development of the producer, two stubs are provided in the `stub` folder. -One, under the `dmaap` folder, called `dmaap` that stubs MR and respond with an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following: +One, under the `dmaap` folder, called `dmaap` that stubs MR and respond with an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port ` flag when starting the stub. To build and start the stub, do the following: >1. cd stub/dmaap >2. go build ->3. ./dmaap +>3. ./dmaap [-port \] -One, under the `consumer` folder, called `consumer` that at startup will register a job of type `STD_Fault_Messages` in ICS, and then listen for REST calls and print the body of them. 
By default, it listens to the port `40935`, but his can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following: +One, under the `consumer` folder, called `consumer` that at startup will register a job of type `STD_Fault_Messages` in ICS, and then listen for REST calls and print the body of them. By default, it listens to the port `40935`, but his can be overridden by passing a `-port ` flag when starting the stub. To build and start the stub, do the following: >1. cd stub/consumer >2. go build ->3. ./consumer +>3. ./consumer [-port \] Mocks needed for unit tests have been generated using `github.com/stretchr/testify/mock` and are checked in under the `mocks` folder. **Note!** Keep in mind that if any of the mocked interfaces change, a new mock for that interface must be generated and checked in. diff --git a/dmaap-mediator-producer/go.mod b/dmaap-mediator-producer/go.mod index ffea6a25..eaaecf7f 100644 --- a/dmaap-mediator-producer/go.mod +++ b/dmaap-mediator-producer/go.mod @@ -3,15 +3,15 @@ module oransc.org/nonrtric/dmaapmediatorproducer go 1.17 require ( + github.com/gorilla/mux v1.8.0 + github.com/hashicorp/go-retryablehttp v0.7.0 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/gorilla/mux v1.8.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.1 // indirect - github.com/hashicorp/go-retryablehttp v0.7.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.1.0 // indirect golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect diff --git a/dmaap-mediator-producer/go.sum b/dmaap-mediator-producer/go.sum index 8447fa07..4b3557bb 100644 --- a/dmaap-mediator-producer/go.sum +++ b/dmaap-mediator-producer/go.sum @@ -5,6 +5,7 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= +github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-retryablehttp v0.7.0 h1:eu1EI/mbirUgP5C8hVsTNaGZreBDlYiwC1FZWkvQPQ4= github.com/hashicorp/go-retryablehttp v0.7.0/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY= diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index eef1b5f9..34b056d5 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -21,6 +21,7 @@ package config import ( + "encoding/json" "fmt" "os" "strconv" @@ -81,3 +82,19 @@ func getLogLevel() log.Level { return log.InfoLevel } } + +func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) { + typeDefsByte, err := os.ReadFile(configFile) + if err != nil { + return nil, err + } + typeDefs := struct { + Types []TypeDefinition `json:"types"` + }{} + err = json.Unmarshal(typeDefsByte, &typeDefs) + if err != nil { + return nil, err + } + + return typeDefs.Types, nil +} diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go index 90d3c036..293e0d0b 100644 --- 
a/dmaap-mediator-producer/internal/config/config_test.go +++ b/dmaap-mediator-producer/internal/config/config_test.go @@ -23,7 +23,7 @@ package config import ( "bytes" "os" - "reflect" + "path/filepath" "testing" log "github.com/sirupsen/logrus" @@ -75,9 +75,8 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T ProducerCertPath: "security/producer.crt", ProducerKeyPath: "security/producer.key", } - if got := New(); !reflect.DeepEqual(got, &wantConfig) { - t.Errorf("New() = %v, want %v", got, &wantConfig) - } + got := New() + assertions.Equal(&wantConfig, got) logString := buf.String() assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used") } @@ -109,3 +108,30 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) { logString := buf.String() assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!") } + +const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}` + +func TestGetTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *testing.T) { + assertions := require.New(t) + typesDir, err := os.MkdirTemp("", "configs") + if err != nil { + t.Errorf("Unable to create temporary directory for types due to: %v", err) + } + fname := filepath.Join(typesDir, "type_config.json") + t.Cleanup(func() { + os.RemoveAll(typesDir) + }) + if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil { + t.Errorf("Unable to create temporary config file for types due to: %v", err) + } + + types, err := GetJobTypesFromConfiguration(fname) + + wantedType := TypeDefinition{ + Id: "type1", + DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", + } + wantedTypes := []TypeDefinition{wantedType} + assertions.EqualValues(wantedTypes, types) + assertions.Nil(err) +} diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index 6dad5fd9..b6616a1b 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -21,9 +21,7 @@ package jobs import ( - "encoding/json" "fmt" - "os" "sync" log "github.com/sirupsen/logrus" @@ -47,7 +45,7 @@ type JobInfo struct { } type JobTypesManager interface { - LoadTypesFromConfiguration() ([]config.TypeDefinition, error) + LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition GetSupportedTypes() []string } @@ -57,16 +55,14 @@ type JobsManager interface { } type JobsManagerImpl struct { - configFile string allTypes map[string]TypeData pollClient restclient.HTTPClient mrAddress string distributeClient restclient.HTTPClient } -func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl { +func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl { return &JobsManagerImpl{ - configFile: typeConfigFilePath, allTypes: make(map[string]TypeData), pollClient: pollClient, mrAddress: mrAddr, @@ -107,26 +103,15 @@ func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error { return nil } -func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) { - typeDefsByte, err := os.ReadFile(jm.configFile) - if err != nil { - return nil, err - } - typeDefs := struct { - Types []config.TypeDefinition 
`json:"types"` - }{} - err = json.Unmarshal(typeDefsByte, &typeDefs) - if err != nil { - return nil, err - } - for _, typeDef := range typeDefs.Types { +func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition { + for _, typeDef := range types { jm.allTypes[typeDef.Id] = TypeData{ TypeId: typeDef.Id, DMaaPTopicURL: typeDef.DmaapTopicURL, jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient), } } - return typeDefs.Types, nil + return types } func (jm *JobsManagerImpl) GetSupportedTypes() []string { diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 552b5fa1..30b4ffd9 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -24,8 +24,6 @@ import ( "bytes" "io/ioutil" "net/http" - "os" - "path/filepath" "sync" "testing" "time" @@ -38,26 +36,18 @@ const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unau func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { assertions := require.New(t) - typesDir, err := os.MkdirTemp("", "configs") - if err != nil { - t.Errorf("Unable to create temporary directory for types due to: %v", err) - } - fname := filepath.Join(typesDir, "type_config.json") - managerUnderTest := NewJobsManagerImpl(fname, nil, "", nil) - t.Cleanup(func() { - os.RemoveAll(typesDir) - }) - if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil { - t.Errorf("Unable to create temporary config file for types due to: %v", err) - } - types, err := managerUnderTest.LoadTypesFromConfiguration() + + managerUnderTest := NewJobsManagerImpl(nil, "", nil) + wantedType := config.TypeDefinition{ Id: "type1", DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", } wantedTypes := []config.TypeDefinition{wantedType} + + types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes) + assertions.EqualValues(wantedTypes, types) - assertions.Nil(err) supportedTypes := managerUnderTest.GetSupportedTypes() assertions.EqualValues([]string{"type1"}, supportedTypes) @@ -65,7 +55,7 @@ func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedT func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", nil) wantedJob := JobInfo{ Owner: "owner", LastUpdated: "now", @@ -93,7 +83,7 @@ func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", nil) jobInfo := JobInfo{ InfoTypeIdentity: "type1", } @@ -105,7 +95,7 @@ func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", nil) managerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", } @@ -120,7 +110,7 @@ func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { 
assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", nil) managerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", } @@ -136,7 +126,7 @@ func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", nil) jobsHandler := jobsHandler{ deleteJobCh: make(chan string)} managerUnderTest.allTypes["type1"] = TypeData{ @@ -192,7 +182,7 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) }) jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock) - jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock) + jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock) jobsManager.allTypes["type1"] = TypeData{ DMaaPTopicURL: "/topicUrl", TypeId: "type1", diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go index 8ccd4b21..9a827e7a 100644 --- a/dmaap-mediator-producer/internal/restclient/HTTPClient.go +++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go @@ -31,6 +31,7 @@ import ( "time" "github.com/hashicorp/go-retryablehttp" + log "github.com/sirupsen/logrus" ) // HTTPClient interface @@ -115,6 +116,7 @@ func CreateClientCertificate(certPath string, keyPath string) (tls.Certificate, func CreateRetryClient(cert tls.Certificate) *http.Client { rawRetryClient := retryablehttp.NewClient() + rawRetryClient.Logger = leveledLogger{} rawRetryClient.RetryWaitMax = time.Minute rawRetryClient.RetryMax = math.MaxInt rawRetryClient.HTTPClient.Transport = getSecureTransportWithoutVerify(cert) @@ -145,3 +147,28 @@ func IsUrlSecure(configUrl string) bool { u, _ := url.Parse(configUrl) return u.Scheme == "https" } + +// Used to get leveled logging in the RetryClient +type leveledLogger struct { +} + +func (ll leveledLogger) Error(msg string, keysAndValues ...interface{}) { + log.WithFields(getFields(keysAndValues)).Error(msg) +} +func (ll leveledLogger) Info(msg string, keysAndValues ...interface{}) { + log.WithFields(getFields(keysAndValues)).Info(msg) +} +func (ll leveledLogger) Debug(msg string, keysAndValues ...interface{}) { + log.WithFields(getFields(keysAndValues)).Debug(msg) +} +func (ll leveledLogger) Warn(msg string, keysAndValues ...interface{}) { + log.WithFields(getFields(keysAndValues)).Warn(msg) +} + +func getFields(keysAndValues []interface{}) log.Fields { + fields := log.Fields{} + for i := 0; i < len(keysAndValues); i = i + 2 { + fields[fmt.Sprint(keysAndValues[i])] = keysAndValues[i+1] + } + return fields +} diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go index 79646c29..8c5577d7 100644 --- a/dmaap-mediator-producer/internal/server/server.go +++ b/dmaap-mediator-producer/internal/server/server.go @@ -27,6 +27,7 @@ import ( "net/http" "github.com/gorilla/mux" + log "github.com/sirupsen/logrus" "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" ) @@ -34,6 +35,8 @@ const StatusPath = "/status" const AddJobPath = "/jobs" const jobIdToken = "infoJobId" const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}" +const logLevelToken = "level" +const logAdminPath = "/admin/log" type ProducerCallbackHandler 
struct { jobsManager jobs.JobsManager @@ -51,6 +54,7 @@ func NewRouter(jm jobs.JobsManager) *mux.Router { r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status") r.HandleFunc(AddJobPath, callbackHandler.addInfoJobHandler).Methods(http.MethodPost).Name("add") r.HandleFunc(deleteJobPath, callbackHandler.deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete") + r.HandleFunc(logAdminPath, callbackHandler.setLogLevel).Methods(http.MethodPut).Name("setLogLevel") r.NotFoundHandler = ¬FoundHandler{} r.MethodNotAllowedHandler = &methodNotAllowedHandler{} return r @@ -87,6 +91,17 @@ func (h *ProducerCallbackHandler) deleteInfoJobHandler(w http.ResponseWriter, r h.jobsManager.DeleteJobFromRESTCall(id) } +func (h *ProducerCallbackHandler) setLogLevel(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + logLevelStr := query.Get(logLevelToken) + if loglevel, err := log.ParseLevel(logLevelStr); err == nil { + log.SetLevel(loglevel) + } else { + http.Error(w, fmt.Sprintf("Invalid log level: %v. Log level will not be changed!", logLevelStr), http.StatusBadRequest) + return + } +} + type notFoundHandler struct{} func (h *notFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go index 1d458c98..1db36446 100644 --- a/dmaap-mediator-producer/internal/server/server_test.go +++ b/dmaap-mediator-producer/internal/server/server_test.go @@ -78,6 +78,14 @@ func TestNewRouter(t *testing.T) { handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t)) assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code) assertions.Contains(responseRecorder.Body.String(), "Method is not supported.") + + setLogLevelRoute := r.Get("setLogLevel") + assertions.NotNil(setLogLevelRoute) + supportedMethods, err = setLogLevelRoute.GetMethods() + assertions.Equal([]string{http.MethodPut}, supportedMethods) + assertions.Nil(err) + path, _ = setLogLevelRoute.GetPathTemplate() + assertions.Equal("/admin/log", path) } func TestStatusHandler(t *testing.T) { @@ -119,7 +127,6 @@ func TestAddInfoJobHandler(t *testing.T) { }, }, wantedStatus: http.StatusOK, - wantedBody: "", }, { name: "AddInfoJobHandler with incorrect job info, should return BadRequest", @@ -171,6 +178,50 @@ func TestDeleteJob(t *testing.T) { jobHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1") } +func TestSetLogLevel(t *testing.T) { + assertions := require.New(t) + + type args struct { + logLevel string + } + tests := []struct { + name string + args args + wantedStatus int + wantedBody string + }{ + { + name: "Set to valid log level, should return OK", + args: args{ + logLevel: "Debug", + }, + wantedStatus: http.StatusOK, + }, + { + name: "Set to invalid log level, should return BadRequest", + args: args{ + logLevel: "bad", + }, + wantedStatus: http.StatusBadRequest, + wantedBody: "Invalid log level: bad", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + callbackHandlerUnderTest := NewProducerCallbackHandler(nil) + + handler := http.HandlerFunc(callbackHandlerUnderTest.setLogLevel) + responseRecorder := httptest.NewRecorder() + r, _ := http.NewRequest(http.MethodPut, "/admin/log?level="+tt.args.logLevel, nil) + + handler.ServeHTTP(responseRecorder, r) + + assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name) + assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name) + }) + } +} + func 
newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request { var body io.Reader if jobInfo != nil { diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index 194ed750..2d72466b 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -56,7 +56,7 @@ func main() { } retryClient := restclient.CreateRetryClient(cert) - jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 5*time.Second)) + jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second)) if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil { log.Fatalf("Stopping producer due to: %v", err) } @@ -87,13 +87,15 @@ func validateConfiguration(configuration *config.Config) error { } func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error { registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client) - if types, err := jobTypesHandler.LoadTypesFromConfiguration(); err == nil { - if regErr := registrator.RegisterTypes(types); regErr != nil { - return fmt.Errorf("unable to register all types due to: %v", regErr) - } - } else { - return fmt.Errorf("unable to get types to register due to: %v", err) + configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json") + if err != nil { + return fmt.Errorf("unable to register all types due to: %v", err) } + regErr := registrator.RegisterTypes(jobTypesHandler.LoadTypesFromConfiguration(configTypes)) + if regErr != nil { + return fmt.Errorf("unable to register all types due to: %v", regErr) + } + producer := config.ProducerRegistrationInfo{ InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath, SupportedInfoTypes: jobTypesHandler.GetSupportedTypes(), diff --git a/dmaap-mediator-producer/stub/consumer/consumerstub.go b/dmaap-mediator-producer/stub/consumer/consumerstub.go index 5cbcaeab..4260cae1 100644 --- a/dmaap-mediator-producer/stub/consumer/consumerstub.go +++ b/dmaap-mediator-producer/stub/consumer/consumerstub.go @@ -43,7 +43,7 @@ func main() { registerJob(*port) - fmt.Print("Starting consumer on port: ", *port) + fmt.Println("Starting consumer on port: ", *port) fmt.Println(http.ListenAndServe(fmt.Sprintf(":%v", *port), nil)) } @@ -59,11 +59,11 @@ func registerJob(port int) { InfoTypeId: "STD_Fault_Messages", JobDefinition: "{}", } - fmt.Print("Registering consumer: ", jobInfo) + fmt.Println("Registering consumer: ", jobInfo) body, _ := json.Marshal(jobInfo) putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient) if putErr != nil { - fmt.Printf("Unable to register consumer: %v", putErr) + fmt.Println("Unable to register consumer: ", putErr) } } diff --git a/dmaap-mediator-producer/stub/dmaap/mrstub.go b/dmaap-mediator-producer/stub/dmaap/mrstub.go index 36ffa396..451bc9a1 100644 --- a/dmaap-mediator-producer/stub/dmaap/mrstub.go +++ b/dmaap-mediator-producer/stub/dmaap/mrstub.go @@ -71,12 +71,13 @@ func handleData(w http.ResponseWriter, req *http.Request) { var responseBody []byte if critical { responseBody = getFaultMessage("CRITICAL") + fmt.Println("Sending CRITICAL") critical = false } else { responseBody = getFaultMessage("NORMAL") + 
fmt.Println("Sending NORMAL") critical = true } - // w.Write(responseBody) fmt.Fprint(w, string(responseBody)) } diff --git a/test/usecases/oruclosedlooprecovery/scriptversion/helm/dmaap-mr/templates/deployment.yaml b/test/usecases/oruclosedlooprecovery/scriptversion/helm/dmaap-mr/templates/deployment.yaml index 700bb90f..d8ce48da 100644 --- a/test/usecases/oruclosedlooprecovery/scriptversion/helm/dmaap-mr/templates/deployment.yaml +++ b/test/usecases/oruclosedlooprecovery/scriptversion/helm/dmaap-mr/templates/deployment.yaml @@ -45,6 +45,13 @@ spec: {{- toYaml .Values.securityContext | nindent 12 }} image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} + env: + - name: TOPIC_READ + value: http://dmaap-mr:3904/events/unauthenticated.SEC_FAULT_OUTPUT + - name: TOPIC_WRITE + value: http://dmaap-mr:3904/events/unauthenticated.SEC_FAULT_OUTPUT + - name: GENERIC_TOPICS_UPLOAD_BASEURL + value: http://dmaap-mr:3904 ports: - name: http containerPort: 3904