From: elinuxhenrik Date: Thu, 28 Oct 2021 14:27:57 +0000 (+0200) Subject: Implement secure communications X-Git-Tag: 1.2.0~58^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;ds=sidebyside;h=c4960f1dc6688b039d6ee29da03e0c0de47b6fbb;p=nonrtric.git Implement secure communications The communication towards the consumer is not secured in this commit. Also changed the configuration so that the address of the DMaaP Message Router is given in one variable, named DMAAP_MR_ADDR. Issue-ID: NONRTRIC-601 Signed-off-by: elinuxhenrik Change-Id: Icb5b3c255367e823fcae2168ab37603092815893 --- diff --git a/dmaap-mediator-producer/.gitignore b/dmaap-mediator-producer/.gitignore index 9f5396c9..5b1f8f91 100644 --- a/dmaap-mediator-producer/.gitignore +++ b/dmaap-mediator-producer/.gitignore @@ -4,4 +4,7 @@ coverage.* main dmaapmediatorproducer __debug_bin* -simulator +consumer +!consumer/ +dmaap +!dmaap/ diff --git a/dmaap-mediator-producer/README.md b/dmaap-mediator-producer/README.md index e4a2ffdb..c22c5617 100644 --- a/dmaap-mediator-producer/README.md +++ b/dmaap-mediator-producer/README.md @@ -7,9 +7,10 @@ The producer takes a number of environment variables, described below, as config >- INFO_PRODUCER_HOST **Required**. The host for the producer. Example: `http://mrproducer` >- LOG_LEVEL Optional. The log level, which can be `Error`, `Warn`, `Info` or `Debug`. Defaults to `Info`. >- INFO_PRODUCER_PORT Optional. The port for the product. Defaults to `8085`. ->- INFO_COORD_ADDR Optional. The address of the Information Coordinator. Defaults to `http://enrichmentservice:8083`. ->- MR_HOST Optional. The host for the DMaaP Message Router. Defaults to `http://message-router.onap`. ->- MR_PORT Optional. The port for the DMaaP Message Router. Defaults to `3904`. +>- INFO_COORD_ADDR Optional. The address of the Information Coordinator. Defaults to `https://enrichmentservice:8434`. +>- DMAAP_MR_ADDR Optional. The address of the DMaaP Message Router. Defaults to `https://message-router.onap:3905`. +>- PRODUCER_CERT_PATH Optional. The path to the certificate to use for https. Defaults to `configs/producer.crt` +>- PRODUCER_KEY_PATH Optional. The path to the key to the certificate to use for https. Defaults to `configs/producer.key` The file `configs/type_config.json` contains the configuration of job types that the producer will support. @@ -23,6 +24,8 @@ The file `configs/type_config.json` contains the configuration of job types that ] } +The server part of the producer uses https, and the communication towards ICS and MR use https, with no server certificate verification. + At start up the producer will register the configured job types in ICS and also register itself as a producer supporting these types. Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer. diff --git a/dmaap-mediator-producer/configs/producer.crt b/dmaap-mediator-producer/configs/producer.crt new file mode 100644 index 00000000..0f6d8a35 --- /dev/null +++ b/dmaap-mediator-producer/configs/producer.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDXzCCAkegAwIBAgIUEbuDTP0ixwxCxCQ9tR5DijGCbtkwDQYJKoZIhvcNAQEL +BQAwPzELMAkGA1UEBhMCc2UxDDAKBgNVBAoMA0VTVDERMA8GA1UECwwIRXJpY3Nz +b24xDzANBgNVBAMMBnNlcnZlcjAeFw0yMTEwMTkxNDA1MzVaFw0zMTEwMTcxNDA1 +MzVaMD8xCzAJBgNVBAYTAnNlMQwwCgYDVQQKDANFU1QxETAPBgNVBAsMCEVyaWNz +c29uMQ8wDQYDVQQDDAZzZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQDnH4imV8kx/mXz6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEE +Y6VWU47bSYRn2xJOP+wmfKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe +72iMbTc5q2uYi0ImB5/m3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNrua +I4iOnMvvuc71gvHol7appRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVo +uUZYYJseFhOlIANaXn6qmz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPD +x9PjmGmf6eOEC2ZHBi4OHwjIzmLnAgMBAAGjUzBRMB0GA1UdDgQWBBRjeDLPpLm2 +W623wna7xBCbHxtxVjAfBgNVHSMEGDAWgBRjeDLPpLm2W623wna7xBCbHxtxVjAP +BgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAbFUAWFZaIMXmd5qv/ +xJYr1oPJpsmbgWGRWZWDZqbUabvWObyXlDJWIau60BerfcC5TmyElBjTyONSGwCT +tq+SVB0PXpgqa8ZQ25Ytn2AMDFWhrGbOefCXs6te3HGq6BNubTWrOVIvJypCwC95 ++iXVuDd4eg+n2fWv7h7fZRZHum/zLoRxB2lKoMMbc/BQX9hbtP6xyvIVvaYdhcJw +VzJJGIDqpMiMH6IBaOFSmgfOyGblGKAicj3E3kpGBfadLx3R+9V6aG7zyBnVbr2w +YJbV2Ay4PrF+PTpCMB/mNwC5RBTYHpSNdrCMSyq3I+QPVJq8dPJr7fd1Uwl3WHqX +FV0h +-----END CERTIFICATE----- diff --git a/dmaap-mediator-producer/configs/producer.key b/dmaap-mediator-producer/configs/producer.key new file mode 100644 index 00000000..5346bb7f --- /dev/null +++ b/dmaap-mediator-producer/configs/producer.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDnH4imV8kx/mXz +6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEEY6VWU47bSYRn2xJOP+wm +fKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe72iMbTc5q2uYi0ImB5/m +3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNruaI4iOnMvvuc71gvHol7ap +pRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVouUZYYJseFhOlIANaXn6q +mz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPDx9PjmGmf6eOEC2ZHBi4O +HwjIzmLnAgMBAAECggEBAMq1lZyPkh8PCUyLVX3VhC4jRybyAWBI+piKx+4EI6l/ +laP5dZcegCoo+w/mdbTpRHqAWGjec4e9+Nkoq8rLG6B2SCfaRJUYiEQSEvSBHAid +BZqKK4B82GXQavNU91Vy1OT3vD7mpPXF6jEK6gAA0C4Wt7Lzo7ZfqEavRBDMsNnV +jOxLwWJCFSKhfeA6grJCnagmEDKSxxazlNBgCahjPf/+IRJZ7Vk4Zjq+I/5nWKf8 +lYaQExKDIANuM/jMRnYVq5k4g2MKHUADWGTSvG1DMJiMHzdxb2miZovpIkEE86bC +wKBuele9IR6Rb/wygYj7WdaWysZ081OT7mNyju08e4ECgYEA8+q7vv4Nlz8bAcmY +Ip5517s15M5D9iLsE2Q5m9Zs99rUyQv0E8ekpChhtTSdvj+eNl39O4hji46Gyceu +MYPfNL7+YWaFDxuyaXEe/OFuKbFqgE1p08HXFcQJCvgqD1MWO5b9BRDc0qpNFIA8 +eN9xFBMQ2UFaALBMAup7Ef85q4kCgYEA8pKOAIsgmlnO8P9cPzkMC1oozslraAti +1JnOJjwPLoHFubtH2u7WoIkSvNfeNwfrsVXwAP0m7C8p7qhYppS+0XGjKpYNSezK +1GCqCVv8R1m+AsSseSUUaQCmEydd+gQbBq3r4u3wU3ylrgAoR3m+7SVyhvD+vbwI +7+zfj+O3zu8CgYEAqaAsQH5c5Tm1hmCztB+RjD1dFWl8ScevdSzWA1HzJcrA/6+Y +ZckI7kBG8sVMjemgFR735FbNI1hS1DBRK44Rw5SvQv0Qu5j/UeShMCt1ePkwn1k2 +p1S+Rxy1TTOXzGBzra0q+ELpzncwc3lalJSPBu7bYLrZ5HC167E1NSbQ7EECgYBo +e/IIj+TyNz7pFcVhQixK84HiWGYYQddHJhzi4TnU2XcWonG3/uqZ6ZEVoJIJ+DJw +h0jC1EggscwJDaBp2GY9Bwq2PD3rGsDfK+fx8ho/jYtH2/lCkVMyS2I9m9Zh68TM +YrvZWo4LGASxZ0XyS6GOunOTZlkD1uuulMRTUU4KJwKBgQCwyjs0/ElVFvO0lPIC +JJ//B5rqI7hNMJuTBvr4yiqVZdbgFukaU7FBVyNYDMpZi/nRbpglm+psFcwXtL8n +bHOIGLkh8vB7OuETRYhXs567lPYtO4BmHZlXW70Sq/0xqi/Mmz1RuEg4SQ1Ug5oy +wG6IV5EWSQAhsGirdybQ+bY7Kw== +-----END PRIVATE KEY----- diff --git a/dmaap-mediator-producer/configs/type_config.json b/dmaap-mediator-producer/configs/type_config.json index 983d0f3d..f75d0e4b 100644 --- a/dmaap-mediator-producer/configs/type_config.json +++ b/dmaap-mediator-producer/configs/type_config.json @@ -3,7 +3,7 @@ [ { "id": "STD_Fault_Messages", - "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages" + "dmaapTopicUrl": "/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages" } ] } \ No newline at end of file diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index dfd2505b..b31b334a 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -32,8 +32,9 @@ type Config struct { InfoProducerHost string InfoProducerPort int InfoCoordinatorAddress string - MRHost string - MRPort int + DMaaPMRAddress string + ProducerCertPath string + ProducerKeyPath string } func New() *Config { @@ -41,9 +42,10 @@ func New() *Config { LogLevel: getLogLevel(), InfoProducerHost: getEnv("INFO_PRODUCER_HOST", ""), InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085), - InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), - MRHost: getEnv("MR_HOST", "http://message-router.onap"), - MRPort: getEnvAsInt("MR_PORT", 3904), + InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://enrichmentservice:8434"), + DMaaPMRAddress: getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"), + ProducerCertPath: getEnv("PRODUCER_CERT_PATH", "configs/producer.crt"), + ProducerKeyPath: getEnv("PRODUCER_KEY_PATH", "configs/producer.key"), } } diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go index fc64e575..9420a2ad 100644 --- a/dmaap-mediator-producer/internal/config/config_test.go +++ b/dmaap-mediator-producer/internal/config/config_test.go @@ -36,8 +36,9 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { os.Setenv("INFO_PRODUCER_HOST", "producerHost") os.Setenv("INFO_PRODUCER_PORT", "8095") os.Setenv("INFO_COORD_ADDR", "infoCoordAddr") - os.Setenv("MR_HOST", "mrHost") - os.Setenv("MR_PORT", "3908") + os.Setenv("DMAAP_MR_ADDR", "mrHost:3908") + os.Setenv("PRODUCER_CERT_PATH", "cert") + os.Setenv("PRODUCER_KEY_PATH", "key") t.Cleanup(func() { os.Clearenv() }) @@ -46,8 +47,9 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { InfoProducerHost: "producerHost", InfoProducerPort: 8095, InfoCoordinatorAddress: "infoCoordAddr", - MRHost: "mrHost", - MRPort: 3908, + DMaaPMRAddress: "mrHost:3908", + ProducerCertPath: "cert", + ProducerKeyPath: "key", } got := New() @@ -68,9 +70,10 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T LogLevel: log.InfoLevel, InfoProducerHost: "", InfoProducerPort: 8085, - InfoCoordinatorAddress: "http://enrichmentservice:8083", - MRHost: "http://message-router.onap", - MRPort: 3904, + InfoCoordinatorAddress: "https://enrichmentservice:8434", + DMaaPMRAddress: "https://message-router.onap:3905", + ProducerCertPath: "configs/producer.crt", + ProducerKeyPath: "configs/producer.key", } if got := New(); !reflect.DeepEqual(got, &wantConfig) { t.Errorf("New() = %v, want %v", got, &wantConfig) @@ -94,9 +97,10 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) { LogLevel: log.InfoLevel, InfoProducerHost: "", InfoProducerPort: 8085, - InfoCoordinatorAddress: "http://enrichmentservice:8083", - MRHost: "http://message-router.onap", - MRPort: 3904, + InfoCoordinatorAddress: "https://enrichmentservice:8434", + DMaaPMRAddress: "https://message-router.onap:3905", + ProducerCertPath: "configs/producer.crt", + ProducerKeyPath: "configs/producer.key", } got := New() diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go index 2cffa2c3..324aed0c 100644 --- a/dmaap-mediator-producer/internal/config/registrator_test.go +++ b/dmaap-mediator-producer/internal/config/registrator_test.go @@ -57,7 +57,7 @@ func TestRegisterTypes(t *testing.T) { assertions.Equal("http", actualRequest.URL.Scheme) assertions.Equal("localhost:9990", actualRequest.URL.Host) assertions.Equal("/data-producer/v1/info-types/Type1", actualRequest.URL.Path) - assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) + assertions.Equal("application/json", actualRequest.Header.Get("Content-Type")) body, _ := ioutil.ReadAll(actualRequest.Body) expectedBody := []byte(`{"info_job_data_schema": {"type": "object","properties": {},"additionalProperties": false}}`) assertions.Equal(expectedBody, body) @@ -92,7 +92,7 @@ func TestRegisterProducer(t *testing.T) { assertions.Equal("http", actualRequest.URL.Scheme) assertions.Equal("localhost:9990", actualRequest.URL.Host) assertions.Equal("/data-producer/v1/info-producers/Producer1", actualRequest.URL.Path) - assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) + assertions.Equal("application/json", actualRequest.Header.Get("Content-Type")) body, _ := ioutil.ReadAll(actualRequest.Body) expectedBody := []byte(`{"info_producer_supervision_callback_url":"supervisionCallbackUrl","supported_info_types":["type1"],"info_job_callback_url":"jobCallbackUrl"}`) assertions.Equal(expectedBody, body) diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index 7b21b002..854372ad 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -154,11 +154,12 @@ func (jh *JobHandlerImpl) pollAndDistributeMessages(mRAddress string) { defer jh.mu.Unlock() for typeId, typeInfo := range jh.allTypes { log.Debugf("Processing jobs for type: %v", typeId) - messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL), jh.pollClient) + messagesBody, error := restclient.Get(mRAddress+typeInfo.DMaaPTopicURL, jh.pollClient) if error != nil { log.Warnf("Error getting data from MR. Cause: %v", error) continue } + log.Debugf("Received messages: %v", string(messagesBody)) jh.distributeMessages(messagesBody, typeInfo) } } @@ -176,6 +177,7 @@ func (jh *JobHandlerImpl) sendMessagesToConsumer(messages []byte, jobInfo JobInf if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil { log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr) } + log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner) } func (jh *JobHandlerImpl) clearAll() { diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 8533719e..555285c5 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -185,7 +185,7 @@ func TestPollAndDistributeMessages(t *testing.T) { if req.URL.String() == "http://consumerHost/target" { assertions.Equal(req.Method, "POST") assertions.Equal(messages, getBodyAsString(req)) - assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type")) + assertions.Equal("application/json", req.Header.Get("Content-Type")) wg.Done() // Signal that the distribution call has been made return &http.Response{ StatusCode: 200, @@ -207,7 +207,7 @@ func TestPollAndDistributeMessages(t *testing.T) { } handlerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", - DMaaPTopicURL: "topicUrl", + DMaaPTopicURL: "/topicUrl", Jobs: map[string]JobInfo{"job1": jobInfo}, } t.Cleanup(func() { diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go index 2b3a0cf3..7c762d99 100644 --- a/dmaap-mediator-producer/internal/restclient/HTTPClient.go +++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go @@ -70,7 +70,7 @@ func Post(url string, body []byte, client HTTPClient) error { func do(method string, url string, body []byte, client HTTPClient) error { if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil { - req.Header.Set("Content-Type", "application/json; charset=utf-8") + req.Header.Set("Content-Type", "application/json") if response, respErr := client.Do(req); respErr == nil { if isResponseSuccess(response.StatusCode) { return nil diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go index 7fe3cc7a..3ded9be2 100644 --- a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go +++ b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go @@ -122,7 +122,7 @@ func TestPutOk(t *testing.T) { assertions.Equal(http.MethodPut, actualRequest.Method) assertions.Equal("http", actualRequest.URL.Scheme) assertions.Equal("localhost:9990", actualRequest.URL.Host) - assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) + assertions.Equal("application/json", actualRequest.Header.Get("Content-Type")) body, _ := ioutil.ReadAll(actualRequest.Body) expectedBody := []byte("body") assertions.Equal(expectedBody, body) @@ -148,7 +148,7 @@ func TestPostOk(t *testing.T) { assertions.Equal(http.MethodPost, actualRequest.Method) assertions.Equal("http", actualRequest.URL.Scheme) assertions.Equal("localhost:9990", actualRequest.URL.Host) - assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) + assertions.Equal("application/json", actualRequest.Header.Get("Content-Type")) body, _ := ioutil.ReadAll(actualRequest.Body) expectedBody := []byte("body") assertions.Equal(expectedBody, body) diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index 1a91af40..cdfde579 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -21,9 +21,9 @@ package main import ( + "crypto/tls" "fmt" "net/http" - "sync" "time" "github.com/hashicorp/go-retryablehttp" @@ -34,13 +34,7 @@ import ( "oransc.org/nonrtric/dmaapmediatorproducer/internal/server" ) -const timeoutDistributionClient = time.Second * 5 -const retryWaitMax = time.Minute -const retryMax = int(^uint(0) >> 1) - var configuration *config.Config -var retryClient restclient.HTTPClient -var jobHandler *jobs.JobHandlerImpl func init() { configuration = config.New() @@ -54,52 +48,68 @@ func main() { } callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort) - distributionClient := &http.Client{ - Timeout: timeoutDistributionClient, + var retryClient restclient.HTTPClient + if cert, err := createClientCertificate(); err == nil { + retryClient = createRetryClient(cert) + } else { + log.Fatalf("Stopping producer due to error: %v", err) } - rawRetryClient := retryablehttp.NewClient() - rawRetryClient.RetryWaitMax = retryWaitMax - rawRetryClient.RetryMax = retryMax - retryClient = rawRetryClient.StandardClient() - - jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, distributionClient) - if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil { + jobHandler := jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, &http.Client{ + Timeout: time.Second * 5, + }) + if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil { log.Fatalf("Stopping producer due to: %v", err) } log.Debug("Starting DMaaP Mediator Producer") - wg := new(sync.WaitGroup) - - // add two goroutines to `wg` WaitGroup, one for each running go routine - wg.Add(2) - - log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort) go func() { + log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort) r := server.NewRouter(jobHandler) - log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r)) - wg.Done() + log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r)) }() - go func() { - jobHandler.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort)) - wg.Done() - }() + go jobHandler.RunJobs(configuration.DMaaPMRAddress) - // wait until WaitGroup is done - wg.Wait() - log.Debug("Stopping DMaaP Mediator Producer") + keepProducerAlive() } func validateConfiguration(configuration *config.Config) error { if configuration.InfoProducerHost == "" { return fmt.Errorf("missing INFO_PRODUCER_HOST") } + if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" { + return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY") + } return nil } -func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error { - registrator := config.NewRegistratorImpl(infoCoordinatorAddress, retryClient) +func createClientCertificate() (*tls.Certificate, error) { + if cert, err := tls.LoadX509KeyPair(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil { + return &cert, nil + } else { + return nil, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s", configuration.ProducerCertPath, configuration.ProducerKeyPath) + } +} + +func createRetryClient(cert *tls.Certificate) *http.Client { + rawRetryClient := retryablehttp.NewClient() + rawRetryClient.RetryWaitMax = time.Minute + rawRetryClient.RetryMax = int(^uint(0) >> 1) + rawRetryClient.HTTPClient.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{ + Certificates: []tls.Certificate{ + *cert, + }, + InsecureSkipVerify: true, + }, + } + + return rawRetryClient.StandardClient() +} + +func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error { + registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client) if types, err := jobHandler.GetTypes(); err == nil { if regErr := registrator.RegisterTypes(types); regErr != nil { return fmt.Errorf("unable to register all types due to: %v", regErr) @@ -117,3 +127,8 @@ func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAdd } return nil } + +func keepProducerAlive() { + forever := make(chan int) + <-forever +} diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumer/consumersimulator.go similarity index 97% rename from dmaap-mediator-producer/simulator/consumersimulator.go rename to dmaap-mediator-producer/simulator/consumer/consumersimulator.go index df813857..03e67c02 100644 --- a/dmaap-mediator-producer/simulator/consumersimulator.go +++ b/dmaap-mediator-producer/simulator/consumer/consumersimulator.go @@ -70,6 +70,6 @@ func registerJob(port int) { func handleData(w http.ResponseWriter, req *http.Request) { defer req.Body.Close() if reqData, err := io.ReadAll(req.Body); err == nil { - fmt.Print("Consumer received body: ", string(reqData)) + fmt.Println("Consumer received body: ", string(reqData)) } } diff --git a/dmaap-mediator-producer/simulator/dmaap/mrsimulator.go b/dmaap-mediator-producer/simulator/dmaap/mrsimulator.go new file mode 100644 index 00000000..0ec38cc2 --- /dev/null +++ b/dmaap-mediator-producer/simulator/dmaap/mrsimulator.go @@ -0,0 +1,102 @@ +// - +// ========================LICENSE_START================================= +// O-RAN-SC +// %% +// Copyright (C) 2021: Nordix Foundation +// %% +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package main + +import ( + "encoding/json" + "flag" + "fmt" + "math/rand" + "net/http" + "time" +) + +var r = rand.New(rand.NewSource(time.Now().UnixNano())) + +type FaultMessage struct { + Event Event `json:"event"` +} + +type Event struct { + CommonEventHeader CommonEventHeader `json:"commonEventHeader"` + FaultFields FaultFields `json:"faultFields"` +} + +type CommonEventHeader struct { + Domain string `json:"domain"` + SourceName string `json:"sourceName"` +} + +type FaultFields struct { + AlarmCondition string `json:"alarmCondition"` + EventSeverity string `json:"eventSeverity"` +} + +func main() { + port := flag.Int("port", 3905, "The port this message router will listen on") + flag.Parse() + + http.HandleFunc("/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages", handleData) + + fmt.Print("Starting mr on port: ", *port) + http.ListenAndServeTLS(fmt.Sprintf(":%v", *port), "../../configs/producer.crt", "../../configs/producer.key", nil) + +} + +var critical = true + +func handleData(w http.ResponseWriter, req *http.Request) { + time.Sleep(time.Duration(r.Intn(3)) * time.Second) + + w.Header().Set("Content-Type", "application/json") + + var responseBody []byte + if critical { + responseBody = getFaultMessage("CRITICAL") + critical = false + } else { + responseBody = getFaultMessage("NORMAL") + critical = true + } + // w.Write(responseBody) + fmt.Fprint(w, string(responseBody)) +} + +func getFaultMessage(eventSeverity string) []byte { + linkFailureMessage := FaultMessage{ + Event: Event{ + CommonEventHeader: CommonEventHeader{ + Domain: "fault", + SourceName: "ERICSSON-O-RU-11220", + }, + FaultFields: FaultFields{ + AlarmCondition: "28", + EventSeverity: eventSeverity, + }, + }, + } + fmt.Printf("Sending message: %v\n", linkFailureMessage) + + messageAsByteArray, _ := json.Marshal(linkFailureMessage) + response := [1]string{string(messageAsByteArray)} + responseAsByteArray, _ := json.Marshal(response) + return responseAsByteArray +}