From 280385634f160bb78a8944998e9106a2f6549eb0 Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Tue, 21 Sep 2021 15:43:11 +0200 Subject: [PATCH] Poll MR and send messages to consumers Issue-ID: NONRTRIC-586 Signed-off-by: elinuxhenrik Change-Id: I7e261aedb1a528c193390f6a3e99a49d7783d35e --- .gitignore | 2 + .../configs/STD_Fault_Messages.json | 15 ++-- dmaap-mediator-producer/internal/config/config.go | 4 + .../internal/config/config_test.go | 8 ++ dmaap-mediator-producer/internal/jobs/jobs.go | 86 ++++++++++++++++---- dmaap-mediator-producer/internal/jobs/jobs_test.go | 94 ++++++++++++++++++---- .../internal/restclient/HTTPClient.go | 10 ++- .../internal/server/server_test.go | 4 +- dmaap-mediator-producer/main.go | 7 +- .../mocks/{ => jobhandler}/JobHandler.go | 4 +- .../simulator/consumersimulator.go | 40 +++++++++ 11 files changed, 233 insertions(+), 41 deletions(-) rename dmaap-mediator-producer/mocks/{ => jobhandler}/JobHandler.go (86%) create mode 100644 dmaap-mediator-producer/simulator/consumersimulator.go diff --git a/.gitignore b/.gitignore index df309a12..59150806 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,5 @@ infer-out/ .vscode .factorypath + +coverage.* diff --git a/dmaap-mediator-producer/configs/STD_Fault_Messages.json b/dmaap-mediator-producer/configs/STD_Fault_Messages.json index b944802f..258d7436 100644 --- a/dmaap-mediator-producer/configs/STD_Fault_Messages.json +++ b/dmaap-mediator-producer/configs/STD_Fault_Messages.json @@ -1,7 +1,12 @@ { - "$schema": "https://json-schema.org/draft/2019-09/schema", - "title": "STD_Fault_Messages", - "description": "Schema for job delivering fault messages from DMaaP Message Router", - "type": "object", - "properties": {} + "id": "STD_Fault_Messages", + "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT", + "schema": { + "$schema": "https://json-schema.org/draft/2019-09/schema", + "title": "STD_Fault_Messages", + "description": "Schema for job delivering fault messages from DMaaP Message Router", + "type": "object", + "properties": {}, + "additionalProperties": false + } } \ No newline at end of file diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index 3616c584..8a2784a9 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -34,6 +34,8 @@ type Config struct { InfoJobCallbackHost string InfoJobCallbackPort int InfoCoordinatorAddress string + MRHost string + MRPort int } type ProducerRegistrationInfo struct { @@ -50,6 +52,8 @@ func New() *Config { InfoJobCallbackHost: getEnv("INFO_JOB_CALLBACK_HOST", ""), InfoJobCallbackPort: getEnvAsInt("INFO_JOB_CALLBACK_PORT", 8086), InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), + MRHost: getEnv("MR_HOST", "http://message-router.onap"), + MRPort: getEnvAsInt("MR_PORT", 3904), } } diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go index 4a65dc0d..10430272 100644 --- a/dmaap-mediator-producer/internal/config/config_test.go +++ b/dmaap-mediator-producer/internal/config/config_test.go @@ -37,6 +37,8 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost") os.Setenv("INFO_JOB_CALLBACK_PORT", "8096") os.Setenv("INFO_COORD_ADDR", "infoCoordAddr") + os.Setenv("MR_HOST", "mrHost") + os.Setenv("MR_PORT", "3908") t.Cleanup(func() { os.Clearenv() }) @@ -47,6 +49,8 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { InfoJobCallbackHost: "jobCallbackHost", InfoJobCallbackPort: 8096, InfoCoordinatorAddress: "infoCoordAddr", + MRHost: "mrHost", + MRPort: 3908, } if got := New(); !reflect.DeepEqual(got, &wantConfig) { t.Errorf("New() = %v, want %v", got, &wantConfig) @@ -70,6 +74,8 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T InfoJobCallbackHost: "", InfoJobCallbackPort: 8086, InfoCoordinatorAddress: "http://enrichmentservice:8083", + MRHost: "http://message-router.onap", + MRPort: 3904, } if got := New(); !reflect.DeepEqual(got, &wantConfig) { t.Errorf("New() = %v, want %v", got, &wantConfig) @@ -86,6 +92,8 @@ func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) { InfoJobCallbackHost: "", InfoJobCallbackPort: 8086, InfoCoordinatorAddress: "http://enrichmentservice:8083", + MRHost: "http://message-router.onap", + MRPort: 3904, } if got := New(); !reflect.DeepEqual(got, &wantConfig) { t.Errorf("New() = %v, want %v", got, &wantConfig) diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index 73471789..eec59c39 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -21,15 +21,22 @@ package jobs import ( + "encoding/json" "fmt" "os" "path/filepath" "strings" + "sync" + + log "github.com/sirupsen/logrus" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) type Type struct { - TypeId string - Schema string + TypeId string `json:"id"` + DMaaPTopic string `json:"dmaapTopic"` + Schema string `json:"schema"` + Jobs map[string]JobInfo } type JobInfo struct { @@ -46,9 +53,10 @@ type JobHandler interface { } var ( + mu sync.Mutex typeDir = "configs" Handler JobHandler - allJobs = make(map[string]map[string]JobInfo) + allJobs = make(map[string]Type) ) func init() { @@ -62,8 +70,10 @@ func newJobHandlerImpl() *jobHandlerImpl { } func (jh *jobHandlerImpl) AddJob(ji JobInfo) error { + mu.Lock() + defer mu.Unlock() if err := validateJobInfo(ji); err == nil { - jobs := allJobs[ji.InfoTypeIdentity] + jobs := allJobs[ji.InfoTypeIdentity].Jobs jobs[ji.InfoJobIdentity] = ji return nil } else { @@ -86,6 +96,8 @@ func validateJobInfo(ji JobInfo) error { } func GetTypes() ([]*Type, error) { + mu.Lock() + defer mu.Unlock() types := make([]*Type, 0, 1) err := filepath.Walk(typeDir, func(path string, info os.FileInfo, err error) error { @@ -106,6 +118,8 @@ func GetTypes() ([]*Type, error) { } func GetSupportedTypes() []string { + mu.Lock() + defer mu.Unlock() supportedTypes := []string{} for k := range allJobs { supportedTypes = append(supportedTypes, k) @@ -118,23 +132,63 @@ func AddJob(job JobInfo) error { } func getType(path string) (*Type, error) { - fileName := filepath.Base(path) - typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName)) - - if typeSchema, err := os.ReadFile(path); err == nil { - typeInfo := Type{ - TypeId: typeName, - Schema: string(typeSchema), - } - if _, ok := allJobs[typeName]; !ok { - allJobs[typeName] = make(map[string]JobInfo) + if typeDefinition, err := os.ReadFile(path); err == nil { + var dat map[string]interface{} + if marshalError := json.Unmarshal(typeDefinition, &dat); marshalError == nil { + schema, _ := json.Marshal(dat["schema"]) + typeInfo := Type{ + TypeId: dat["id"].(string), + DMaaPTopic: dat["dmaapTopic"].(string), + Schema: string(schema), + Jobs: make(map[string]JobInfo), + } + if _, ok := allJobs[typeInfo.TypeId]; !ok { + allJobs[typeInfo.TypeId] = typeInfo + } + return &typeInfo, nil + } else { + return nil, marshalError } - return &typeInfo, nil } else { return nil, err } } +func RunJobs(mRAddress string) { + for { + pollAndDistributeMessages(mRAddress) + } +} + +func pollAndDistributeMessages(mRAddress string) { + for typeId, typeInfo := range allJobs { + log.Debugf("Processing jobs for type: %v", typeId) + messagesBody, error := restclient.Get(fmt.Sprintf("%v/events/%v/users/dmaapmediatorproducer", mRAddress, typeInfo.DMaaPTopic)) + if error != nil { + log.Warnf("Error getting data from MR. Cause: %v", error) + continue + } + distributeMessages(messagesBody, typeInfo) + } +} + +func distributeMessages(messages []byte, typeInfo Type) { + if len(messages) > 2 { + mu.Lock() + for _, jobInfo := range typeInfo.Jobs { + go sendMessagesToConsumer(messages, jobInfo) + } + mu.Unlock() + } +} + +func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) { + log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity) + if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil { + log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr) + } +} + func clearAll() { - allJobs = make(map[string]map[string]JobInfo) + allJobs = make(map[string]Type) } diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 09410338..b53d85e6 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -21,14 +21,21 @@ package jobs import ( + "bytes" + "io/ioutil" + "net/http" "os" "path/filepath" "testing" + "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks" ) -const type1Schema = `{"title": "Type 1"}` +const typeDefinition = `{"id": "type1", "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT", "schema": {"title": "Type 1"}}` func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { assertions := require.New(t) @@ -42,13 +49,15 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes }) typeDir = typesDir fname := filepath.Join(typesDir, "type1.json") - if err = os.WriteFile(fname, []byte(type1Schema), 0666); err != nil { + if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil { t.Errorf("Unable to create temporary files for types due to: %v", err) } types, err := GetTypes() wantedType := Type{ - TypeId: "type1", - Schema: type1Schema, + TypeId: "type1", + DMaaPTopic: "unauthenticated.SEC_FAULT_OUTPUT", + Schema: `{"title":"Type 1"}`, + Jobs: make(map[string]JobInfo), } wantedTypes := []*Type{&wantedType} assertions.EqualValues(wantedTypes, types) @@ -60,11 +69,7 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { assertions := require.New(t) - allJobs["type1"] = make(map[string]JobInfo) - t.Cleanup(func() { - clearAll() - }) - jobInfo := JobInfo{ + wantedJob := JobInfo{ Owner: "owner", LastUpdated: "now", InfoJobIdentity: "job1", @@ -72,11 +77,18 @@ func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { InfoJobData: "{}", InfoTypeIdentity: "type1", } + allJobs["type1"] = Type{ + TypeId: "type1", + Jobs: map[string]JobInfo{"job1": wantedJob}, + } + t.Cleanup(func() { + clearAll() + }) - err := AddJob(jobInfo) + err := AddJob(wantedJob) assertions.Nil(err) - assertions.Equal(1, len(allJobs["type1"])) - assertions.Equal(jobInfo, allJobs["type1"]["job1"]) + assertions.Equal(1, len(allJobs["type1"].Jobs)) + assertions.Equal(wantedJob, allJobs["type1"].Jobs["job1"]) } func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { @@ -92,7 +104,9 @@ func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - allJobs["type1"] = make(map[string]JobInfo) + allJobs["type1"] = Type{ + TypeId: "type1", + } t.Cleanup(func() { clearAll() }) @@ -107,7 +121,9 @@ func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - allJobs["type1"] = make(map[string]JobInfo) + allJobs["type1"] = Type{ + TypeId: "type1", + } jobInfo := JobInfo{ InfoTypeIdentity: "type1", InfoJobIdentity: "job1", @@ -118,3 +134,53 @@ func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { assertions.Equal("missing required target URI: { job1 type1}", err.Error()) clearAll() } + +func TestPollAndDistributeMessages(t *testing.T) { + assertions := require.New(t) + jobInfo := JobInfo{ + InfoTypeIdentity: "type1", + InfoJobIdentity: "job1", + TargetUri: "http://consumerHost/target", + } + allJobs["type1"] = Type{ + TypeId: "type1", + DMaaPTopic: "topic", + Jobs: map[string]JobInfo{"job1": jobInfo}, + } + t.Cleanup(func() { + clearAll() + }) + + body := ioutil.NopCloser(bytes.NewReader([]byte(`[{"message": {"data": "data"}}]`))) + clientMock := mocks.HTTPClient{} + clientMock.On("Get", mock.Anything).Return(&http.Response{ + StatusCode: http.StatusOK, + Body: body, + }, nil) + + clientMock.On("Do", mock.Anything).Return(&http.Response{ + StatusCode: http.StatusOK, + }, nil) + + restclient.Client = &clientMock + + pollAndDistributeMessages("http://mrAddr") + + time.Sleep(100 * time.Millisecond) + + var actualRequest *http.Request + clientMock.AssertCalled(t, "Get", "http://mrAddr/events/topic/users/dmaapmediatorproducer") + clientMock.AssertNumberOfCalls(t, "Get", 1) + + clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool { + actualRequest = req + return true + })) + assertions.Equal(http.MethodPost, actualRequest.Method) + assertions.Equal("consumerHost", actualRequest.URL.Host) + assertions.Equal("/target", actualRequest.URL.Path) + assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) + actualBody, _ := ioutil.ReadAll(actualRequest.Body) + assertions.Equal([]byte(`[{"message": {"data": "data"}}]`), actualBody) + clientMock.AssertNumberOfCalls(t, "Do", 1) +} diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go index 78a02b6f..a783f7e6 100644 --- a/dmaap-mediator-producer/internal/restclient/HTTPClient.go +++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go @@ -73,7 +73,15 @@ func Get(url string) ([]byte, error) { } func Put(url string, body []byte) error { - if req, reqErr := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(body)); reqErr == nil { + return do(http.MethodPut, url, body) +} + +func Post(url string, body []byte) error { + return do(http.MethodPost, url, body) +} + +func do(method string, url string, body []byte) error { + if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil { req.Header.Set("Content-Type", "application/json; charset=utf-8") if response, respErr := Client.Do(req); respErr == nil { if isResponseSuccess(response.StatusCode) { diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go index d221c933..444deba4 100644 --- a/dmaap-mediator-producer/internal/server/server_test.go +++ b/dmaap-mediator-producer/internal/server/server_test.go @@ -32,7 +32,7 @@ import ( "github.com/stretchr/testify/require" "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" - "oransc.org/nonrtric/dmaapmediatorproducer/mocks" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobhandler" ) func TestStatusHandler(t *testing.T) { @@ -88,7 +88,7 @@ func TestStatusHandler(t *testing.T) { func TestCreateInfoJobHandler(t *testing.T) { assertions := require.New(t) - jobHandlerMock := mocks.JobHandler{} + jobHandlerMock := jobhandler.JobHandler{} goodJobInfo := jobs.JobInfo{ Owner: "owner", diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index 3fe92dca..47e12e9a 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -76,7 +76,7 @@ func main() { wg := new(sync.WaitGroup) // add two goroutines to `wg` WaitGroup, one for each avilable server - wg.Add(2) + wg.Add(3) log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort) go func() { @@ -91,6 +91,11 @@ func main() { wg.Done() }() + go func() { + jobs.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort)) + wg.Done() + }() + // wait until WaitGroup is done wg.Wait() log.Debug("Stopping DMaaP Mediator Producer") diff --git a/dmaap-mediator-producer/mocks/JobHandler.go b/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go similarity index 86% rename from dmaap-mediator-producer/mocks/JobHandler.go rename to dmaap-mediator-producer/mocks/jobhandler/JobHandler.go index 4914e4d7..edb2bf51 100644 --- a/dmaap-mediator-producer/mocks/JobHandler.go +++ b/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go @@ -1,10 +1,10 @@ // Code generated by mockery v2.9.3. DO NOT EDIT. -package mocks +package jobhandler import ( mock "github.com/stretchr/testify/mock" - jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" ) // JobHandler is an autogenerated mock type for the JobHandler type diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumersimulator.go new file mode 100644 index 00000000..25421ae6 --- /dev/null +++ b/dmaap-mediator-producer/simulator/consumersimulator.go @@ -0,0 +1,40 @@ +// - +// ========================LICENSE_START================================= +// O-RAN-SC +// %% +// Copyright (C) 2021: Nordix Foundation +// %% +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package main + +import ( + "fmt" + "io" + http "net/http" +) + +func handleData(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + if reqData, err := io.ReadAll(req.Body); err == nil { + fmt.Printf("Consumer received body: %v\n", string(reqData)) + } +} + +func main() { + http.HandleFunc("/jobs", handleData) + + http.ListenAndServe(":40935", nil) +} -- 2.16.6