Implement secure communications 65/6965/5
authorelinuxhenrik <henrik.b.andersson@est.tech>
Thu, 28 Oct 2021 14:27:57 +0000 (16:27 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Fri, 29 Oct 2021 14:38:47 +0000 (16:38 +0200)
The communication towards the consumer is not secured in this commit.

Also changed the configuration so that the address of the DMaaP Message
Router is given in one variable, named DMAAP_MR_ADDR.

Issue-ID: NONRTRIC-601
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Icb5b3c255367e823fcae2168ab37603092815893

15 files changed:
dmaap-mediator-producer/.gitignore
dmaap-mediator-producer/README.md
dmaap-mediator-producer/configs/producer.crt [new file with mode: 0644]
dmaap-mediator-producer/configs/producer.key [new file with mode: 0644]
dmaap-mediator-producer/configs/type_config.json
dmaap-mediator-producer/internal/config/config.go
dmaap-mediator-producer/internal/config/config_test.go
dmaap-mediator-producer/internal/config/registrator_test.go
dmaap-mediator-producer/internal/jobs/jobs.go
dmaap-mediator-producer/internal/jobs/jobs_test.go
dmaap-mediator-producer/internal/restclient/HTTPClient.go
dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
dmaap-mediator-producer/main.go
dmaap-mediator-producer/simulator/consumer/consumersimulator.go [moved from dmaap-mediator-producer/simulator/consumersimulator.go with 97% similarity]
dmaap-mediator-producer/simulator/dmaap/mrsimulator.go [new file with mode: 0644]

index 9f5396c..5b1f8f9 100644 (file)
@@ -4,4 +4,7 @@ coverage.*
 main
 dmaapmediatorproducer
 __debug_bin*
-simulator
+consumer
+!consumer/
+dmaap
+!dmaap/
index e4a2ffd..c22c561 100644 (file)
@@ -7,9 +7,10 @@ The producer takes a number of environment variables, described below, as config
 >- INFO_PRODUCER_HOST  **Required**. The host for the producer.                                   Example: `http://mrproducer`
 >- LOG_LEVEL           Optional. The log level, which can be `Error`, `Warn`, `Info` or `Debug`.  Defaults to `Info`.
 >- INFO_PRODUCER_PORT  Optional. The port for the product.                                        Defaults to `8085`.
->- INFO_COORD_ADDR     Optional. The address of the Information Coordinator.                      Defaults to `http://enrichmentservice:8083`.
->- MR_HOST             Optional. The host for the DMaaP Message Router.                           Defaults to `http://message-router.onap`.
->- MR_PORT             Optional. The port for the DMaaP Message Router.                           Defaults to `3904`.
+>- INFO_COORD_ADDR     Optional. The address of the Information Coordinator.                      Defaults to `https://enrichmentservice:8434`.
+>- DMAAP_MR_ADDR       Optional. The address of the DMaaP Message Router.                         Defaults to `https://message-router.onap:3905`.
+>- PRODUCER_CERT_PATH  Optional. The path to the certificate to use for https.                    Defaults to `configs/producer.crt`
+>- PRODUCER_KEY_PATH   Optional. The path to the key to the certificate to use for https.         Defaults to `configs/producer.key`
 
 The file `configs/type_config.json` contains the configuration of job types that the producer will support.
 
@@ -23,6 +24,8 @@ The file `configs/type_config.json` contains the configuration of job types that
       ]
     }
 
+The server part of the producer uses https, and the communication towards ICS and MR use https, with no server certificate verification.
+
 At start up the producer will register the configured job types in ICS and also register itself as a producer supporting these types.
 
 Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
diff --git a/dmaap-mediator-producer/configs/producer.crt b/dmaap-mediator-producer/configs/producer.crt
new file mode 100644 (file)
index 0000000..0f6d8a3
--- /dev/null
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDXzCCAkegAwIBAgIUEbuDTP0ixwxCxCQ9tR5DijGCbtkwDQYJKoZIhvcNAQEL
+BQAwPzELMAkGA1UEBhMCc2UxDDAKBgNVBAoMA0VTVDERMA8GA1UECwwIRXJpY3Nz
+b24xDzANBgNVBAMMBnNlcnZlcjAeFw0yMTEwMTkxNDA1MzVaFw0zMTEwMTcxNDA1
+MzVaMD8xCzAJBgNVBAYTAnNlMQwwCgYDVQQKDANFU1QxETAPBgNVBAsMCEVyaWNz
+c29uMQ8wDQYDVQQDDAZzZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
+AoIBAQDnH4imV8kx/mXz6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEE
+Y6VWU47bSYRn2xJOP+wmfKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe
+72iMbTc5q2uYi0ImB5/m3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNrua
+I4iOnMvvuc71gvHol7appRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVo
+uUZYYJseFhOlIANaXn6qmz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPD
+x9PjmGmf6eOEC2ZHBi4OHwjIzmLnAgMBAAGjUzBRMB0GA1UdDgQWBBRjeDLPpLm2
+W623wna7xBCbHxtxVjAfBgNVHSMEGDAWgBRjeDLPpLm2W623wna7xBCbHxtxVjAP
+BgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAbFUAWFZaIMXmd5qv/
+xJYr1oPJpsmbgWGRWZWDZqbUabvWObyXlDJWIau60BerfcC5TmyElBjTyONSGwCT
+tq+SVB0PXpgqa8ZQ25Ytn2AMDFWhrGbOefCXs6te3HGq6BNubTWrOVIvJypCwC95
++iXVuDd4eg+n2fWv7h7fZRZHum/zLoRxB2lKoMMbc/BQX9hbtP6xyvIVvaYdhcJw
+VzJJGIDqpMiMH6IBaOFSmgfOyGblGKAicj3E3kpGBfadLx3R+9V6aG7zyBnVbr2w
+YJbV2Ay4PrF+PTpCMB/mNwC5RBTYHpSNdrCMSyq3I+QPVJq8dPJr7fd1Uwl3WHqX
+FV0h
+-----END CERTIFICATE-----
diff --git a/dmaap-mediator-producer/configs/producer.key b/dmaap-mediator-producer/configs/producer.key
new file mode 100644 (file)
index 0000000..5346bb7
--- /dev/null
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDnH4imV8kx/mXz
+6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEEY6VWU47bSYRn2xJOP+wm
+fKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe72iMbTc5q2uYi0ImB5/m
+3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNruaI4iOnMvvuc71gvHol7ap
+pRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVouUZYYJseFhOlIANaXn6q
+mz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPDx9PjmGmf6eOEC2ZHBi4O
+HwjIzmLnAgMBAAECggEBAMq1lZyPkh8PCUyLVX3VhC4jRybyAWBI+piKx+4EI6l/
+laP5dZcegCoo+w/mdbTpRHqAWGjec4e9+Nkoq8rLG6B2SCfaRJUYiEQSEvSBHAid
+BZqKK4B82GXQavNU91Vy1OT3vD7mpPXF6jEK6gAA0C4Wt7Lzo7ZfqEavRBDMsNnV
+jOxLwWJCFSKhfeA6grJCnagmEDKSxxazlNBgCahjPf/+IRJZ7Vk4Zjq+I/5nWKf8
+lYaQExKDIANuM/jMRnYVq5k4g2MKHUADWGTSvG1DMJiMHzdxb2miZovpIkEE86bC
+wKBuele9IR6Rb/wygYj7WdaWysZ081OT7mNyju08e4ECgYEA8+q7vv4Nlz8bAcmY
+Ip5517s15M5D9iLsE2Q5m9Zs99rUyQv0E8ekpChhtTSdvj+eNl39O4hji46Gyceu
+MYPfNL7+YWaFDxuyaXEe/OFuKbFqgE1p08HXFcQJCvgqD1MWO5b9BRDc0qpNFIA8
+eN9xFBMQ2UFaALBMAup7Ef85q4kCgYEA8pKOAIsgmlnO8P9cPzkMC1oozslraAti
+1JnOJjwPLoHFubtH2u7WoIkSvNfeNwfrsVXwAP0m7C8p7qhYppS+0XGjKpYNSezK
+1GCqCVv8R1m+AsSseSUUaQCmEydd+gQbBq3r4u3wU3ylrgAoR3m+7SVyhvD+vbwI
+7+zfj+O3zu8CgYEAqaAsQH5c5Tm1hmCztB+RjD1dFWl8ScevdSzWA1HzJcrA/6+Y
+ZckI7kBG8sVMjemgFR735FbNI1hS1DBRK44Rw5SvQv0Qu5j/UeShMCt1ePkwn1k2
+p1S+Rxy1TTOXzGBzra0q+ELpzncwc3lalJSPBu7bYLrZ5HC167E1NSbQ7EECgYBo
+e/IIj+TyNz7pFcVhQixK84HiWGYYQddHJhzi4TnU2XcWonG3/uqZ6ZEVoJIJ+DJw
+h0jC1EggscwJDaBp2GY9Bwq2PD3rGsDfK+fx8ho/jYtH2/lCkVMyS2I9m9Zh68TM
+YrvZWo4LGASxZ0XyS6GOunOTZlkD1uuulMRTUU4KJwKBgQCwyjs0/ElVFvO0lPIC
+JJ//B5rqI7hNMJuTBvr4yiqVZdbgFukaU7FBVyNYDMpZi/nRbpglm+psFcwXtL8n
+bHOIGLkh8vB7OuETRYhXs567lPYtO4BmHZlXW70Sq/0xqi/Mmz1RuEg4SQ1Ug5oy
+wG6IV5EWSQAhsGirdybQ+bY7Kw==
+-----END PRIVATE KEY-----
index 983d0f3..f75d0e4 100644 (file)
@@ -3,7 +3,7 @@
     [
       {
         "id": "STD_Fault_Messages",
-        "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
+        "dmaapTopicUrl": "/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
       }
   ]
 }
\ No newline at end of file
index dfd2505..b31b334 100644 (file)
@@ -32,8 +32,9 @@ type Config struct {
        InfoProducerHost       string
        InfoProducerPort       int
        InfoCoordinatorAddress string
-       MRHost                 string
-       MRPort                 int
+       DMaaPMRAddress         string
+       ProducerCertPath       string
+       ProducerKeyPath        string
 }
 
 func New() *Config {
@@ -41,9 +42,10 @@ func New() *Config {
                LogLevel:               getLogLevel(),
                InfoProducerHost:       getEnv("INFO_PRODUCER_HOST", ""),
                InfoProducerPort:       getEnvAsInt("INFO_PRODUCER_PORT", 8085),
-               InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
-               MRHost:                 getEnv("MR_HOST", "http://message-router.onap"),
-               MRPort:                 getEnvAsInt("MR_PORT", 3904),
+               InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://enrichmentservice:8434"),
+               DMaaPMRAddress:         getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
+               ProducerCertPath:       getEnv("PRODUCER_CERT_PATH", "configs/producer.crt"),
+               ProducerKeyPath:        getEnv("PRODUCER_KEY_PATH", "configs/producer.key"),
        }
 }
 
index fc64e57..9420a2a 100644 (file)
@@ -36,8 +36,9 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
        os.Setenv("INFO_PRODUCER_HOST", "producerHost")
        os.Setenv("INFO_PRODUCER_PORT", "8095")
        os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
-       os.Setenv("MR_HOST", "mrHost")
-       os.Setenv("MR_PORT", "3908")
+       os.Setenv("DMAAP_MR_ADDR", "mrHost:3908")
+       os.Setenv("PRODUCER_CERT_PATH", "cert")
+       os.Setenv("PRODUCER_KEY_PATH", "key")
        t.Cleanup(func() {
                os.Clearenv()
        })
@@ -46,8 +47,9 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
                InfoProducerHost:       "producerHost",
                InfoProducerPort:       8095,
                InfoCoordinatorAddress: "infoCoordAddr",
-               MRHost:                 "mrHost",
-               MRPort:                 3908,
+               DMaaPMRAddress:         "mrHost:3908",
+               ProducerCertPath:       "cert",
+               ProducerKeyPath:        "key",
        }
        got := New()
 
@@ -68,9 +70,10 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T
                LogLevel:               log.InfoLevel,
                InfoProducerHost:       "",
                InfoProducerPort:       8085,
-               InfoCoordinatorAddress: "http://enrichmentservice:8083",
-               MRHost:                 "http://message-router.onap",
-               MRPort:                 3904,
+               InfoCoordinatorAddress: "https://enrichmentservice:8434",
+               DMaaPMRAddress:         "https://message-router.onap:3905",
+               ProducerCertPath:       "configs/producer.crt",
+               ProducerKeyPath:        "configs/producer.key",
        }
        if got := New(); !reflect.DeepEqual(got, &wantConfig) {
                t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -94,9 +97,10 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) {
                LogLevel:               log.InfoLevel,
                InfoProducerHost:       "",
                InfoProducerPort:       8085,
-               InfoCoordinatorAddress: "http://enrichmentservice:8083",
-               MRHost:                 "http://message-router.onap",
-               MRPort:                 3904,
+               InfoCoordinatorAddress: "https://enrichmentservice:8434",
+               DMaaPMRAddress:         "https://message-router.onap:3905",
+               ProducerCertPath:       "configs/producer.crt",
+               ProducerKeyPath:        "configs/producer.key",
        }
 
        got := New()
index 2cffa2c..324aed0 100644 (file)
@@ -57,7 +57,7 @@ func TestRegisterTypes(t *testing.T) {
        assertions.Equal("http", actualRequest.URL.Scheme)
        assertions.Equal("localhost:9990", actualRequest.URL.Host)
        assertions.Equal("/data-producer/v1/info-types/Type1", actualRequest.URL.Path)
-       assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
        body, _ := ioutil.ReadAll(actualRequest.Body)
        expectedBody := []byte(`{"info_job_data_schema": {"type": "object","properties": {},"additionalProperties": false}}`)
        assertions.Equal(expectedBody, body)
@@ -92,7 +92,7 @@ func TestRegisterProducer(t *testing.T) {
        assertions.Equal("http", actualRequest.URL.Scheme)
        assertions.Equal("localhost:9990", actualRequest.URL.Host)
        assertions.Equal("/data-producer/v1/info-producers/Producer1", actualRequest.URL.Path)
-       assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
        body, _ := ioutil.ReadAll(actualRequest.Body)
        expectedBody := []byte(`{"info_producer_supervision_callback_url":"supervisionCallbackUrl","supported_info_types":["type1"],"info_job_callback_url":"jobCallbackUrl"}`)
        assertions.Equal(expectedBody, body)
index 7b21b00..854372a 100644 (file)
@@ -154,11 +154,12 @@ func (jh *JobHandlerImpl) pollAndDistributeMessages(mRAddress string) {
        defer jh.mu.Unlock()
        for typeId, typeInfo := range jh.allTypes {
                log.Debugf("Processing jobs for type: %v", typeId)
-               messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL), jh.pollClient)
+               messagesBody, error := restclient.Get(mRAddress+typeInfo.DMaaPTopicURL, jh.pollClient)
                if error != nil {
                        log.Warnf("Error getting data from MR. Cause: %v", error)
                        continue
                }
+               log.Debugf("Received messages: %v", string(messagesBody))
                jh.distributeMessages(messagesBody, typeInfo)
        }
 }
@@ -176,6 +177,7 @@ func (jh *JobHandlerImpl) sendMessagesToConsumer(messages []byte, jobInfo JobInf
        if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
                log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
        }
+       log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner)
 }
 
 func (jh *JobHandlerImpl) clearAll() {
index 8533719..555285c 100644 (file)
@@ -185,7 +185,7 @@ func TestPollAndDistributeMessages(t *testing.T) {
                if req.URL.String() == "http://consumerHost/target" {
                        assertions.Equal(req.Method, "POST")
                        assertions.Equal(messages, getBodyAsString(req))
-                       assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type"))
+                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
                        wg.Done() // Signal that the distribution call has been made
                        return &http.Response{
                                StatusCode: 200,
@@ -207,7 +207,7 @@ func TestPollAndDistributeMessages(t *testing.T) {
        }
        handlerUnderTest.allTypes["type1"] = TypeData{
                TypeId:        "type1",
-               DMaaPTopicURL: "topicUrl",
+               DMaaPTopicURL: "/topicUrl",
                Jobs:          map[string]JobInfo{"job1": jobInfo},
        }
        t.Cleanup(func() {
index 2b3a0cf..7c762d9 100644 (file)
@@ -70,7 +70,7 @@ func Post(url string, body []byte, client HTTPClient) error {
 
 func do(method string, url string, body []byte, client HTTPClient) error {
        if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
-               req.Header.Set("Content-Type", "application/json; charset=utf-8")
+               req.Header.Set("Content-Type", "application/json")
                if response, respErr := client.Do(req); respErr == nil {
                        if isResponseSuccess(response.StatusCode) {
                                return nil
index 7fe3cc7..3ded9be 100644 (file)
@@ -122,7 +122,7 @@ func TestPutOk(t *testing.T) {
        assertions.Equal(http.MethodPut, actualRequest.Method)
        assertions.Equal("http", actualRequest.URL.Scheme)
        assertions.Equal("localhost:9990", actualRequest.URL.Host)
-       assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
        body, _ := ioutil.ReadAll(actualRequest.Body)
        expectedBody := []byte("body")
        assertions.Equal(expectedBody, body)
@@ -148,7 +148,7 @@ func TestPostOk(t *testing.T) {
        assertions.Equal(http.MethodPost, actualRequest.Method)
        assertions.Equal("http", actualRequest.URL.Scheme)
        assertions.Equal("localhost:9990", actualRequest.URL.Host)
-       assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
        body, _ := ioutil.ReadAll(actualRequest.Body)
        expectedBody := []byte("body")
        assertions.Equal(expectedBody, body)
index 1a91af4..cdfde57 100644 (file)
@@ -21,9 +21,9 @@
 package main
 
 import (
+       "crypto/tls"
        "fmt"
        "net/http"
-       "sync"
        "time"
 
        "github.com/hashicorp/go-retryablehttp"
@@ -34,13 +34,7 @@ import (
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
 )
 
-const timeoutDistributionClient = time.Second * 5
-const retryWaitMax = time.Minute
-const retryMax = int(^uint(0) >> 1)
-
 var configuration *config.Config
-var retryClient restclient.HTTPClient
-var jobHandler *jobs.JobHandlerImpl
 
 func init() {
        configuration = config.New()
@@ -54,52 +48,68 @@ func main() {
        }
        callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
 
-       distributionClient := &http.Client{
-               Timeout: timeoutDistributionClient,
+       var retryClient restclient.HTTPClient
+       if cert, err := createClientCertificate(); err == nil {
+               retryClient = createRetryClient(cert)
+       } else {
+               log.Fatalf("Stopping producer due to error: %v", err)
        }
 
-       rawRetryClient := retryablehttp.NewClient()
-       rawRetryClient.RetryWaitMax = retryWaitMax
-       rawRetryClient.RetryMax = retryMax
-       retryClient = rawRetryClient.StandardClient()
-
-       jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, distributionClient)
-       if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil {
+       jobHandler := jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, &http.Client{
+               Timeout: time.Second * 5,
+       })
+       if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
                log.Fatalf("Stopping producer due to: %v", err)
        }
 
        log.Debug("Starting DMaaP Mediator Producer")
-       wg := new(sync.WaitGroup)
-
-       // add two goroutines to `wg` WaitGroup, one for each running go routine
-       wg.Add(2)
-
-       log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
        go func() {
+               log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
                r := server.NewRouter(jobHandler)
-               log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
-               wg.Done()
+               log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
        }()
 
-       go func() {
-               jobHandler.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
-               wg.Done()
-       }()
+       go jobHandler.RunJobs(configuration.DMaaPMRAddress)
 
-       // wait until WaitGroup is done
-       wg.Wait()
-       log.Debug("Stopping DMaaP Mediator Producer")
+       keepProducerAlive()
 }
 
 func validateConfiguration(configuration *config.Config) error {
        if configuration.InfoProducerHost == "" {
                return fmt.Errorf("missing INFO_PRODUCER_HOST")
        }
+       if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
+               return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
+       }
        return nil
 }
 
-func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error {
-       registrator := config.NewRegistratorImpl(infoCoordinatorAddress, retryClient)
+func createClientCertificate() (*tls.Certificate, error) {
+       if cert, err := tls.LoadX509KeyPair(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
+               return &cert, nil
+       } else {
+               return nil, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s", configuration.ProducerCertPath, configuration.ProducerKeyPath)
+       }
+}
+
+func createRetryClient(cert *tls.Certificate) *http.Client {
+       rawRetryClient := retryablehttp.NewClient()
+       rawRetryClient.RetryWaitMax = time.Minute
+       rawRetryClient.RetryMax = int(^uint(0) >> 1)
+       rawRetryClient.HTTPClient.Transport = &http.Transport{
+               TLSClientConfig: &tls.Config{
+                       Certificates: []tls.Certificate{
+                               *cert,
+                       },
+                       InsecureSkipVerify: true,
+               },
+       }
+
+       return rawRetryClient.StandardClient()
+}
+
+func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
+       registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
        if types, err := jobHandler.GetTypes(); err == nil {
                if regErr := registrator.RegisterTypes(types); regErr != nil {
                        return fmt.Errorf("unable to register all types due to: %v", regErr)
@@ -117,3 +127,8 @@ func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAdd
        }
        return nil
 }
+
+func keepProducerAlive() {
+       forever := make(chan int)
+       <-forever
+}
@@ -70,6 +70,6 @@ func registerJob(port int) {
 func handleData(w http.ResponseWriter, req *http.Request) {
        defer req.Body.Close()
        if reqData, err := io.ReadAll(req.Body); err == nil {
-               fmt.Print("Consumer received body: ", string(reqData))
+               fmt.Println("Consumer received body: ", string(reqData))
        }
 }
diff --git a/dmaap-mediator-producer/simulator/dmaap/mrsimulator.go b/dmaap-mediator-producer/simulator/dmaap/mrsimulator.go
new file mode 100644 (file)
index 0000000..0ec38cc
--- /dev/null
@@ -0,0 +1,102 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+       "encoding/json"
+       "flag"
+       "fmt"
+       "math/rand"
+       "net/http"
+       "time"
+)
+
+var r = rand.New(rand.NewSource(time.Now().UnixNano()))
+
+type FaultMessage struct {
+       Event Event `json:"event"`
+}
+
+type Event struct {
+       CommonEventHeader CommonEventHeader `json:"commonEventHeader"`
+       FaultFields       FaultFields       `json:"faultFields"`
+}
+
+type CommonEventHeader struct {
+       Domain     string `json:"domain"`
+       SourceName string `json:"sourceName"`
+}
+
+type FaultFields struct {
+       AlarmCondition string `json:"alarmCondition"`
+       EventSeverity  string `json:"eventSeverity"`
+}
+
+func main() {
+       port := flag.Int("port", 3905, "The port this message router will listen on")
+       flag.Parse()
+
+       http.HandleFunc("/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages", handleData)
+
+       fmt.Print("Starting mr on port: ", *port)
+       http.ListenAndServeTLS(fmt.Sprintf(":%v", *port), "../../configs/producer.crt", "../../configs/producer.key", nil)
+
+}
+
+var critical = true
+
+func handleData(w http.ResponseWriter, req *http.Request) {
+       time.Sleep(time.Duration(r.Intn(3)) * time.Second)
+
+       w.Header().Set("Content-Type", "application/json")
+
+       var responseBody []byte
+       if critical {
+               responseBody = getFaultMessage("CRITICAL")
+               critical = false
+       } else {
+               responseBody = getFaultMessage("NORMAL")
+               critical = true
+       }
+       // w.Write(responseBody)
+       fmt.Fprint(w, string(responseBody))
+}
+
+func getFaultMessage(eventSeverity string) []byte {
+       linkFailureMessage := FaultMessage{
+               Event: Event{
+                       CommonEventHeader: CommonEventHeader{
+                               Domain:     "fault",
+                               SourceName: "ERICSSON-O-RU-11220",
+                       },
+                       FaultFields: FaultFields{
+                               AlarmCondition: "28",
+                               EventSeverity:  eventSeverity,
+                       },
+               },
+       }
+       fmt.Printf("Sending message: %v\n", linkFailureMessage)
+
+       messageAsByteArray, _ := json.Marshal(linkFailureMessage)
+       response := [1]string{string(messageAsByteArray)}
+       responseAsByteArray, _ := json.Marshal(response)
+       return responseAsByteArray
+}