Merge "Use env varialbes to replace image urls & tags"
author Henrik Andersson <henrik.b.andersson@est.tech>
Fri, 19 Nov 2021 09:54:16 +0000 (09:54 +0000)
committer Gerrit Code Review <gerrit@o-ran-sc.org>
Fri, 19 Nov 2021 09:54:16 +0000 (09:54 +0000)
36 files changed:
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaJobDataConsumer.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
dmaap-mediator-producer/Dockerfile
dmaap-mediator-producer/README.md
dmaap-mediator-producer/internal/jobs/jobs.go
dmaap-mediator-producer/internal/jobs/jobs_test.go
dmaap-mediator-producer/internal/server/server.go
dmaap-mediator-producer/internal/server/server_test.go
dmaap-mediator-producer/main.go
dmaap-mediator-producer/mocks/jobhandler/JobHandler.go
dmaap-mediator-producer/stub/consumer/consumerstub.go
dmaap-mediator-producer/stub/dmaap/mrstub.go
onap/oran
test/usecases/oruclosedlooprecovery/goversion/.gitignore
test/usecases/oruclosedlooprecovery/goversion/README.md
test/usecases/oruclosedlooprecovery/goversion/go.mod
test/usecases/oruclosedlooprecovery/goversion/go.sum
test/usecases/oruclosedlooprecovery/goversion/internal/config/config.go
test/usecases/oruclosedlooprecovery/goversion/internal/config/config_test.go
test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler.go
test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client.go
test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client_test.go
test/usecases/oruclosedlooprecovery/goversion/main.go
test/usecases/oruclosedlooprecovery/goversion/main_test.go
test/usecases/oruclosedlooprecovery/goversion/mocks/Server.go [deleted file]
test/usecases/oruclosedlooprecovery/goversion/security/consumer.crt [new file with mode: 0644]
test/usecases/oruclosedlooprecovery/goversion/security/consumer.key [new file with mode: 0644]
test/usecases/oruclosedlooprecovery/goversion/stub/producer/producerstub.go [moved from test/usecases/oruclosedlooprecovery/goversion/simulator/producer.go with 100% similarity]
test/usecases/oruclosedlooprecovery/goversion/stub/sdnr/sdnrstub.go [new file with mode: 0644]

index 6939026..8b3efed 100644 (file)
@@ -62,7 +62,8 @@ public class AsyncRestClient {
         this.httpProxyConfig = httpProxyConfig;
     }
 
-    public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
+    public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body,
+            @Nullable MediaType contentType) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
@@ -71,17 +72,18 @@ public class AsyncRestClient {
         RequestHeadersSpec<?> request = getWebClient() //
                 .post() //
                 .uri(uri) //
-                .contentType(MediaType.APPLICATION_JSON) //
+                .contentType(contentType) //
                 .body(bodyProducer, String.class);
         return retrieve(traceTag, request);
     }
 
-    public Mono<String> post(String uri, @Nullable String body) {
-        return postForEntity(uri, body) //
+    public Mono<String> post(String uri, @Nullable String body, @Nullable MediaType contentType) {
+        return postForEntity(uri, body, contentType) //
                 .map(this::toBody);
     }
 
-    public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
+    public Mono<String> postWithAuthHeader(String uri, String body, String username, String password,
+            MediaType mediaType) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
@@ -90,7 +92,7 @@ public class AsyncRestClient {
                 .post() //
                 .uri(uri) //
                 .headers(headers -> headers.setBasicAuth(username, password)) //
-                .contentType(MediaType.APPLICATION_JSON) //
+                .contentType(mediaType) //
                 .bodyValue(body);
         return retrieve(traceTag, request) //
                 .map(this::toBody);
index 507d9b6..217a072 100644 (file)
@@ -29,6 +29,7 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.Jobs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
@@ -128,7 +129,7 @@ public class DmaapTopicConsumer {
         // Distribute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
                 .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
-                .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
+                .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse);
     }
 }
index d240129..5550ce0 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import lombok.Getter;
+
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.Disposable;
@@ -35,29 +38,58 @@ import reactor.core.publisher.Sinks.Many;
  * owner via REST calls.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-
 public class KafkaJobDataConsumer {
     private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
-    private final Many<String> input;
+    @Getter
     private final Job job;
     private Disposable subscription;
-    private int errorCounter = 0;
+    private final ErrorStats errorStats = new ErrorStats();
+
+    private class ErrorStats {
+        private int consumerFaultCounter = 0;
+        private boolean kafkaError = false; // eg. overflow
+
+        public void handleOkFromConsumer() {
+            this.consumerFaultCounter = 0;
+        }
+
+        public void handleException(Throwable t) {
+            if (t instanceof WebClientResponseException) {
+                ++this.consumerFaultCounter;
+            } else {
+                kafkaError = true;
+            }
+        }
 
-    KafkaJobDataConsumer(Many<String> input, Job job) {
-        this.input = input;
+        public boolean isItHopeless() {
+            final int STOP_AFTER_ERRORS = 5;
+            return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
+        }
+
+        public void resetKafkaErrors() {
+            kafkaError = false;
+        }
+    }
+
+    public KafkaJobDataConsumer(Job job) {
         this.job = job;
     }
 
-    public synchronized void start() {
+    public synchronized void start(Many<String> input) {
         stop();
-        this.subscription = getMessagesFromKafka(job) //
-                .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
-                .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
+        this.errorStats.resetKafkaErrors();
+        this.subscription = getMessagesFromKafka(input, job) //
+                .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
-                        this::handleErrorInStream, //
-                        () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
-                                job.getType().getId()));
+                        t -> stop(), //
+                        () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+    }
+
+    private Mono<String> postToClient(String body) {
+        logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
+        MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
+        return job.getConsumerRestClient().post("", body, contentType);
     }
 
     public synchronized void stop() {
@@ -71,43 +103,37 @@ public class KafkaJobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> getMessagesFromKafka(Job job) {
+    private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
         Flux<String> result = input.asFlux() //
                 .filter(job::isFilterMatch);
 
         if (job.isBuffered()) {
-            result = result.bufferTimeout( //
-                    job.getParameters().getBufferTimeout().getMaxSize(), //
-                    job.getParameters().getBufferTimeout().getMaxTime()) //
+            result = result.map(this::quote) //
+                    .bufferTimeout( //
+                            job.getParameters().getBufferTimeout().getMaxSize(), //
+                            job.getParameters().getBufferTimeout().getMaxTime()) //
                     .map(Object::toString);
         }
         return result;
     }
 
-    private Mono<String> handleError(Throwable t) {
-        logger.warn("exception: {} job: {}", t.getMessage(), job);
+    private String quote(String str) {
+        final String q = "\"";
+        return q + str.replace(q, "\\\"") + q;
+    }
 
-        final int STOP_AFTER_ERRORS = 5;
-        if (t instanceof WebClientResponseException) {
-            if (++this.errorCounter > STOP_AFTER_ERRORS) {
-                logger.error("Stopping job {}", job);
-                return Mono.error(t);
-            } else {
-                return Mono.empty(); // Discard
-            }
+    private Mono<String> handleError(Throwable t) {
+        logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+        this.errorStats.handleException(t);
+        if (this.errorStats.isItHopeless()) {
+            return Mono.error(t);
         } else {
-            // This can happen if there is an overflow.
-            return Mono.empty();
+            return Mono.empty(); // Ignore
         }
     }
 
     private void handleConsumerSentOk(String data) {
-        this.errorCounter = 0;
-    }
-
-    private void handleErrorInStream(Throwable t) {
-        logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
-        this.subscription = null;
+        this.errorStats.handleOkFromConsumer();
     }
 
 }
index 785f98b..0ed85c6 100644 (file)
@@ -43,9 +43,10 @@ import org.springframework.stereotype.Component;
 public class KafkaTopicConsumers {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
 
-    private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>();
+    private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
+
     @Getter
-    private final Map<String, KafkaJobDataConsumer> activeSubscriptions = new HashMap<>();
+    private final Map<String, KafkaJobDataConsumer> consumers = new HashMap<>(); // Key is jobId
 
     private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
 
@@ -74,17 +75,17 @@ public class KafkaTopicConsumers {
     }
 
     public synchronized void addJob(Job job) {
-        if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+        if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
             logger.debug("Kafka job added {}", job.getId());
             KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
-            KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job);
-            subscription.start();
-            activeSubscriptions.put(job.getId(), subscription);
+            KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
+            subscription.start(topicConsumer.getOutput());
+            consumers.put(job.getId(), subscription);
         }
     }
 
     public synchronized void removeJob(Job job) {
-        KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId());
+        KafkaJobDataConsumer d = consumers.remove(job.getId());
         if (d != null) {
             logger.debug("Kafka job removed {}", job.getId());
             d.stop();
@@ -93,11 +94,26 @@ public class KafkaTopicConsumers {
 
     @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
     public synchronized void restartNonRunningTasks() {
-        for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) {
+
+        for (KafkaJobDataConsumer consumer : consumers.values()) {
             if (!consumer.isRunning()) {
-                consumer.start();
+                restartTopic(consumer);
             }
         }
     }
 
+    private void restartTopic(KafkaJobDataConsumer consumer) {
+        InfoType type = consumer.getJob().getType();
+        KafkaTopicListener topic = this.topicListeners.get(type.getId());
+        topic.start();
+        restartConsumersOfType(topic, type);
+    }
+
+    private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
+        this.consumers.forEach((jobId, consumer) -> {
+            if (consumer.getJob().getType().getId().equals(type.getId())) {
+                consumer.start(topic.getOutput());
+            }
+        });
+    }
 }
index 0452b88..d1045ee 100644 (file)
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -48,24 +47,25 @@ public class KafkaTopicListener {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
-    private final Many<String> output;
+    private Many<String> output;
+    private Disposable topicReceiverTask;
 
     public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
-
-        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
-        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
-
         this.type = type;
-        startKafkaTopicReceiver();
+        start();
     }
 
     public Many<String> getOutput() {
         return this.output;
     }
 
-    private Disposable startKafkaTopicReceiver() {
-        return KafkaReceiver.create(kafkaInputProperties()) //
+    public void start() {
+        stop();
+        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
+        this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+        logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
+        topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) //
                 .receive() //
                 .doOnNext(this::onReceivedData) //
                 .subscribe(null, //
@@ -73,7 +73,14 @@ public class KafkaTopicListener {
                         () -> logger.warn("KafkaTopicReceiver stopped"));
     }
 
-    private void onReceivedData(ConsumerRecord<Integer, String> input) {
+    private void stop() {
+        if (topicReceiverTask != null) {
+            topicReceiverTask.dispose();
+            topicReceiverTask = null;
+        }
+    }
+
+    private void onReceivedData(ConsumerRecord<String, String> input) {
         logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
         output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
     }
@@ -82,17 +89,17 @@ public class KafkaTopicListener {
         logger.error("KafkaTopicReceiver error: {}", t.getMessage());
     }
 
-    private ReceiverOptions<Integer, String> kafkaInputProperties() {
+    private ReceiverOptions<String, String> kafkaInputProperties() {
         Map<String, Object> consumerProps = new HashMap<>();
         if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
             logger.error("No kafka boostrap server is setup");
         }
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
-        return ReceiverOptions.<Integer, String>create(consumerProps)
+        return ReceiverOptions.<String, String>create(consumerProps)
                 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
     }
 
index 7b719e3..8b5b6cf 100644 (file)
@@ -87,7 +87,6 @@ public class ProducerRegstrationTask {
     }
 
     private void handleRegistrationCompleted() {
-        logger.debug("Registering types and producer completed");
         isRegisteredInEcs = true;
     }
 
@@ -106,7 +105,7 @@ public class ProducerRegstrationTask {
     private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
         ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
         if (isEqual(producerRegistrationInfo(), registerredInfo)) {
-            logger.trace("Already registered");
+            logger.trace("Already registered in ECS");
             return Mono.just(Boolean.TRUE);
         } else {
             return Mono.just(Boolean.FALSE);
index 1ca4fac..287c95e 100644 (file)
@@ -227,7 +227,8 @@ class ApplicationTest {
 
         ProducerJobInfo info = new ProducerJobInfo(null, "id", "typeId", "targetUri", "owner", "lastUpdated");
         String body = gson.toJson(info);
-        testErrorCode(restClient().post(jobUrl, body), HttpStatus.NOT_FOUND, "Could not find type");
+        testErrorCode(restClient().post(jobUrl, body, MediaType.APPLICATION_JSON), HttpStatus.NOT_FOUND,
+                "Could not find type");
     }
 
     @Test
index 8d1dda6..1cf8903 100644 (file)
@@ -105,7 +105,7 @@ public class EcsSimulatorController {
                 new ProducerJobInfo(job.jobDefinition, jobId, job.infoTypeId, job.jobResultUri, job.owner, "TIMESTAMP");
         String body = gson.toJson(request);
         logger.info("ECS Simulator PUT job: {}", body);
-        restClient.post(url, body).block();
+        restClient.post(url, body, MediaType.APPLICATION_JSON).block();
     }
 
     public void deleteJob(String jobId, AsyncRestClient restClient) {
index a0db58a..470e114 100644 (file)
@@ -262,20 +262,24 @@ class IntegrationWithKafka {
         var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
         sendDataToStream(dataToSend);
 
-        verifiedReceivedByConsumer("Message_1", "[Message_1, Message_2, Message_3]");
+        verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+
+        // Just for testing quoting
+        this.consumerController.testResults.reset();
+        dataToSend = Flux.just(senderRecord("Message\"_", 1));
+        sendDataToStream(dataToSend);
+        verifiedReceivedByConsumer("[\"Message\\\"_1\"]");
 
         // Delete the jobs
         this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
         this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
 
         await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty());
+        await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers()).isEmpty());
     }
 
     @Test
     void kafkaIOverflow() throws InterruptedException {
-        // This does not work. After an overflow, the kafka stream does not seem to work
-        //
         final String JOB_ID1 = "ID1";
         final String JOB_ID2 = "ID2";
 
@@ -290,18 +294,20 @@ class IntegrationWithKafka {
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
         var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
-        sendDataToStream(dataToSend); // this will overflow
+        sendDataToStream(dataToSend); // this should overflow
 
-        KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next();
+        KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().values().iterator().next();
         await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
         this.consumerController.testResults.reset();
 
         kafkaTopicConsumers.restartNonRunningTasks();
+        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+        Thread.sleep(1000); // Restarting the input seems to take some asynch time
 
-        dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message_1
+        dataToSend = Flux.range(1, 1).map(i -> senderRecord("Howdy_", i));
         sendDataToStream(dataToSend);
 
-        verifiedReceivedByConsumer("Message__1", "Message__1");
+        verifiedReceivedByConsumer("Howdy_1");
     }
 
 }
index bc09fdc..1c7f45c 100644 (file)
@@ -20,7 +20,7 @@
 ##
 ## Build
 ##
-FROM golang:1.17-bullseye AS build
+FROM nexus3.o-ran-sc.org:10001/golang:1.17-bullseye AS build
 WORKDIR /app
 COPY go.mod .
 COPY go.sum .
index 90f8471..2fd7194 100644 (file)
@@ -36,7 +36,7 @@ The configured public key and cerificate shall be PEM-encoded. A self signed cer
 
 At start up the producer will register the configured job types in ICS and also register itself as a producer supporting these types. If ICS is unavailable, the producer will retry to connect indefinitely. The same goes for MR.
 
-Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
+Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer until it is available again.
 
 ## Development
 
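The retry behaviour described in the README above (register in ICS, retry indefinitely while ICS or MR is unavailable, then start polling and distributing) can be pictured as a small loop. The sketch below is illustrative only; the `register` stub is an assumption and not the producer's actual registration code, which uses its own retrying HTTP client.

    package main

    import (
        "errors"
        "log"
        "time"
    )

    // register is a stand-in for the real ICS registration call; it is an
    // assumption for illustration, not the producer's actual function.
    func register() error { return errors.New("ICS not reachable") }

    func main() {
        // Keep retrying until ICS answers, as the README describes.
        for {
            if err := register(); err == nil {
                break
            }
            log.Println("registration failed, retrying in 10s")
            time.Sleep(10 * time.Second)
        }
        log.Println("registered; start polling MR and distributing to jobs")
    }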
index 1c42942..6dad5fd 100644 (file)
@@ -34,7 +34,7 @@ import (
 type TypeData struct {
        TypeId        string `json:"id"`
        DMaaPTopicURL string `json:"dmaapTopicUrl"`
-       jobHandler    *jobHandler
+       jobsHandler   *jobsHandler
 }
 
 type JobInfo struct {
@@ -52,8 +52,8 @@ type JobTypesManager interface {
 }
 
 type JobsManager interface {
-       AddJob(JobInfo) error
-       DeleteJob(jobId string)
+       AddJobFromRESTCall(JobInfo) error
+       DeleteJobFromRESTCall(jobId string)
 }
 
 type JobsManagerImpl struct {
@@ -64,17 +64,6 @@ type JobsManagerImpl struct {
        distributeClient restclient.HTTPClient
 }
 
-type jobHandler struct {
-       mu               sync.Mutex
-       typeId           string
-       topicUrl         string
-       jobs             map[string]JobInfo
-       addJobCh         chan JobInfo
-       deleteJobCh      chan string
-       pollClient       restclient.HTTPClient
-       distributeClient restclient.HTTPClient
-}
-
 func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
        return &JobsManagerImpl{
                configFile:       typeConfigFilePath,
@@ -85,10 +74,10 @@ func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPCli
        }
 }
 
-func (jm *JobsManagerImpl) AddJob(ji JobInfo) error {
+func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
        if err := jm.validateJobInfo(ji); err == nil {
                typeData := jm.allTypes[ji.InfoTypeIdentity]
-               typeData.jobHandler.addJobCh <- ji
+               typeData.jobsHandler.addJobCh <- ji
                log.Debug("Added job: ", ji)
                return nil
        } else {
@@ -96,10 +85,10 @@ func (jm *JobsManagerImpl) AddJob(ji JobInfo) error {
        }
 }
 
-func (jm *JobsManagerImpl) DeleteJob(jobId string) {
+func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
        for _, typeData := range jm.allTypes {
                log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
-               typeData.jobHandler.deleteJobCh <- jobId
+               typeData.jobsHandler.deleteJobCh <- jobId
        }
        log.Debug("Deleted job: ", jobId)
 }
@@ -131,21 +120,10 @@ func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition
                return nil, err
        }
        for _, typeDef := range typeDefs.Types {
-               addCh := make(chan JobInfo)
-               deleteCh := make(chan string)
-               jh := jobHandler{
-                       typeId:           typeDef.Id,
-                       topicUrl:         typeDef.DmaapTopicURL,
-                       jobs:             make(map[string]JobInfo),
-                       addJobCh:         addCh,
-                       deleteJobCh:      deleteCh,
-                       pollClient:       jm.pollClient,
-                       distributeClient: jm.distributeClient,
-               }
                jm.allTypes[typeDef.Id] = TypeData{
                        TypeId:        typeDef.Id,
                        DMaaPTopicURL: typeDef.DmaapTopicURL,
-                       jobHandler:    &jh,
+                       jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
                }
        }
        return typeDefs.Types, nil
@@ -159,15 +137,38 @@ func (jm *JobsManagerImpl) GetSupportedTypes() []string {
        return supportedTypes
 }
 
-func (jm *JobsManagerImpl) StartJobs() {
+func (jm *JobsManagerImpl) StartJobsForAllTypes() {
        for _, jobType := range jm.allTypes {
 
-               go jobType.jobHandler.start(jm.mrAddress)
+               go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
+
+       }
+}
+
+type jobsHandler struct {
+       mu               sync.Mutex
+       typeId           string
+       topicUrl         string
+       jobs             map[string]job
+       addJobCh         chan JobInfo
+       deleteJobCh      chan string
+       pollClient       restclient.HTTPClient
+       distributeClient restclient.HTTPClient
+}
 
+func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+       return &jobsHandler{
+               typeId:           typeId,
+               topicUrl:         topicURL,
+               jobs:             make(map[string]job),
+               addJobCh:         make(chan JobInfo),
+               deleteJobCh:      make(chan string),
+               pollClient:       pollClient,
+               distributeClient: distributeClient,
        }
 }
 
-func (jh *jobHandler) start(mRAddress string) {
+func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
        go func() {
                for {
                        jh.pollAndDistributeMessages(mRAddress)
@@ -181,45 +182,104 @@ func (jh *jobHandler) start(mRAddress string) {
        }()
 }
 
-func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) {
+func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
        log.Debugf("Processing jobs for type: %v", jh.typeId)
        messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
        if error != nil {
-               log.Warnf("Error getting data from MR. Cause: %v", error)
+               log.Warn("Error getting data from MR. Cause: ", error)
        }
-       log.Debugf("Received messages: %v", string(messagesBody))
+       log.Debug("Received messages: ", string(messagesBody))
        jh.distributeMessages(messagesBody)
 }
 
-func (jh *jobHandler) distributeMessages(messages []byte) {
+func (jh *jobsHandler) distributeMessages(messages []byte) {
        if len(messages) > 2 {
                jh.mu.Lock()
                defer jh.mu.Unlock()
-               for _, jobInfo := range jh.jobs {
-                       go jh.sendMessagesToConsumer(messages, jobInfo)
+               for _, job := range jh.jobs {
+                       if len(job.messagesChannel) < cap(job.messagesChannel) {
+                               job.messagesChannel <- messages
+                       } else {
+                               jh.emptyMessagesBuffer(job)
+                       }
                }
        }
 }
 
-func (jh *jobHandler) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
-       log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
-       if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
-               log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
+func (jh *jobsHandler) emptyMessagesBuffer(job job) {
+       log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
+out:
+       for {
+               select {
+               case <-job.messagesChannel:
+               default:
+                       break out
+               }
        }
-       log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner)
 }
 
-func (jh *jobHandler) monitorManagementChannels() {
+func (jh *jobsHandler) monitorManagementChannels() {
        select {
        case addedJob := <-jh.addJobCh:
-               jh.mu.Lock()
-               log.Debugf("received %v from addJobCh\n", addedJob)
-               jh.jobs[addedJob.InfoJobIdentity] = addedJob
-               jh.mu.Unlock()
+               jh.addJob(addedJob)
        case deletedJob := <-jh.deleteJobCh:
-               jh.mu.Lock()
-               log.Debugf("received %v from deleteJobCh\n", deletedJob)
+               jh.deleteJob(deletedJob)
+       }
+}
+
+func (jh *jobsHandler) addJob(addedJob JobInfo) {
+       jh.mu.Lock()
+       log.Debug("Add job: ", addedJob)
+       newJob := newJob(addedJob, jh.distributeClient)
+       go newJob.start()
+       jh.jobs[addedJob.InfoJobIdentity] = newJob
+       jh.mu.Unlock()
+}
+
+func (jh *jobsHandler) deleteJob(deletedJob string) {
+       jh.mu.Lock()
+       log.Debug("Delete job: ", deletedJob)
+       j, exist := jh.jobs[deletedJob]
+       if exist {
+               j.controlChannel <- struct{}{}
                delete(jh.jobs, deletedJob)
-               jh.mu.Unlock()
        }
+       jh.mu.Unlock()
+}
+
+type job struct {
+       jobInfo         JobInfo
+       client          restclient.HTTPClient
+       messagesChannel chan []byte
+       controlChannel  chan struct{}
+}
+
+func newJob(j JobInfo, c restclient.HTTPClient) job {
+       return job{
+               jobInfo:         j,
+               client:          c,
+               messagesChannel: make(chan []byte, 10),
+               controlChannel:  make(chan struct{}),
+       }
+}
+
+func (j *job) start() {
+out:
+       for {
+               select {
+               case <-j.controlChannel:
+                       log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
+                       break out
+               case msg := <-j.messagesChannel:
+                       j.sendMessagesToConsumer(msg)
+               }
+       }
+}
+
+func (j *job) sendMessagesToConsumer(messages []byte) {
+       log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
+       if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
+               log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
+       }
+       log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
 }
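The new per-job goroutine in jobs.go keeps a bounded `messagesChannel`; when that buffer is full, `distributeMessages` calls `emptyMessagesBuffer` to drain it instead of blocking the MR poller on a slow consumer. A minimal, self-contained sketch of that send-or-drain pattern (the names here are illustrative, not the producer's types):

    package main

    import "fmt"

    // offerOrDrain mirrors the distributeMessages/emptyMessagesBuffer idea:
    // if the buffered channel still has room, enqueue the message; otherwise
    // drop everything that is queued so the slow consumer does not stall polling.
    func offerOrDrain(ch chan []byte, msg []byte) {
        if len(ch) < cap(ch) {
            ch <- msg
            return
        }
        for {
            select {
            case <-ch: // discard one queued message
            default:
                return // buffer is empty again
            }
        }
    }

    func main() {
        ch := make(chan []byte, 2)
        offerOrDrain(ch, []byte("a"))
        offerOrDrain(ch, []byte("b"))
        offerOrDrain(ch, []byte("c")) // buffer full: the queue is emptied instead
        fmt.Println("queued messages:", len(ch)) // prints 0
    }

The design choice is the same as in the diff: a consumer that cannot keep up loses its backlog, but polling for the other jobs continues undisturbed.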
index 3651a13..552b5fa 100644 (file)
@@ -36,7 +36,7 @@ import (
 
 const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
 
-func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
+func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
        assertions := require.New(t)
        typesDir, err := os.MkdirTemp("", "configs")
        if err != nil {
@@ -63,7 +63,7 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes
        assertions.EqualValues([]string{"type1"}, supportedTypes)
 }
 
-func TestManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
+func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        wantedJob := JobInfo{
@@ -74,36 +74,36 @@ func TestManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
                InfoJobData:      "{}",
                InfoTypeIdentity: "type1",
        }
-       jobHandler := jobHandler{
+       jobsHandler := jobsHandler{
                addJobCh: make(chan JobInfo)}
        managerUnderTest.allTypes["type1"] = TypeData{
-               TypeId:     "type1",
-               jobHandler: &jobHandler,
+               TypeId:      "type1",
+               jobsHandler: &jobsHandler,
        }
 
        var err error
        go func() {
-               err = managerUnderTest.AddJob(wantedJob)
+               err = managerUnderTest.AddJobFromRESTCall(wantedJob)
        }()
 
        assertions.Nil(err)
-       addedJob := <-jobHandler.addJobCh
+       addedJob := <-jobsHandler.addJobCh
        assertions.Equal(wantedJob, addedJob)
 }
 
-func TestManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        jobInfo := JobInfo{
                InfoTypeIdentity: "type1",
        }
 
-       err := managerUnderTest.AddJob(jobInfo)
+       err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
        assertions.Equal("type not supported: type1", err.Error())
 }
 
-func TestManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        managerUnderTest.allTypes["type1"] = TypeData{
@@ -113,12 +113,12 @@ func TestManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        jobInfo := JobInfo{
                InfoTypeIdentity: "type1",
        }
-       err := managerUnderTest.AddJob(jobInfo)
+       err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
        assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
 }
 
-func TestManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
        managerUnderTest.allTypes["type1"] = TypeData{
@@ -129,38 +129,42 @@ func TestManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
                InfoTypeIdentity: "type1",
                InfoJobIdentity:  "job1",
        }
-       err := managerUnderTest.AddJob(jobInfo)
+       err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
        assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
 }
 
-func TestManagerDeleteJob(t *testing.T) {
+func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
        assertions := require.New(t)
        managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
-       jobHandler := jobHandler{
+       jobsHandler := jobsHandler{
                deleteJobCh: make(chan string)}
        managerUnderTest.allTypes["type1"] = TypeData{
-               TypeId:     "type1",
-               jobHandler: &jobHandler,
+               TypeId:      "type1",
+               jobsHandler: &jobsHandler,
        }
 
-       go managerUnderTest.DeleteJob("job2")
+       go managerUnderTest.DeleteJobFromRESTCall("job2")
 
-       assertions.Equal("job2", <-jobHandler.deleteJobCh)
+       assertions.Equal("job2", <-jobsHandler.deleteJobCh)
 }
 
-func TestHandlerPollAndDistributeMessages(t *testing.T) {
+func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
        assertions := require.New(t)
 
-       wg := sync.WaitGroup{}
+       called := false
        messages := `[{"message": {"data": "data"}}]`
        pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://mrAddr/topicUrl" {
                        assertions.Equal(req.Method, "GET")
-                       wg.Done() // Signal that the poll call has been made
+                       body := "[]"
+                       if !called {
+                               called = true
+                               body = messages
+                       }
                        return &http.Response{
                                StatusCode: 200,
-                               Body:       ioutil.NopCloser(bytes.NewReader([]byte(messages))),
+                               Body:       ioutil.NopCloser(bytes.NewReader([]byte(body))),
                                Header:     make(http.Header), // Must be set to non-nil value or it panics
                        }
                }
@@ -168,12 +172,14 @@ func TestHandlerPollAndDistributeMessages(t *testing.T) {
                t.Fail()
                return nil
        })
+
+       wg := sync.WaitGroup{}
        distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://consumerHost/target" {
                        assertions.Equal(req.Method, "POST")
-                       assertions.Equal(messages, getBodyAsString(req))
+                       assertions.Equal(messages, getBodyAsString(req, t))
                        assertions.Equal("application/json", req.Header.Get("Content-Type"))
-                       wg.Done() // Signal that the distribution call has been made
+                       wg.Done()
                        return &http.Response{
                                StatusCode: 200,
                                Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
@@ -184,73 +190,73 @@ func TestHandlerPollAndDistributeMessages(t *testing.T) {
                t.Fail()
                return nil
        })
+       jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
+
+       jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
+       jobsManager.allTypes["type1"] = TypeData{
+               DMaaPTopicURL: "/topicUrl",
+               TypeId:        "type1",
+               jobsHandler:   jobsHandler,
+       }
+
+       jobsManager.StartJobsForAllTypes()
 
        jobInfo := JobInfo{
                InfoTypeIdentity: "type1",
                InfoJobIdentity:  "job1",
                TargetUri:        "http://consumerHost/target",
        }
-       handlerUnderTest := jobHandler{
-               topicUrl:         "/topicUrl",
-               jobs:             map[string]JobInfo{jobInfo.InfoJobIdentity: jobInfo},
-               pollClient:       pollClientMock,
-               distributeClient: distributeClientMock,
-       }
 
-       wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
-       handlerUnderTest.pollAndDistributeMessages("http://mrAddr")
+       wg.Add(1) // Wait till the distribution has happened
+       err := jobsManager.AddJobFromRESTCall(jobInfo)
+       assertions.Nil(err)
 
-       if waitTimeout(&wg, 100*time.Millisecond) {
+       if waitTimeout(&wg, 2*time.Second) {
                t.Error("Not all calls to server were made")
                t.Fail()
        }
 }
 
-func TestHandlerAddJob_shouldAddJobToJobsMap(t *testing.T) {
-       assertions := require.New(t)
+func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
+       jobToDelete := newJob(JobInfo{}, nil)
+       go jobToDelete.start()
+       jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+       jobsHandler.jobs["job1"] = jobToDelete
 
-       jobInfo := JobInfo{
-               InfoTypeIdentity: "type1",
-               InfoJobIdentity:  "job1",
-               TargetUri:        "http://consumerHost/target",
-       }
+       go jobsHandler.monitorManagementChannels()
 
-       addCh := make(chan JobInfo)
-       handlerUnderTest := jobHandler{
-               mu:       sync.Mutex{},
-               jobs:     map[string]JobInfo{},
-               addJobCh: addCh,
-       }
+       jobsHandler.deleteJobCh <- "job1"
 
-       go func() {
-               addCh <- jobInfo
-       }()
-
-       handlerUnderTest.monitorManagementChannels()
-
-       assertions.Len(handlerUnderTest.jobs, 1)
-       assertions.Equal(jobInfo, handlerUnderTest.jobs["job1"])
+       deleted := false
+       for i := 0; i < 100; i++ {
+               if len(jobsHandler.jobs) == 0 {
+                       deleted = true
+                       break
+               }
+               time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job
+       }
+       require.New(t).True(deleted, "Job not deleted")
 }
 
-func TestHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
-       assertions := require.New(t)
+func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
+       job := newJob(JobInfo{
+               InfoJobIdentity: "job",
+       }, nil)
 
-       deleteCh := make(chan string)
-       handlerUnderTest := jobHandler{
-               mu: sync.Mutex{},
-               jobs: map[string]JobInfo{"job1": {
-                       InfoJobIdentity: "job1",
-               }},
-               deleteJobCh: deleteCh,
-       }
+       jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+       jobsHandler.jobs["job1"] = job
 
-       go func() {
-               deleteCh <- "job1"
-       }()
+       fillMessagesBuffer(job.messagesChannel)
 
-       handlerUnderTest.monitorManagementChannels()
+       jobsHandler.distributeMessages([]byte("sent msg"))
 
-       assertions.Len(handlerUnderTest.jobs, 0)
+       require.New(t).Len(job.messagesChannel, 0)
+}
+
+func fillMessagesBuffer(mc chan []byte) {
+       for i := 0; i < cap(mc); i++ {
+               mc <- []byte("msg")
+       }
 }
 
 type RoundTripFunc func(req *http.Request) *http.Response
@@ -282,8 +288,10 @@ func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
        }
 }
 
-func getBodyAsString(req *http.Request) string {
+func getBodyAsString(req *http.Request, t *testing.T) string {
        buf := new(bytes.Buffer)
-       buf.ReadFrom(req.Body)
+       if _, err := buf.ReadFrom(req.Body); err != nil {
+               t.Fail()
+       }
        return buf.String()
 }
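The tests above build their HTTP stubs around a `RoundTripFunc` adapter (`NewTestClient`). That idiom is just the standard `http.RoundTripper` interface wrapped in a function type; a minimal sketch, independent of the test helpers in this diff:

    package main

    import (
        "fmt"
        "io"
        "net/http"
        "strings"
    )

    // roundTripFunc lets a plain function satisfy http.RoundTripper.
    type roundTripFunc func(req *http.Request) (*http.Response, error)

    func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
        return f(req)
    }

    func main() {
        // A client whose transport answers every request with a canned body.
        client := &http.Client{Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) {
            return &http.Response{
                StatusCode: http.StatusOK,
                Body:       io.NopCloser(strings.NewReader(`[{"message": "stubbed"}]`)),
                Header:     make(http.Header),
            }, nil
        })}

        resp, err := client.Get("http://mr.example/topic")
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        body, _ := io.ReadAll(resp.Body)
        fmt.Println(resp.StatusCode, string(body))
    }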
index 8bed1f9..79646c2 100644 (file)
@@ -71,7 +71,7 @@ func (h *ProducerCallbackHandler) addInfoJobHandler(w http.ResponseWriter, r *ht
                http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest)
                return
        }
-       if err := h.jobsManager.AddJob(jobInfo); err != nil {
+       if err := h.jobsManager.AddJobFromRESTCall(jobInfo); err != nil {
                http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest)
        }
 }
@@ -84,7 +84,7 @@ func (h *ProducerCallbackHandler) deleteInfoJobHandler(w http.ResponseWriter, r
                return
        }
 
-       h.jobsManager.DeleteJob(id)
+       h.jobsManager.DeleteJobFromRESTCall(id)
 }
 
 type notFoundHandler struct{}
index 5c2027a..1d458c9 100644 (file)
@@ -136,7 +136,7 @@ func TestAddInfoJobHandler(t *testing.T) {
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        jobHandlerMock := jobhandler.JobHandler{}
-                       jobHandlerMock.On("AddJob", tt.args.job).Return(tt.args.mockReturn)
+                       jobHandlerMock.On("AddJobFromRESTCall", tt.args.job).Return(tt.args.mockReturn)
 
                        callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
 
@@ -148,7 +148,7 @@ func TestAddInfoJobHandler(t *testing.T) {
 
                        assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
                        assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
-                       jobHandlerMock.AssertCalled(t, "AddJob", tt.args.job)
+                       jobHandlerMock.AssertCalled(t, "AddJobFromRESTCall", tt.args.job)
                })
        }
 }
@@ -156,7 +156,7 @@ func TestAddInfoJobHandler(t *testing.T) {
 func TestDeleteJob(t *testing.T) {
        assertions := require.New(t)
        jobHandlerMock := jobhandler.JobHandler{}
-       jobHandlerMock.On("DeleteJob", mock.Anything).Return(nil)
+       jobHandlerMock.On("DeleteJobFromRESTCall", mock.Anything).Return(nil)
 
        callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
 
@@ -168,7 +168,7 @@ func TestDeleteJob(t *testing.T) {
 
        assertions.Equal("", responseRecorder.Body.String())
 
-       jobHandlerMock.AssertCalled(t, "DeleteJob", "job1")
+       jobHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
 }
 
 func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
index 74f4edf..194ed75 100644 (file)
@@ -60,7 +60,7 @@ func main() {
        if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
                log.Fatalf("Stopping producer due to: %v", err)
        }
-       jobsManager.StartJobs()
+       jobsManager.StartJobsForAllTypes()
 
        log.Debug("Starting DMaaP Mediator Producer")
        go func() {
index 8e30b1c..ad20752 100644 (file)
@@ -13,7 +13,7 @@ type JobHandler struct {
 }
 
 // AddJob provides a mock function with given fields: _a0
-func (_m *JobHandler) AddJob(_a0 jobs.JobInfo) error {
+func (_m *JobHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error {
        ret := _m.Called(_a0)
 
        var r0 error
@@ -27,6 +27,6 @@ func (_m *JobHandler) AddJob(_a0 jobs.JobInfo) error {
 }
 
 // DeleteJob provides a mock function with given fields: jobId
-func (_m *JobHandler) DeleteJob(jobId string) {
+func (_m *JobHandler) DeleteJobFromRESTCall(jobId string) {
        _m.Called(jobId)
 }
index 03e67c0..5cbcaea 100644 (file)
@@ -44,7 +44,7 @@ func main() {
        registerJob(*port)
 
        fmt.Print("Starting consumer on port: ", *port)
-       http.ListenAndServe(fmt.Sprintf(":%v", *port), nil)
+       fmt.Println(http.ListenAndServe(fmt.Sprintf(":%v", *port), nil))
 }
 
 func registerJob(port int) {
index 82ae08d..36ffa39 100644 (file)
@@ -57,7 +57,7 @@ func main() {
        http.HandleFunc("/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages", handleData)
 
        fmt.Print("Starting mr on port: ", *port)
-       http.ListenAndServeTLS(fmt.Sprintf(":%v", *port), "../../security/producer.crt", "../../security/producer.key", nil)
+       fmt.Println(http.ListenAndServeTLS(fmt.Sprintf(":%v", *port), "../../security/producer.crt", "../../security/producer.key", nil))
 
 }
 
index 3b916e4..558d6d2 160000 (submodule)
--- a/onap/oran
+++ b/onap/oran
@@ -1 +1 @@
-Subproject commit 3b916e4dc5777863cb4ee873b41ee460fb9aec27
+Subproject commit 558d6d2de33bb8cf4b16df980a0cdf3b1747a8e2
index 1ca37b3..ec542c1 100644 (file)
@@ -2,25 +2,52 @@
 
 This consumer creates a job of type `STD_Fault_Messages` in the Information Coordinator Service (ICS). When it receives messages, it checks if they are link failure messages. If they are, it checks if the event severity is other than normal. If so, it looks up the O-DU ID mapped to the O-RU the message originates from and sends a configuration message to the O-DU through SDNC. If the event severity is normal, then it logs, on `Debug` level, that the link failure has been cleared.
 
-The producer takes a number of environment variables, described below, as configuration.
+## Configuration
+
+The consumer takes a number of environment variables, described below, as configuration.
 
 >- CONSUMER_HOST        **Required**. The host for the consumer.                                   Example: `http://mrproducer`
->- CONSUMER_HOST        **Required**. The port for the consumer.                                   Example: `8095`
->- LOG_LEVEL            Optional. The log level, which can be `Error`, `Warn`, `Info` or `Debug`.  Defaults to `Info`.
+>- CONSUMER_PORT        **Required**. The port for the consumer.                                   Example: `8095`
+>- CONSUMER_CERT_PATH   **Required**. The path to the certificate to use for https.                Defaults to `security/consumer.crt`
+>- CONSUMER_KEY_PATH    **Required**. The path to the key to the certificate to use for https.     Defaults to `security/consumer.key`
 >- INFO_COORD_ADDR      Optional. The address of the Information Coordinator.                      Defaults to `http://enrichmentservice:8083`.
->- SDNR_HOST            Optional. The host for SDNR.                                               Defaults to `http://localhost`.
->- SDNR_PORT            Optional. The port for SDNR.                                               Defaults to `3904`.
+>- SDNR_ADDR            Optional. The address for SDNR.                                            Defaults to `http://localhost:3904`.
 >- SDNR_USER            Optional. The user for the SDNR.                                           Defaults to `admin`.
 >- SDNR_PASSWORD        Optional. The password for the SDNR user.                                  Defaults to `Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U`.
 >- ORU_TO_ODU_MAP_FILE  Optional. The file containing the mapping from O-RU ID to O-DU ID.         Defaults to `o-ru-to-o-du-map.csv`.
+>- LOG_LEVEL            Optional. The log level, which can be `Error`, `Warn`, `Info` or `Debug`.  Defaults to `Info`.
+
+Any of the addresses used by this product can be configured to use https by specifying it as the scheme of the address URI. The client will not use server certificate verification. The consumer's own callback will only listen on the scheme configured in the consumer host address.
+
+The configured public key and certificate shall be PEM-encoded. A self-signed certificate and key are provided in the `security` folder of the project. These files should be replaced for production. To generate a self-signed key and certificate, use the example command below:
+
+    openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
 
-The creation of the job is not done when the consumer is started. Instead the consumer provides a REST API where it can be started and stopped, described below.
+## Functionality
+
+The creation of the job is not done when the consumer is started. Instead the consumer provides a REST API where it can be started and stopped, described below.
 
 >- /start  Creates the job in ICS.
 >- /stop   Deletes the job in ICS.
 
 If the consumer is shut down with a SIGTERM, it will also delete the job before exiting.
 
+## Development
+
+To make it easy to test during development of the consumer, two stubs are provided in the `stub` folder.
+
+One, under the `producer` folder, called `producer`, stubs the producer and pushes an array with one message whose `eventSeverity` alternates between `NORMAL` and `CRITICAL`. To build and start the stub, do the following:
+>1. cd stub/producer
+>2. go build
+>3. ./producer
+
+One, under the `sdnr` folder, called `sdnr`, listens for REST calls at startup and prints their bodies. By default, it listens on port `3904`, but this can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+>1. cd stub/sdnr
+>2. go build
+>3. ./sdnr
+
+Mocks needed for unit tests have been generated using `github.com/stretchr/testify/mock` and are checked in under the `mocks` folder. **Note!** Keep in mind that if any of the mocked interfaces change, a new mock for that interface must be generated and checked in.
+
 ## License
 
 Copyright (C) 2021 Nordix Foundation.
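The link-failure handling summarized at the top of this README (check the event severity, map the O-RU to its O-DU, notify SDNR) can be pictured as a small decision function. The sketch below is illustrative only: the message struct, its field names, and the `oruToOdu` map are assumptions, not the consumer's actual types.

    package main

    import "log"

    // faultMessage is a hypothetical, simplified view of a link failure event.
    type faultMessage struct {
        EventSeverity string // e.g. "NORMAL" or "CRITICAL"
        ORUID         string
    }

    // oruToOdu stands in for the mapping loaded from ORU_TO_ODU_MAP_FILE.
    var oruToOdu = map[string]string{"O-RU-1": "O-DU-1"}

    func handleLinkFailure(msg faultMessage, sendToSDNR func(oduID string, oruID string)) {
        if msg.EventSeverity == "NORMAL" {
            log.Printf("Debug: link failure cleared for %v", msg.ORUID)
            return
        }
        oduID, ok := oruToOdu[msg.ORUID]
        if !ok {
            log.Printf("no O-DU mapped for O-RU %v", msg.ORUID)
            return
        }
        sendToSDNR(oduID, msg.ORUID) // configuration message towards the O-DU via SDNR
    }

    func main() {
        handleLinkFailure(faultMessage{EventSeverity: "CRITICAL", ORUID: "O-RU-1"},
            func(odu, oru string) { log.Printf("would ask SDNR to reconfigure %v for %v", odu, oru) })
    }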
index 754bba1..2eaa371 100644 (file)
@@ -9,10 +9,14 @@ require (
 
 require (
        github.com/davecgh/go-spew v1.1.1 // indirect
-       github.com/google/uuid v1.3.0 // indirect
-       github.com/gorilla/mux v1.8.0 // indirect
+       github.com/gorilla/mux v1.8.0
        github.com/pmezard/go-difflib v1.0.0 // indirect
        github.com/stretchr/objx v0.1.1 // indirect
        golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect
        gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
 )
+
+require (
+       github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
+       github.com/hashicorp/go-retryablehttp v0.7.0 // indirect
+)
index 6ce7604..970999b 100644 (file)
@@ -5,6 +5,11 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
 github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
 github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
+github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
+github.com/hashicorp/go-retryablehttp v0.7.0 h1:eu1EI/mbirUgP5C8hVsTNaGZreBDlYiwC1FZWkvQPQ4=
+github.com/hashicorp/go-retryablehttp v0.7.0/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
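The new `github.com/hashicorp/go-retryablehttp` dependency added to go.mod/go.sum suggests the consumer's REST calls are retried on failure. A minimal sketch of how such a client is typically constructed; the retry limits and the endpoint are placeholders, not this consumer's actual configuration:

    package main

    import (
        "fmt"
        "time"

        "github.com/hashicorp/go-retryablehttp"
    )

    func main() {
        rc := retryablehttp.NewClient()
        rc.RetryMax = 3                   // give up after three retries
        rc.RetryWaitMin = 1 * time.Second // back off between attempts
        rc.RetryWaitMax = 5 * time.Second

        // StandardClient wraps the retrying transport in a normal *http.Client,
        // so the rest of the code can keep using the net/http API.
        httpClient := rc.StandardClient()

        resp, err := httpClient.Get("http://localhost:8083/health")
        if err != nil {
            fmt.Println("request failed after retries:", err)
            return
        }
        defer resp.Body.Close()
        fmt.Println("status:", resp.Status)
    }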
index 43656b7..718f435 100644 (file)
@@ -21,6 +21,7 @@
 package config
 
 import (
+       "fmt"
        "os"
        "strconv"
 
@@ -28,31 +29,37 @@ import (
 )
 
 type Config struct {
-       LogLevel               log.Level
        ConsumerHost           string
        ConsumerPort           int
        InfoCoordinatorAddress string
-       SDNRHost               string
-       SDNRPort               int
+       SDNRAddress            string
        SDNRUser               string
        SDNPassword            string
        ORUToODUMapFile        string
+       ConsumerCertPath       string
+       ConsumerKeyPath        string
+       LogLevel               log.Level
 }
 
 func New() *Config {
        return &Config{
-               LogLevel:               getLogLevel(),
                ConsumerHost:           getEnv("CONSUMER_HOST", ""),
                ConsumerPort:           getEnvAsInt("CONSUMER_PORT", 0),
                InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
-               SDNRHost:               getEnv("SDNR_HOST", "http://localhost"),
-               SDNRPort:               getEnvAsInt("SDNR_PORT", 3904),
+               SDNRAddress:            getEnv("SDNR_ADDR", "http://localhost:3904"),
                SDNRUser:               getEnv("SDNR_USER", "admin"),
                SDNPassword:            getEnv("SDNR_PASSWORD", "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U"),
                ORUToODUMapFile:        getEnv("ORU_TO_ODU_MAP_FILE", "o-ru-to-o-du-map.csv"),
+               ConsumerCertPath:       getEnv("CONSUMER_CERT_PATH", "security/consumer.crt"),
+               ConsumerKeyPath:        getEnv("CONSUMER_KEY_PATH", "security/consumer.key"),
+               LogLevel:               getLogLevel(),
        }
 }
 
+func (c Config) String() string {
+       return fmt.Sprintf("ConsumerHost: %v, ConsumerPort: %v, InfoCoordinatorAddress: %v, SDNRAddress: %v, SDNRUser: %v, SDNRPassword: %v, ORUToODUMapFile: %v, ConsumerCertPath: %v, ConsumerKeyPath: %v, LogLevel: %v", c.ConsumerHost, c.ConsumerPort, c.InfoCoordinatorAddress, c.SDNRAddress, c.SDNRUser, c.SDNPassword, c.ORUToODUMapFile, c.ConsumerCertPath, c.ConsumerKeyPath, c.LogLevel)
+}
+
 func getEnv(key string, defaultVal string) string {
        if value, exists := os.LookupEnv(key); exists {
                return value
index a5b1624..3d9983a 100644 (file)
@@ -31,28 +31,30 @@ import (
 
 func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
        assertions := require.New(t)
-       os.Setenv("LOG_LEVEL", "Debug")
        os.Setenv("CONSUMER_HOST", "consumerHost")
        os.Setenv("CONSUMER_PORT", "8095")
        os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
-       os.Setenv("SDNR_HOST", "sdnrHost")
-       os.Setenv("SDNR_PORT", "3908")
+       os.Setenv("SDNR_ADDR", "sdnrHost:3908")
        os.Setenv("SDNR_USER", "admin")
        os.Setenv("SDNR_PASSWORD", "pwd")
        os.Setenv("ORU_TO_ODU_MAP_FILE", "file")
+       os.Setenv("CONSUMER_CERT_PATH", "cert")
+       os.Setenv("CONSUMER_KEY_PATH", "key")
+       os.Setenv("LOG_LEVEL", "Debug")
        t.Cleanup(func() {
                os.Clearenv()
        })
        wantConfig := Config{
-               LogLevel:               log.DebugLevel,
                ConsumerHost:           "consumerHost",
                ConsumerPort:           8095,
                InfoCoordinatorAddress: "infoCoordAddr",
-               SDNRHost:               "sdnrHost",
-               SDNRPort:               3908,
+               SDNRAddress:            "sdnrHost:3908",
                SDNRUser:               "admin",
                SDNPassword:            "pwd",
                ORUToODUMapFile:        "file",
+               ConsumerCertPath:       "cert",
+               ConsumerKeyPath:        "key",
+               LogLevel:               log.DebugLevel,
        }
 
        got := New()
@@ -70,15 +72,16 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T
                os.Clearenv()
        })
        wantConfig := Config{
-               LogLevel:               log.InfoLevel,
                ConsumerHost:           "",
                ConsumerPort:           0,
                InfoCoordinatorAddress: "http://enrichmentservice:8083",
-               SDNRHost:               "http://localhost",
-               SDNRPort:               3904,
+               SDNRAddress:            "http://localhost:3904",
                SDNRUser:               "admin",
                SDNPassword:            "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U",
                ORUToODUMapFile:        "o-ru-to-o-du-map.csv",
+               ConsumerCertPath:       "security/consumer.crt",
+               ConsumerKeyPath:        "security/consumer.key",
+               LogLevel:               log.InfoLevel,
        }
 
        got := New()
@@ -99,15 +102,16 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) {
                os.Clearenv()
        })
        wantConfig := Config{
-               LogLevel:               log.InfoLevel,
                ConsumerHost:           "",
                ConsumerPort:           0,
                InfoCoordinatorAddress: "http://enrichmentservice:8083",
-               SDNRHost:               "http://localhost",
-               SDNRPort:               3904,
+               SDNRAddress:            "http://localhost:3904",
                SDNRUser:               "admin",
                SDNPassword:            "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U",
                ORUToODUMapFile:        "o-ru-to-o-du-map.csv",
+               ConsumerCertPath:       "security/consumer.crt",
+               ConsumerKeyPath:        "security/consumer.key",
+               LogLevel:               log.InfoLevel,
        }
        got := New()
        assertions.Equal(&wantConfig, got)
index 01f121a..3aecf45 100644 (file)
@@ -76,10 +76,10 @@ func (lfh LinkFailureHandler) sendUnlockMessage(oRuId string) {
                if error := restclient.Put(lfh.config.SDNRAddress+sdnrPath, unlockMessage, lfh.client, lfh.config.SDNRUser, lfh.config.SDNRPassword); error == nil {
                        log.Debugf("Sent unlock message for O-RU: %v to O-DU: %v.", oRuId, oDuId)
                } else {
-                       log.Warn(error)
+                       log.Warn("Send of unlock message failed due to ", error)
                }
        } else {
-               log.Warn(err)
+               log.Warn("Send of unlock message failed due to ", err)
        }
 
 }
index 036819a..fdd0549 100644 (file)
@@ -22,9 +22,15 @@ package restclient
 
 import (
        "bytes"
+       "crypto/tls"
        "fmt"
        "io"
+       "math"
        "net/http"
+       "net/url"
+       "time"
+
+       "github.com/hashicorp/go-retryablehttp"
 )
 
 type RequestError struct {
@@ -33,7 +39,7 @@ type RequestError struct {
 }
 
 func (e RequestError) Error() string {
-       return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", e.StatusCode, string(e.Body))
+       return fmt.Sprintf("error response with status: %v and body: %v", e.StatusCode, string(e.Body))
 }
 
 // HTTPClient interface
@@ -55,6 +61,40 @@ func Delete(url string, client HTTPClient) error {
        return do(http.MethodDelete, url, nil, client)
 }
 
+func CreateClientCertificate(certPath string, keyPath string) (tls.Certificate, error) {
+       if cert, err := tls.LoadX509KeyPair(certPath, keyPath); err == nil {
+               return cert, nil
+       } else {
+               return tls.Certificate{}, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s due to: %v", certPath, keyPath, err)
+       }
+}
+
+func CreateRetryClient(cert tls.Certificate) *http.Client {
+       rawRetryClient := retryablehttp.NewClient()
+       rawRetryClient.RetryWaitMax = time.Minute
+       rawRetryClient.RetryMax = math.MaxInt
+       rawRetryClient.HTTPClient.Transport = getSecureTransportWithoutVerify(cert)
+
+       client := rawRetryClient.StandardClient()
+       return client
+}
+
+func IsUrlSecure(configUrl string) bool {
+       u, _ := url.Parse(configUrl)
+       return u.Scheme == "https"
+}
+
+func getSecureTransportWithoutVerify(cert tls.Certificate) *http.Transport {
+       return &http.Transport{
+               TLSClientConfig: &tls.Config{
+                       Certificates: []tls.Certificate{
+                               cert,
+                       },
+                       InsecureSkipVerify: true,
+               },
+       }
+}
+
 func do(method string, url string, body []byte, client HTTPClient, userInfo ...string) error {
        if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
                if body != nil {
index 8271fd0..2c915fd 100644 (file)
@@ -21,11 +21,16 @@ package restclient
 
 import (
        "bytes"
+       "crypto/tls"
        "fmt"
        "io/ioutil"
+       "math"
        "net/http"
+       "reflect"
        "testing"
+       "time"
 
+       "github.com/hashicorp/go-retryablehttp"
        "github.com/stretchr/testify/mock"
        "github.com/stretchr/testify/require"
        "oransc.org/usecase/oruclosedloop/mocks"
@@ -38,7 +43,7 @@ func TestRequestError_Error(t *testing.T) {
                StatusCode: http.StatusBadRequest,
                Body:       []byte("error"),
        }
-       assertions.Equal("Request failed due to error response with status: 400 and body: error", actualError.Error())
+       assertions.Equal("error response with status: 400 and body: error", actualError.Error())
 }
 
 func TestPutWithoutAuth(t *testing.T) {
@@ -167,3 +172,63 @@ func Test_doErrorCases(t *testing.T) {
                })
        }
 }
+
+func Test_createClientCertificate(t *testing.T) {
+       assertions := require.New(t)
+       wantedCert, _ := tls.LoadX509KeyPair("../../security/consumer.crt", "../../security/consumer.key")
+       type args struct {
+               certPath string
+               keyPath  string
+       }
+       tests := []struct {
+               name     string
+               args     args
+               wantCert tls.Certificate
+               wantErr  error
+       }{
+               {
+                       name: "Paths to cert info ok should return certificate",
+                       args: args{
+                               certPath: "../../security/consumer.crt",
+                               keyPath:  "../../security/consumer.key",
+                       },
+                       wantCert: wantedCert,
+               },
+               {
+                       name: "Paths to cert info not ok should return error with info about error",
+                       args: args{
+                               certPath: "wrong_cert",
+                               keyPath:  "wrong_key",
+                       },
+                       wantErr: fmt.Errorf("cannot create x509 keypair from cert file wrong_cert and key file wrong_key due to: open wrong_cert: no such file or directory"),
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       cert, err := CreateClientCertificate(tt.args.certPath, tt.args.keyPath)
+                       assertions.Equal(tt.wantCert, cert, tt.name)
+                       assertions.Equal(tt.wantErr, err, tt.name)
+               })
+       }
+}
+
+func Test_CreateRetryClient(t *testing.T) {
+       assertions := require.New(t)
+
+       client := CreateRetryClient(tls.Certificate{})
+
+       transport := client.Transport
+       assertions.Equal("*retryablehttp.RoundTripper", reflect.TypeOf(transport).String())
+       retryableTransport := transport.(*retryablehttp.RoundTripper)
+       retryableClient := retryableTransport.Client
+       assertions.Equal(time.Minute, retryableClient.RetryWaitMax)
+       assertions.Equal(math.MaxInt, retryableClient.RetryMax)
+}
+
+func TestIsUrlSecured(t *testing.T) {
+       assertions := require.New(t)
+
+       assertions.True(IsUrlSecure("https://url"))
+
+       assertions.False(IsUrlSecure("http://url"))
+}
index bef9e24..b7d6895 100644 (file)
 package main
 
 import (
+       "crypto/tls"
        "encoding/json"
        "fmt"
        "net/http"
        "os"
        "os/signal"
        "syscall"
-       "time"
 
        "github.com/gorilla/mux"
        log "github.com/sirupsen/logrus"
@@ -41,7 +41,6 @@ type Server interface {
        ListenAndServe() error
 }
 
-const timeoutHTTPClient = time.Second * 5
 const jobId = "14e7bb84-a44d-44c1-90b7-6995a92ad43c"
 
 var jobRegistrationInfo = struct {
@@ -70,16 +69,13 @@ func doInit() {
        configuration = config.New()
 
        log.SetLevel(configuration.LogLevel)
-
-       client = &http.Client{
-               Timeout: timeoutHTTPClient,
-       }
+       log.Debug("Using configuration: ", configuration)
 
        consumerPort = fmt.Sprint(configuration.ConsumerPort)
        jobRegistrationInfo.JobResultUri = configuration.ConsumerHost + ":" + consumerPort
 
        linkfailureConfig = linkfailure.Configuration{
-               SDNRAddress:  configuration.SDNRHost + ":" + fmt.Sprint(configuration.SDNRPort),
+               SDNRAddress:  configuration.SDNRAddress,
                SDNRUser:     configuration.SDNRUser,
                SDNRPassword: configuration.SDNPassword,
        }
@@ -95,12 +91,16 @@ func main() {
                log.Fatalf("Unable to create LookupService due to inability to get O-RU-ID to O-DU-ID map. Cause: %v", initErr)
        }
 
+       var cert tls.Certificate
+       if c, err := restclient.CreateClientCertificate(configuration.ConsumerCertPath, configuration.ConsumerKeyPath); err == nil {
+               cert = c
+       } else {
+               log.Fatalf("Stopping producer due to error: %v", err)
+       }
+       client = restclient.CreateRetryClient(cert)
+
        go func() {
-               startServer(&http.Server{
-                       Addr:    ":" + consumerPort,
-                       Handler: getRouter(),
-               })
-               deleteJob()
+               startServer()
                os.Exit(1) // If the startServer function exits, it is because there has been a failure in the server, so we exit.
        }()
 
@@ -116,6 +116,11 @@ func validateConfiguration(configuration *config.Config) error {
        if configuration.ConsumerHost == "" || configuration.ConsumerPort == 0 {
                return fmt.Errorf("consumer host and port must be provided")
        }
+
+       if configuration.ConsumerCertPath == "" || configuration.ConsumerKeyPath == "" {
+               return fmt.Errorf("missing CONSUMER_CERT and/or CONSUMER_KEY")
+       }
+
        return nil
 }
 
@@ -135,8 +140,14 @@ func getRouter() *mux.Router {
        return r
 }
 
-func startServer(server Server) {
-       if err := server.ListenAndServe(); err != nil {
+func startServer() {
+       var err error
+       if restclient.IsUrlSecure(configuration.ConsumerHost) {
+               err = http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.ConsumerPort), configuration.ConsumerCertPath, configuration.ConsumerKeyPath, getRouter())
+       } else {
+               err = http.ListenAndServe(fmt.Sprintf(":%v", configuration.ConsumerPort), getRouter())
+       }
+       if err != nil {
                log.Errorf("Server stopped unintentionally due to: %v. Deleteing job.", err)
                if deleteErr := deleteJob(); deleteErr != nil {
                        log.Error(fmt.Sprintf("Unable to delete consumer job due to: %v. Please remove job %v manually.", deleteErr, jobId))
index 99419bf..3fcb23e 100644 (file)
@@ -54,15 +54,16 @@ func Test_init(t *testing.T) {
        doInit()
 
        wantedConfiguration := &config.Config{
-               LogLevel:               log.InfoLevel,
                ConsumerHost:           "consumerHost",
                ConsumerPort:           8095,
                InfoCoordinatorAddress: "http://enrichmentservice:8083",
-               SDNRHost:               "http://localhost",
-               SDNRPort:               3904,
+               SDNRAddress:            "http://localhost:3904",
                SDNRUser:               "admin",
                SDNPassword:            "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U",
                ORUToODUMapFile:        "o-ru-to-o-du-map.csv",
+               ConsumerCertPath:       "security/consumer.crt",
+               ConsumerKeyPath:        "security/consumer.key",
+               LogLevel:               log.InfoLevel,
        }
        assertions.Equal(wantedConfiguration, configuration)
 
@@ -70,7 +71,7 @@ func Test_init(t *testing.T) {
        assertions.Equal(wantedConfiguration.ConsumerHost+":"+fmt.Sprint(wantedConfiguration.ConsumerPort), jobRegistrationInfo.JobResultUri)
 
        wantedLinkFailureConfig := linkfailure.Configuration{
-               SDNRAddress:  wantedConfiguration.SDNRHost + ":" + fmt.Sprint(wantedConfiguration.SDNRPort),
+               SDNRAddress:  wantedConfiguration.SDNRAddress,
                SDNRUser:     wantedConfiguration.SDNRUser,
                SDNRPassword: wantedConfiguration.SDNPassword,
        }
@@ -92,8 +93,10 @@ func Test_validateConfiguration(t *testing.T) {
                        name: "Valid config, should return nil",
                        args: args{
                                configuration: &config.Config{
-                                       ConsumerHost: "host",
-                                       ConsumerPort: 80,
+                                       ConsumerHost:     "host",
+                                       ConsumerPort:     80,
+                                       ConsumerCertPath: "security/consumer.crt",
+                                       ConsumerKeyPath:  "security/consumer.key",
                                },
                        },
                },
@@ -188,28 +191,6 @@ func Test_getRouter_shouldContainAllPathsWithHandlers(t *testing.T) {
        assertions.Equal("/admin/stop", path)
 }
 
-func Test_startServer_shouldDeleteJobWhenServerStopsWithErrorAndLog(t *testing.T) {
-       assertions := require.New(t)
-
-       var buf bytes.Buffer
-       log.SetOutput(&buf)
-
-       os.Setenv("CONSUMER_PORT", "wrong")
-       t.Cleanup(func() {
-               log.SetOutput(os.Stderr)
-       })
-
-       mockServer := &mocks.Server{}
-       mockServer.On("ListenAndServe").Return(errors.New("Server failure"))
-
-       startServer(mockServer)
-
-       log := buf.String()
-       assertions.Contains(log, "level=error")
-       assertions.Contains(log, "Server stopped unintentionally due to: Server failure. Deleteing job.")
-       assertions.Contains(log, "Please remove job 14e7bb84-a44d-44c1-90b7-6995a92ad43c manually")
-}
-
 func Test_startHandler(t *testing.T) {
        assertions := require.New(t)
 
diff --git a/test/usecases/oruclosedlooprecovery/goversion/mocks/Server.go b/test/usecases/oruclosedlooprecovery/goversion/mocks/Server.go
deleted file mode 100644 (file)
index ad16503..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-// Code generated by mockery v1.0.0. DO NOT EDIT.
-
-package mocks
-
-import mock "github.com/stretchr/testify/mock"
-
-// Server is an autogenerated mock type for the Server type
-type Server struct {
-       mock.Mock
-}
-
-// ListenAndServe provides a mock function with given fields:
-func (_m *Server) ListenAndServe() error {
-       ret := _m.Called()
-
-       var r0 error
-       if rf, ok := ret.Get(0).(func() error); ok {
-               r0 = rf()
-       } else {
-               r0 = ret.Error(0)
-       }
-
-       return r0
-}
diff --git a/test/usecases/oruclosedlooprecovery/goversion/security/consumer.crt b/test/usecases/oruclosedlooprecovery/goversion/security/consumer.crt
new file mode 100644 (file)
index 0000000..0f6d8a3
--- /dev/null
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDXzCCAkegAwIBAgIUEbuDTP0ixwxCxCQ9tR5DijGCbtkwDQYJKoZIhvcNAQEL
+BQAwPzELMAkGA1UEBhMCc2UxDDAKBgNVBAoMA0VTVDERMA8GA1UECwwIRXJpY3Nz
+b24xDzANBgNVBAMMBnNlcnZlcjAeFw0yMTEwMTkxNDA1MzVaFw0zMTEwMTcxNDA1
+MzVaMD8xCzAJBgNVBAYTAnNlMQwwCgYDVQQKDANFU1QxETAPBgNVBAsMCEVyaWNz
+c29uMQ8wDQYDVQQDDAZzZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
+AoIBAQDnH4imV8kx/mXz6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEE
+Y6VWU47bSYRn2xJOP+wmfKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe
+72iMbTc5q2uYi0ImB5/m3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNrua
+I4iOnMvvuc71gvHol7appRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVo
+uUZYYJseFhOlIANaXn6qmz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPD
+x9PjmGmf6eOEC2ZHBi4OHwjIzmLnAgMBAAGjUzBRMB0GA1UdDgQWBBRjeDLPpLm2
+W623wna7xBCbHxtxVjAfBgNVHSMEGDAWgBRjeDLPpLm2W623wna7xBCbHxtxVjAP
+BgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAbFUAWFZaIMXmd5qv/
+xJYr1oPJpsmbgWGRWZWDZqbUabvWObyXlDJWIau60BerfcC5TmyElBjTyONSGwCT
+tq+SVB0PXpgqa8ZQ25Ytn2AMDFWhrGbOefCXs6te3HGq6BNubTWrOVIvJypCwC95
++iXVuDd4eg+n2fWv7h7fZRZHum/zLoRxB2lKoMMbc/BQX9hbtP6xyvIVvaYdhcJw
+VzJJGIDqpMiMH6IBaOFSmgfOyGblGKAicj3E3kpGBfadLx3R+9V6aG7zyBnVbr2w
+YJbV2Ay4PrF+PTpCMB/mNwC5RBTYHpSNdrCMSyq3I+QPVJq8dPJr7fd1Uwl3WHqX
+FV0h
+-----END CERTIFICATE-----
diff --git a/test/usecases/oruclosedlooprecovery/goversion/security/consumer.key b/test/usecases/oruclosedlooprecovery/goversion/security/consumer.key
new file mode 100644 (file)
index 0000000..5346bb7
--- /dev/null
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDnH4imV8kx/mXz
+6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEEY6VWU47bSYRn2xJOP+wm
+fKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe72iMbTc5q2uYi0ImB5/m
+3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNruaI4iOnMvvuc71gvHol7ap
+pRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVouUZYYJseFhOlIANaXn6q
+mz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPDx9PjmGmf6eOEC2ZHBi4O
+HwjIzmLnAgMBAAECggEBAMq1lZyPkh8PCUyLVX3VhC4jRybyAWBI+piKx+4EI6l/
+laP5dZcegCoo+w/mdbTpRHqAWGjec4e9+Nkoq8rLG6B2SCfaRJUYiEQSEvSBHAid
+BZqKK4B82GXQavNU91Vy1OT3vD7mpPXF6jEK6gAA0C4Wt7Lzo7ZfqEavRBDMsNnV
+jOxLwWJCFSKhfeA6grJCnagmEDKSxxazlNBgCahjPf/+IRJZ7Vk4Zjq+I/5nWKf8
+lYaQExKDIANuM/jMRnYVq5k4g2MKHUADWGTSvG1DMJiMHzdxb2miZovpIkEE86bC
+wKBuele9IR6Rb/wygYj7WdaWysZ081OT7mNyju08e4ECgYEA8+q7vv4Nlz8bAcmY
+Ip5517s15M5D9iLsE2Q5m9Zs99rUyQv0E8ekpChhtTSdvj+eNl39O4hji46Gyceu
+MYPfNL7+YWaFDxuyaXEe/OFuKbFqgE1p08HXFcQJCvgqD1MWO5b9BRDc0qpNFIA8
+eN9xFBMQ2UFaALBMAup7Ef85q4kCgYEA8pKOAIsgmlnO8P9cPzkMC1oozslraAti
+1JnOJjwPLoHFubtH2u7WoIkSvNfeNwfrsVXwAP0m7C8p7qhYppS+0XGjKpYNSezK
+1GCqCVv8R1m+AsSseSUUaQCmEydd+gQbBq3r4u3wU3ylrgAoR3m+7SVyhvD+vbwI
+7+zfj+O3zu8CgYEAqaAsQH5c5Tm1hmCztB+RjD1dFWl8ScevdSzWA1HzJcrA/6+Y
+ZckI7kBG8sVMjemgFR735FbNI1hS1DBRK44Rw5SvQv0Qu5j/UeShMCt1ePkwn1k2
+p1S+Rxy1TTOXzGBzra0q+ELpzncwc3lalJSPBu7bYLrZ5HC167E1NSbQ7EECgYBo
+e/IIj+TyNz7pFcVhQixK84HiWGYYQddHJhzi4TnU2XcWonG3/uqZ6ZEVoJIJ+DJw
+h0jC1EggscwJDaBp2GY9Bwq2PD3rGsDfK+fx8ho/jYtH2/lCkVMyS2I9m9Zh68TM
+YrvZWo4LGASxZ0XyS6GOunOTZlkD1uuulMRTUU4KJwKBgQCwyjs0/ElVFvO0lPIC
+JJ//B5rqI7hNMJuTBvr4yiqVZdbgFukaU7FBVyNYDMpZi/nRbpglm+psFcwXtL8n
+bHOIGLkh8vB7OuETRYhXs567lPYtO4BmHZlXW70Sq/0xqi/Mmz1RuEg4SQ1Ug5oy
+wG6IV5EWSQAhsGirdybQ+bY7Kw==
+-----END PRIVATE KEY-----
diff --git a/test/usecases/oruclosedlooprecovery/goversion/stub/sdnr/sdnrstub.go b/test/usecases/oruclosedlooprecovery/goversion/stub/sdnr/sdnrstub.go
new file mode 100644 (file)
index 0000000..b59dbd9
--- /dev/null
@@ -0,0 +1,49 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+       "flag"
+       "fmt"
+       "io"
+       "net/http"
+
+       "github.com/gorilla/mux"
+)
+
+func main() {
+       port := flag.Int("port", 3904, "The port this SDNR stub will listen on")
+       flag.Parse()
+
+       r := mux.NewRouter()
+       r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={O-DU-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/du-to-ru-connection={O-RU-ID}", handleData)
+
+       fmt.Println("Starting SDNR on port: ", *port)
+       http.ListenAndServe(fmt.Sprintf(":%v", *port), r)
+
+}
+
+func handleData(w http.ResponseWriter, req *http.Request) {
+       defer req.Body.Close()
+       if reqData, err := io.ReadAll(req.Body); err == nil {
+               fmt.Println("SDNR received body: ", string(reqData))
+       }
+}