Merge "Implement secure communications"
authorJohn Keeney <john.keeney@est.tech>
Mon, 1 Nov 2021 10:55:57 +0000 (10:55 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Mon, 1 Nov 2021 10:55:57 +0000 (10:55 +0000)
15 files changed:
dmaap-mediator-producer/.gitignore
dmaap-mediator-producer/README.md
dmaap-mediator-producer/configs/producer.crt [new file with mode: 0644]
dmaap-mediator-producer/configs/producer.key [new file with mode: 0644]
dmaap-mediator-producer/configs/type_config.json
dmaap-mediator-producer/internal/config/config.go
dmaap-mediator-producer/internal/config/config_test.go
dmaap-mediator-producer/internal/config/registrator_test.go
dmaap-mediator-producer/internal/jobs/jobs.go
dmaap-mediator-producer/internal/jobs/jobs_test.go
dmaap-mediator-producer/internal/restclient/HTTPClient.go
dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
dmaap-mediator-producer/main.go
dmaap-mediator-producer/simulator/consumer/consumersimulator.go [moved from dmaap-mediator-producer/simulator/consumersimulator.go with 97% similarity]
dmaap-mediator-producer/simulator/dmaap/mrsimulator.go [new file with mode: 0644]

index 9f5396c..5b1f8f9 100644 (file)
@@ -4,4 +4,7 @@ coverage.*
 main
 dmaapmediatorproducer
 __debug_bin*
-simulator
+consumer
+!consumer/
+dmaap
+!dmaap/
index e4a2ffd..c22c561 100644 (file)
@@ -7,9 +7,10 @@ The producer takes a number of environment variables, described below, as config
 >- INFO_PRODUCER_HOST  **Required**. The host for the producer.                                   Example: `http://mrproducer`
 >- LOG_LEVEL           Optional. The log level, which can be `Error`, `Warn`, `Info` or `Debug`.  Defaults to `Info`.
 >- INFO_PRODUCER_PORT  Optional. The port for the product.                                        Defaults to `8085`.
->- INFO_COORD_ADDR     Optional. The address of the Information Coordinator.                      Defaults to `http://enrichmentservice:8083`.
->- MR_HOST             Optional. The host for the DMaaP Message Router.                           Defaults to `http://message-router.onap`.
->- MR_PORT             Optional. The port for the DMaaP Message Router.                           Defaults to `3904`.
+>- INFO_COORD_ADDR     Optional. The address of the Information Coordinator.                      Defaults to `https://enrichmentservice:8434`.
+>- DMAAP_MR_ADDR       Optional. The address of the DMaaP Message Router.                         Defaults to `https://message-router.onap:3905`.
+>- PRODUCER_CERT_PATH  Optional. The path to the certificate to use for https.                    Defaults to `configs/producer.crt`
+>- PRODUCER_KEY_PATH   Optional. The path to the key to the certificate to use for https.         Defaults to `configs/producer.key`
 
 The file `configs/type_config.json` contains the configuration of job types that the producer will support.
 
@@ -23,6 +24,8 @@ The file `configs/type_config.json` contains the configuration of job types that
       ]
     }
 
+The server part of the producer uses https, and the communication towards ICS and MR use https, with no server certificate verification.
+
 At start up the producer will register the configured job types in ICS and also register itself as a producer supporting these types.
 
 Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
diff --git a/dmaap-mediator-producer/configs/producer.crt b/dmaap-mediator-producer/configs/producer.crt
new file mode 100644 (file)
index 0000000..0f6d8a3
--- /dev/null
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDXzCCAkegAwIBAgIUEbuDTP0ixwxCxCQ9tR5DijGCbtkwDQYJKoZIhvcNAQEL
+BQAwPzELMAkGA1UEBhMCc2UxDDAKBgNVBAoMA0VTVDERMA8GA1UECwwIRXJpY3Nz
+b24xDzANBgNVBAMMBnNlcnZlcjAeFw0yMTEwMTkxNDA1MzVaFw0zMTEwMTcxNDA1
+MzVaMD8xCzAJBgNVBAYTAnNlMQwwCgYDVQQKDANFU1QxETAPBgNVBAsMCEVyaWNz
+c29uMQ8wDQYDVQQDDAZzZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
+AoIBAQDnH4imV8kx/mXz6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEE
+Y6VWU47bSYRn2xJOP+wmfKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe
+72iMbTc5q2uYi0ImB5/m3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNrua
+I4iOnMvvuc71gvHol7appRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVo
+uUZYYJseFhOlIANaXn6qmz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPD
+x9PjmGmf6eOEC2ZHBi4OHwjIzmLnAgMBAAGjUzBRMB0GA1UdDgQWBBRjeDLPpLm2
+W623wna7xBCbHxtxVjAfBgNVHSMEGDAWgBRjeDLPpLm2W623wna7xBCbHxtxVjAP
+BgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAbFUAWFZaIMXmd5qv/
+xJYr1oPJpsmbgWGRWZWDZqbUabvWObyXlDJWIau60BerfcC5TmyElBjTyONSGwCT
+tq+SVB0PXpgqa8ZQ25Ytn2AMDFWhrGbOefCXs6te3HGq6BNubTWrOVIvJypCwC95
++iXVuDd4eg+n2fWv7h7fZRZHum/zLoRxB2lKoMMbc/BQX9hbtP6xyvIVvaYdhcJw
+VzJJGIDqpMiMH6IBaOFSmgfOyGblGKAicj3E3kpGBfadLx3R+9V6aG7zyBnVbr2w
+YJbV2Ay4PrF+PTpCMB/mNwC5RBTYHpSNdrCMSyq3I+QPVJq8dPJr7fd1Uwl3WHqX
+FV0h
+-----END CERTIFICATE-----
diff --git a/dmaap-mediator-producer/configs/producer.key b/dmaap-mediator-producer/configs/producer.key
new file mode 100644 (file)
index 0000000..5346bb7
--- /dev/null
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDnH4imV8kx/mXz
+6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEEY6VWU47bSYRn2xJOP+wm
+fKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe72iMbTc5q2uYi0ImB5/m
+3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNruaI4iOnMvvuc71gvHol7ap
+pRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVouUZYYJseFhOlIANaXn6q
+mz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPDx9PjmGmf6eOEC2ZHBi4O
+HwjIzmLnAgMBAAECggEBAMq1lZyPkh8PCUyLVX3VhC4jRybyAWBI+piKx+4EI6l/
+laP5dZcegCoo+w/mdbTpRHqAWGjec4e9+Nkoq8rLG6B2SCfaRJUYiEQSEvSBHAid
+BZqKK4B82GXQavNU91Vy1OT3vD7mpPXF6jEK6gAA0C4Wt7Lzo7ZfqEavRBDMsNnV
+jOxLwWJCFSKhfeA6grJCnagmEDKSxxazlNBgCahjPf/+IRJZ7Vk4Zjq+I/5nWKf8
+lYaQExKDIANuM/jMRnYVq5k4g2MKHUADWGTSvG1DMJiMHzdxb2miZovpIkEE86bC
+wKBuele9IR6Rb/wygYj7WdaWysZ081OT7mNyju08e4ECgYEA8+q7vv4Nlz8bAcmY
+Ip5517s15M5D9iLsE2Q5m9Zs99rUyQv0E8ekpChhtTSdvj+eNl39O4hji46Gyceu
+MYPfNL7+YWaFDxuyaXEe/OFuKbFqgE1p08HXFcQJCvgqD1MWO5b9BRDc0qpNFIA8
+eN9xFBMQ2UFaALBMAup7Ef85q4kCgYEA8pKOAIsgmlnO8P9cPzkMC1oozslraAti
+1JnOJjwPLoHFubtH2u7WoIkSvNfeNwfrsVXwAP0m7C8p7qhYppS+0XGjKpYNSezK
+1GCqCVv8R1m+AsSseSUUaQCmEydd+gQbBq3r4u3wU3ylrgAoR3m+7SVyhvD+vbwI
+7+zfj+O3zu8CgYEAqaAsQH5c5Tm1hmCztB+RjD1dFWl8ScevdSzWA1HzJcrA/6+Y
+ZckI7kBG8sVMjemgFR735FbNI1hS1DBRK44Rw5SvQv0Qu5j/UeShMCt1ePkwn1k2
+p1S+Rxy1TTOXzGBzra0q+ELpzncwc3lalJSPBu7bYLrZ5HC167E1NSbQ7EECgYBo
+e/IIj+TyNz7pFcVhQixK84HiWGYYQddHJhzi4TnU2XcWonG3/uqZ6ZEVoJIJ+DJw
+h0jC1EggscwJDaBp2GY9Bwq2PD3rGsDfK+fx8ho/jYtH2/lCkVMyS2I9m9Zh68TM
+YrvZWo4LGASxZ0XyS6GOunOTZlkD1uuulMRTUU4KJwKBgQCwyjs0/ElVFvO0lPIC
+JJ//B5rqI7hNMJuTBvr4yiqVZdbgFukaU7FBVyNYDMpZi/nRbpglm+psFcwXtL8n
+bHOIGLkh8vB7OuETRYhXs567lPYtO4BmHZlXW70Sq/0xqi/Mmz1RuEg4SQ1Ug5oy
+wG6IV5EWSQAhsGirdybQ+bY7Kw==
+-----END PRIVATE KEY-----
index 983d0f3..f75d0e4 100644 (file)
@@ -3,7 +3,7 @@
     [
       {
         "id": "STD_Fault_Messages",
-        "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
+        "dmaapTopicUrl": "/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
       }
   ]
 }
\ No newline at end of file
index dfd2505..b31b334 100644 (file)
@@ -32,8 +32,9 @@ type Config struct {
        InfoProducerHost       string
        InfoProducerPort       int
        InfoCoordinatorAddress string
-       MRHost                 string
-       MRPort                 int
+       DMaaPMRAddress         string
+       ProducerCertPath       string
+       ProducerKeyPath        string
 }
 
 func New() *Config {
@@ -41,9 +42,10 @@ func New() *Config {
                LogLevel:               getLogLevel(),
                InfoProducerHost:       getEnv("INFO_PRODUCER_HOST", ""),
                InfoProducerPort:       getEnvAsInt("INFO_PRODUCER_PORT", 8085),
-               InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
-               MRHost:                 getEnv("MR_HOST", "http://message-router.onap"),
-               MRPort:                 getEnvAsInt("MR_PORT", 3904),
+               InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://enrichmentservice:8434"),
+               DMaaPMRAddress:         getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
+               ProducerCertPath:       getEnv("PRODUCER_CERT_PATH", "configs/producer.crt"),
+               ProducerKeyPath:        getEnv("PRODUCER_KEY_PATH", "configs/producer.key"),
        }
 }
 
index fc64e57..9420a2a 100644 (file)
@@ -36,8 +36,9 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
        os.Setenv("INFO_PRODUCER_HOST", "producerHost")
        os.Setenv("INFO_PRODUCER_PORT", "8095")
        os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
-       os.Setenv("MR_HOST", "mrHost")
-       os.Setenv("MR_PORT", "3908")
+       os.Setenv("DMAAP_MR_ADDR", "mrHost:3908")
+       os.Setenv("PRODUCER_CERT_PATH", "cert")
+       os.Setenv("PRODUCER_KEY_PATH", "key")
        t.Cleanup(func() {
                os.Clearenv()
        })
@@ -46,8 +47,9 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
                InfoProducerHost:       "producerHost",
                InfoProducerPort:       8095,
                InfoCoordinatorAddress: "infoCoordAddr",
-               MRHost:                 "mrHost",
-               MRPort:                 3908,
+               DMaaPMRAddress:         "mrHost:3908",
+               ProducerCertPath:       "cert",
+               ProducerKeyPath:        "key",
        }
        got := New()
 
@@ -68,9 +70,10 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T
                LogLevel:               log.InfoLevel,
                InfoProducerHost:       "",
                InfoProducerPort:       8085,
-               InfoCoordinatorAddress: "http://enrichmentservice:8083",
-               MRHost:                 "http://message-router.onap",
-               MRPort:                 3904,
+               InfoCoordinatorAddress: "https://enrichmentservice:8434",
+               DMaaPMRAddress:         "https://message-router.onap:3905",
+               ProducerCertPath:       "configs/producer.crt",
+               ProducerKeyPath:        "configs/producer.key",
        }
        if got := New(); !reflect.DeepEqual(got, &wantConfig) {
                t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -94,9 +97,10 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) {
                LogLevel:               log.InfoLevel,
                InfoProducerHost:       "",
                InfoProducerPort:       8085,
-               InfoCoordinatorAddress: "http://enrichmentservice:8083",
-               MRHost:                 "http://message-router.onap",
-               MRPort:                 3904,
+               InfoCoordinatorAddress: "https://enrichmentservice:8434",
+               DMaaPMRAddress:         "https://message-router.onap:3905",
+               ProducerCertPath:       "configs/producer.crt",
+               ProducerKeyPath:        "configs/producer.key",
        }
 
        got := New()
index 2cffa2c..324aed0 100644 (file)
@@ -57,7 +57,7 @@ func TestRegisterTypes(t *testing.T) {
        assertions.Equal("http", actualRequest.URL.Scheme)
        assertions.Equal("localhost:9990", actualRequest.URL.Host)
        assertions.Equal("/data-producer/v1/info-types/Type1", actualRequest.URL.Path)
-       assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
        body, _ := ioutil.ReadAll(actualRequest.Body)
        expectedBody := []byte(`{"info_job_data_schema": {"type": "object","properties": {},"additionalProperties": false}}`)
        assertions.Equal(expectedBody, body)
@@ -92,7 +92,7 @@ func TestRegisterProducer(t *testing.T) {
        assertions.Equal("http", actualRequest.URL.Scheme)
        assertions.Equal("localhost:9990", actualRequest.URL.Host)
        assertions.Equal("/data-producer/v1/info-producers/Producer1", actualRequest.URL.Path)
-       assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
        body, _ := ioutil.ReadAll(actualRequest.Body)
        expectedBody := []byte(`{"info_producer_supervision_callback_url":"supervisionCallbackUrl","supported_info_types":["type1"],"info_job_callback_url":"jobCallbackUrl"}`)
        assertions.Equal(expectedBody, body)
index 7b21b00..854372a 100644 (file)
@@ -154,11 +154,12 @@ func (jh *JobHandlerImpl) pollAndDistributeMessages(mRAddress string) {
        defer jh.mu.Unlock()
        for typeId, typeInfo := range jh.allTypes {
                log.Debugf("Processing jobs for type: %v", typeId)
-               messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL), jh.pollClient)
+               messagesBody, error := restclient.Get(mRAddress+typeInfo.DMaaPTopicURL, jh.pollClient)
                if error != nil {
                        log.Warnf("Error getting data from MR. Cause: %v", error)
                        continue
                }
+               log.Debugf("Received messages: %v", string(messagesBody))
                jh.distributeMessages(messagesBody, typeInfo)
        }
 }
@@ -176,6 +177,7 @@ func (jh *JobHandlerImpl) sendMessagesToConsumer(messages []byte, jobInfo JobInf
        if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
                log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
        }
+       log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner)
 }
 
 func (jh *JobHandlerImpl) clearAll() {
index 8533719..555285c 100644 (file)
@@ -185,7 +185,7 @@ func TestPollAndDistributeMessages(t *testing.T) {
                if req.URL.String() == "http://consumerHost/target" {
                        assertions.Equal(req.Method, "POST")
                        assertions.Equal(messages, getBodyAsString(req))
-                       assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type"))
+                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
                        wg.Done() // Signal that the distribution call has been made
                        return &http.Response{
                                StatusCode: 200,
@@ -207,7 +207,7 @@ func TestPollAndDistributeMessages(t *testing.T) {
        }
        handlerUnderTest.allTypes["type1"] = TypeData{
                TypeId:        "type1",
-               DMaaPTopicURL: "topicUrl",
+               DMaaPTopicURL: "/topicUrl",
                Jobs:          map[string]JobInfo{"job1": jobInfo},
        }
        t.Cleanup(func() {
index 2b3a0cf..7c762d9 100644 (file)
@@ -70,7 +70,7 @@ func Post(url string, body []byte, client HTTPClient) error {
 
 func do(method string, url string, body []byte, client HTTPClient) error {
        if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
-               req.Header.Set("Content-Type", "application/json; charset=utf-8")
+               req.Header.Set("Content-Type", "application/json")
                if response, respErr := client.Do(req); respErr == nil {
                        if isResponseSuccess(response.StatusCode) {
                                return nil
index 7fe3cc7..3ded9be 100644 (file)
@@ -122,7 +122,7 @@ func TestPutOk(t *testing.T) {
        assertions.Equal(http.MethodPut, actualRequest.Method)
        assertions.Equal("http", actualRequest.URL.Scheme)
        assertions.Equal("localhost:9990", actualRequest.URL.Host)
-       assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
        body, _ := ioutil.ReadAll(actualRequest.Body)
        expectedBody := []byte("body")
        assertions.Equal(expectedBody, body)
@@ -148,7 +148,7 @@ func TestPostOk(t *testing.T) {
        assertions.Equal(http.MethodPost, actualRequest.Method)
        assertions.Equal("http", actualRequest.URL.Scheme)
        assertions.Equal("localhost:9990", actualRequest.URL.Host)
-       assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+       assertions.Equal("application/json", actualRequest.Header.Get("Content-Type"))
        body, _ := ioutil.ReadAll(actualRequest.Body)
        expectedBody := []byte("body")
        assertions.Equal(expectedBody, body)
index 1a91af4..cdfde57 100644 (file)
@@ -21,9 +21,9 @@
 package main
 
 import (
+       "crypto/tls"
        "fmt"
        "net/http"
-       "sync"
        "time"
 
        "github.com/hashicorp/go-retryablehttp"
@@ -34,13 +34,7 @@ import (
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
 )
 
-const timeoutDistributionClient = time.Second * 5
-const retryWaitMax = time.Minute
-const retryMax = int(^uint(0) >> 1)
-
 var configuration *config.Config
-var retryClient restclient.HTTPClient
-var jobHandler *jobs.JobHandlerImpl
 
 func init() {
        configuration = config.New()
@@ -54,52 +48,68 @@ func main() {
        }
        callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
 
-       distributionClient := &http.Client{
-               Timeout: timeoutDistributionClient,
+       var retryClient restclient.HTTPClient
+       if cert, err := createClientCertificate(); err == nil {
+               retryClient = createRetryClient(cert)
+       } else {
+               log.Fatalf("Stopping producer due to error: %v", err)
        }
 
-       rawRetryClient := retryablehttp.NewClient()
-       rawRetryClient.RetryWaitMax = retryWaitMax
-       rawRetryClient.RetryMax = retryMax
-       retryClient = rawRetryClient.StandardClient()
-
-       jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, distributionClient)
-       if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil {
+       jobHandler := jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, &http.Client{
+               Timeout: time.Second * 5,
+       })
+       if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
                log.Fatalf("Stopping producer due to: %v", err)
        }
 
        log.Debug("Starting DMaaP Mediator Producer")
-       wg := new(sync.WaitGroup)
-
-       // add two goroutines to `wg` WaitGroup, one for each running go routine
-       wg.Add(2)
-
-       log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
        go func() {
+               log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
                r := server.NewRouter(jobHandler)
-               log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
-               wg.Done()
+               log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
        }()
 
-       go func() {
-               jobHandler.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
-               wg.Done()
-       }()
+       go jobHandler.RunJobs(configuration.DMaaPMRAddress)
 
-       // wait until WaitGroup is done
-       wg.Wait()
-       log.Debug("Stopping DMaaP Mediator Producer")
+       keepProducerAlive()
 }
 
 func validateConfiguration(configuration *config.Config) error {
        if configuration.InfoProducerHost == "" {
                return fmt.Errorf("missing INFO_PRODUCER_HOST")
        }
+       if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
+               return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
+       }
        return nil
 }
 
-func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error {
-       registrator := config.NewRegistratorImpl(infoCoordinatorAddress, retryClient)
+func createClientCertificate() (*tls.Certificate, error) {
+       if cert, err := tls.LoadX509KeyPair(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
+               return &cert, nil
+       } else {
+               return nil, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s", configuration.ProducerCertPath, configuration.ProducerKeyPath)
+       }
+}
+
+func createRetryClient(cert *tls.Certificate) *http.Client {
+       rawRetryClient := retryablehttp.NewClient()
+       rawRetryClient.RetryWaitMax = time.Minute
+       rawRetryClient.RetryMax = int(^uint(0) >> 1)
+       rawRetryClient.HTTPClient.Transport = &http.Transport{
+               TLSClientConfig: &tls.Config{
+                       Certificates: []tls.Certificate{
+                               *cert,
+                       },
+                       InsecureSkipVerify: true,
+               },
+       }
+
+       return rawRetryClient.StandardClient()
+}
+
+func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
+       registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
        if types, err := jobHandler.GetTypes(); err == nil {
                if regErr := registrator.RegisterTypes(types); regErr != nil {
                        return fmt.Errorf("unable to register all types due to: %v", regErr)
@@ -117,3 +127,8 @@ func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAdd
        }
        return nil
 }
+
+func keepProducerAlive() {
+       forever := make(chan int)
+       <-forever
+}
@@ -70,6 +70,6 @@ func registerJob(port int) {
 func handleData(w http.ResponseWriter, req *http.Request) {
        defer req.Body.Close()
        if reqData, err := io.ReadAll(req.Body); err == nil {
-               fmt.Print("Consumer received body: ", string(reqData))
+               fmt.Println("Consumer received body: ", string(reqData))
        }
 }
diff --git a/dmaap-mediator-producer/simulator/dmaap/mrsimulator.go b/dmaap-mediator-producer/simulator/dmaap/mrsimulator.go
new file mode 100644 (file)
index 0000000..0ec38cc
--- /dev/null
@@ -0,0 +1,102 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+       "encoding/json"
+       "flag"
+       "fmt"
+       "math/rand"
+       "net/http"
+       "time"
+)
+
+var r = rand.New(rand.NewSource(time.Now().UnixNano()))
+
+type FaultMessage struct {
+       Event Event `json:"event"`
+}
+
+type Event struct {
+       CommonEventHeader CommonEventHeader `json:"commonEventHeader"`
+       FaultFields       FaultFields       `json:"faultFields"`
+}
+
+type CommonEventHeader struct {
+       Domain     string `json:"domain"`
+       SourceName string `json:"sourceName"`
+}
+
+type FaultFields struct {
+       AlarmCondition string `json:"alarmCondition"`
+       EventSeverity  string `json:"eventSeverity"`
+}
+
+func main() {
+       port := flag.Int("port", 3905, "The port this message router will listen on")
+       flag.Parse()
+
+       http.HandleFunc("/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages", handleData)
+
+       fmt.Print("Starting mr on port: ", *port)
+       http.ListenAndServeTLS(fmt.Sprintf(":%v", *port), "../../configs/producer.crt", "../../configs/producer.key", nil)
+
+}
+
+var critical = true
+
+func handleData(w http.ResponseWriter, req *http.Request) {
+       time.Sleep(time.Duration(r.Intn(3)) * time.Second)
+
+       w.Header().Set("Content-Type", "application/json")
+
+       var responseBody []byte
+       if critical {
+               responseBody = getFaultMessage("CRITICAL")
+               critical = false
+       } else {
+               responseBody = getFaultMessage("NORMAL")
+               critical = true
+       }
+       // w.Write(responseBody)
+       fmt.Fprint(w, string(responseBody))
+}
+
+func getFaultMessage(eventSeverity string) []byte {
+       linkFailureMessage := FaultMessage{
+               Event: Event{
+                       CommonEventHeader: CommonEventHeader{
+                               Domain:     "fault",
+                               SourceName: "ERICSSON-O-RU-11220",
+                       },
+                       FaultFields: FaultFields{
+                               AlarmCondition: "28",
+                               EventSeverity:  eventSeverity,
+                       },
+               },
+       }
+       fmt.Printf("Sending message: %v\n", linkFailureMessage)
+
+       messageAsByteArray, _ := json.Marshal(linkFailureMessage)
+       response := [1]string{string(messageAsByteArray)}
+       responseAsByteArray, _ := json.Marshal(response)
+       return responseAsByteArray
+}