From 382870d3056e724f1e587ab21cd25411bceaaac7 Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Thu, 23 Sep 2021 11:09:09 +0200 Subject: [PATCH] Improve DMaaP Mediator Producer Issue-ID: NONRTRIC-586 Signed-off-by: elinuxhenrik Change-Id: Iab915f878874b687b1c1b2effc05293582fb254c --- dmaap-mediator-producer/.gitignore | 1 + .../configs/STD_Fault_Messages.json | 12 -- dmaap-mediator-producer/configs/type_config.json | 9 ++ dmaap-mediator-producer/internal/config/config.go | 28 ++--- .../internal/config/config_test.go | 52 ++++---- .../internal/config/registrator.go | 7 +- .../internal/config/registrator_test.go | 7 +- dmaap-mediator-producer/internal/jobs/jobs.go | 110 ++++++++-------- dmaap-mediator-producer/internal/jobs/jobs_test.go | 140 +++++++++++++-------- dmaap-mediator-producer/internal/server/server.go | 18 +-- .../internal/server/server_test.go | 10 +- dmaap-mediator-producer/main.go | 34 ++--- .../simulator/consumersimulator.go | 37 ++++-- 13 files changed, 240 insertions(+), 225 deletions(-) delete mode 100644 dmaap-mediator-producer/configs/STD_Fault_Messages.json create mode 100644 dmaap-mediator-producer/configs/type_config.json diff --git a/dmaap-mediator-producer/.gitignore b/dmaap-mediator-producer/.gitignore index 0d08f66c..9f5396c9 100644 --- a/dmaap-mediator-producer/.gitignore +++ b/dmaap-mediator-producer/.gitignore @@ -4,3 +4,4 @@ coverage.* main dmaapmediatorproducer __debug_bin* +simulator diff --git a/dmaap-mediator-producer/configs/STD_Fault_Messages.json b/dmaap-mediator-producer/configs/STD_Fault_Messages.json deleted file mode 100644 index 258d7436..00000000 --- a/dmaap-mediator-producer/configs/STD_Fault_Messages.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "id": "STD_Fault_Messages", - "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT", - "schema": { - "$schema": "https://json-schema.org/draft/2019-09/schema", - "title": "STD_Fault_Messages", - "description": "Schema for job delivering fault messages from DMaaP Message Router", - "type": "object", - "properties": {}, - "additionalProperties": false - } -} \ No newline at end of file diff --git a/dmaap-mediator-producer/configs/type_config.json b/dmaap-mediator-producer/configs/type_config.json new file mode 100644 index 00000000..983d0f3d --- /dev/null +++ b/dmaap-mediator-producer/configs/type_config.json @@ -0,0 +1,9 @@ +{ + "types": + [ + { + "id": "STD_Fault_Messages", + "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages" + } + ] +} \ No newline at end of file diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index 8a2784a9..9b7b1dd1 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -28,14 +28,12 @@ import ( ) type Config struct { - LogLevel string - InfoProducerSupervisionCallbackHost string - InfoProducerSupervisionCallbackPort int - InfoJobCallbackHost string - InfoJobCallbackPort int - InfoCoordinatorAddress string - MRHost string - MRPort int + LogLevel string + InfoProducerHost string + InfoProducerPort int + InfoCoordinatorAddress string + MRHost string + MRPort int } type ProducerRegistrationInfo struct { @@ -46,14 +44,12 @@ type ProducerRegistrationInfo struct { func New() *Config { return &Config{ - LogLevel: getEnv("LOG_LEVEL", "Info"), - InfoProducerSupervisionCallbackHost: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", ""), - InfoProducerSupervisionCallbackPort: getEnvAsInt("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", 8085), - InfoJobCallbackHost: getEnv("INFO_JOB_CALLBACK_HOST", ""), - InfoJobCallbackPort: getEnvAsInt("INFO_JOB_CALLBACK_PORT", 8086), - InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), - MRHost: getEnv("MR_HOST", "http://message-router.onap"), - MRPort: getEnvAsInt("MR_PORT", 3904), + LogLevel: getEnv("LOG_LEVEL", "Info"), + InfoProducerHost: getEnv("INFO_PRODUCER_HOST", ""), + InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085), + InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), + MRHost: getEnv("MR_HOST", "http://message-router.onap"), + MRPort: getEnvAsInt("MR_PORT", 3904), } } diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go index 10430272..0fcbdd3f 100644 --- a/dmaap-mediator-producer/internal/config/config_test.go +++ b/dmaap-mediator-producer/internal/config/config_test.go @@ -32,10 +32,8 @@ import ( func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { os.Setenv("LOG_LEVEL", "Debug") - os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", "supervisionCallbackHost") - os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8095") - os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost") - os.Setenv("INFO_JOB_CALLBACK_PORT", "8096") + os.Setenv("INFO_PRODUCER_HOST", "producerHost") + os.Setenv("INFO_PRODUCER_PORT", "8095") os.Setenv("INFO_COORD_ADDR", "infoCoordAddr") os.Setenv("MR_HOST", "mrHost") os.Setenv("MR_PORT", "3908") @@ -43,14 +41,12 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { os.Clearenv() }) wantConfig := Config{ - LogLevel: "Debug", - InfoProducerSupervisionCallbackHost: "supervisionCallbackHost", - InfoProducerSupervisionCallbackPort: 8095, - InfoJobCallbackHost: "jobCallbackHost", - InfoJobCallbackPort: 8096, - InfoCoordinatorAddress: "infoCoordAddr", - MRHost: "mrHost", - MRPort: 3908, + LogLevel: "Debug", + InfoProducerHost: "producerHost", + InfoProducerPort: 8095, + InfoCoordinatorAddress: "infoCoordAddr", + MRHost: "mrHost", + MRPort: 3908, } if got := New(); !reflect.DeepEqual(got, &wantConfig) { t.Errorf("New() = %v, want %v", got, &wantConfig) @@ -62,38 +58,34 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T var buf bytes.Buffer log.SetOutput(&buf) - os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "wrong") + os.Setenv("INFO_PRODUCER_PORT", "wrong") t.Cleanup(func() { log.SetOutput(os.Stderr) os.Clearenv() }) wantConfig := Config{ - LogLevel: "Info", - InfoProducerSupervisionCallbackHost: "", - InfoProducerSupervisionCallbackPort: 8085, - InfoJobCallbackHost: "", - InfoJobCallbackPort: 8086, - InfoCoordinatorAddress: "http://enrichmentservice:8083", - MRHost: "http://message-router.onap", - MRPort: 3904, + LogLevel: "Info", + InfoProducerHost: "", + InfoProducerPort: 8085, + InfoCoordinatorAddress: "http://enrichmentservice:8083", + MRHost: "http://message-router.onap", + MRPort: 3904, } if got := New(); !reflect.DeepEqual(got, &wantConfig) { t.Errorf("New() = %v, want %v", got, &wantConfig) } logString := buf.String() - assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_SUPERVISION_CALLBACK_PORT. Default value: 8085 will be used") + assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used") } func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) { wantConfig := Config{ - LogLevel: "Info", - InfoProducerSupervisionCallbackHost: "", - InfoProducerSupervisionCallbackPort: 8085, - InfoJobCallbackHost: "", - InfoJobCallbackPort: 8086, - InfoCoordinatorAddress: "http://enrichmentservice:8083", - MRHost: "http://message-router.onap", - MRPort: 3904, + LogLevel: "Info", + InfoProducerHost: "", + InfoProducerPort: 8085, + InfoCoordinatorAddress: "http://enrichmentservice:8083", + MRHost: "http://message-router.onap", + MRPort: 3904, } if got := New(); !reflect.DeepEqual(got, &wantConfig) { t.Errorf("New() = %v, want %v", got, &wantConfig) diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go index 37225eda..db46c544 100644 --- a/dmaap-mediator-producer/internal/config/registrator.go +++ b/dmaap-mediator-producer/internal/config/registrator.go @@ -33,9 +33,10 @@ import ( const registerTypePath = "/data-producer/v1/info-types/" const registerProducerPath = "/data-producer/v1/info-producers/" +const typeSchema = `{"type": "object","properties": {},"additionalProperties": false}` type Registrator interface { - RegisterTypes(types []*jobs.Type) error + RegisterTypes(types []*jobs.TypeData) error RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) } @@ -49,9 +50,9 @@ func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl { } } -func (r RegistratorImpl) RegisterTypes(jobTypes []*jobs.Type) error { +func (r RegistratorImpl) RegisterTypes(jobTypes []jobs.TypeData) error { for _, jobType := range jobTypes { - body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema) + body := fmt.Sprintf(`{"info_job_data_schema": %v}`, typeSchema) if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil { return error } diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go index a89c43f8..353e9de8 100644 --- a/dmaap-mediator-producer/internal/config/registrator_test.go +++ b/dmaap-mediator-producer/internal/config/registrator_test.go @@ -43,11 +43,10 @@ func TestRegisterTypes(t *testing.T) { restclient.Client = &clientMock - type1 := jobs.Type{ + type1 := jobs.TypeData{ TypeId: "Type1", - Schema: `{"title": "Type 1"}`, } - types := []*jobs.Type{&type1} + types := []jobs.TypeData{type1} r := NewRegistratorImpl("http://localhost:9990") err := r.RegisterTypes(types) @@ -64,7 +63,7 @@ func TestRegisterTypes(t *testing.T) { assertions.Equal("/data-producer/v1/info-types/Type1", actualRequest.URL.Path) assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) body, _ := ioutil.ReadAll(actualRequest.Body) - expectedBody := []byte(`{"info_job_data_schema": {"title": "Type 1"}}`) + expectedBody := []byte(`{"info_job_data_schema": {"type": "object","properties": {},"additionalProperties": false}}`) assertions.Equal(expectedBody, body) clientMock.AssertNumberOfCalls(t, "Do", 1) } diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index eec59c39..09d38916 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -24,28 +24,33 @@ import ( "encoding/json" "fmt" "os" - "path/filepath" - "strings" "sync" log "github.com/sirupsen/logrus" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) -type Type struct { - TypeId string `json:"id"` - DMaaPTopic string `json:"dmaapTopic"` - Schema string `json:"schema"` - Jobs map[string]JobInfo +type TypeDefinitions struct { + Types []TypeDefinition `json:"types"` +} +type TypeDefinition struct { + Id string `json:"id"` + DmaapTopicURL string `json:"dmaapTopicUrl"` +} + +type TypeData struct { + TypeId string `json:"id"` + DMaaPTopicURL string `json:"dmaapTopicUrl"` + Jobs map[string]JobInfo } type JobInfo struct { - Owner string `json:"owner"` - LastUpdated string `json:"last_updated"` - InfoJobIdentity string `json:"info_job_identity"` - TargetUri string `json:"target_uri"` - InfoJobData string `json:"info_job_data"` - InfoTypeIdentity string `json:"info_type_identity"` + Owner string `json:"owner"` + LastUpdated string `json:"last_updated"` + InfoJobIdentity string `json:"info_job_identity"` + TargetUri string `json:"target_uri"` + InfoJobData interface{} `json:"info_job_data"` + InfoTypeIdentity string `json:"info_type_identity"` } type JobHandler interface { @@ -53,10 +58,10 @@ type JobHandler interface { } var ( - mu sync.Mutex - typeDir = "configs" - Handler JobHandler - allJobs = make(map[string]Type) + mu sync.Mutex + configFile = "configs/type_config.json" + Handler JobHandler + allTypes = make(map[string]TypeData) ) func init() { @@ -73,8 +78,9 @@ func (jh *jobHandlerImpl) AddJob(ji JobInfo) error { mu.Lock() defer mu.Unlock() if err := validateJobInfo(ji); err == nil { - jobs := allJobs[ji.InfoTypeIdentity].Jobs + jobs := allTypes[ji.InfoTypeIdentity].Jobs jobs[ji.InfoJobIdentity] = ji + log.Debug("Added job: ", ji) return nil } else { return err @@ -82,7 +88,7 @@ func (jh *jobHandlerImpl) AddJob(ji JobInfo) error { } func validateJobInfo(ji JobInfo) error { - if _, ok := allJobs[ji.InfoTypeIdentity]; !ok { + if _, ok := allTypes[ji.InfoTypeIdentity]; !ok { return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity) } if ji.InfoJobIdentity == "" { @@ -95,25 +101,30 @@ func validateJobInfo(ji JobInfo) error { return nil } -func GetTypes() ([]*Type, error) { +func GetTypes() ([]TypeData, error) { mu.Lock() defer mu.Unlock() - types := make([]*Type, 0, 1) - err := filepath.Walk(typeDir, - func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if strings.Contains(path, ".json") { - if jobType, err := getType(path); err == nil { - types = append(types, jobType) - } - } - return nil - }) + types := make([]TypeData, 0, 1) + typeDefsByte, err := os.ReadFile(configFile) + if err != nil { + return nil, err + } + typeDefs := TypeDefinitions{} + err = json.Unmarshal(typeDefsByte, &typeDefs) if err != nil { return nil, err } + for _, typeDef := range typeDefs.Types { + typeInfo := TypeData{ + TypeId: typeDef.Id, + DMaaPTopicURL: typeDef.DmaapTopicURL, + Jobs: make(map[string]JobInfo), + } + if _, ok := allTypes[typeInfo.TypeId]; !ok { + allTypes[typeInfo.TypeId] = typeInfo + } + types = append(types, typeInfo) + } return types, nil } @@ -121,7 +132,7 @@ func GetSupportedTypes() []string { mu.Lock() defer mu.Unlock() supportedTypes := []string{} - for k := range allJobs { + for k := range allTypes { supportedTypes = append(supportedTypes, k) } return supportedTypes @@ -131,29 +142,6 @@ func AddJob(job JobInfo) error { return Handler.AddJob(job) } -func getType(path string) (*Type, error) { - if typeDefinition, err := os.ReadFile(path); err == nil { - var dat map[string]interface{} - if marshalError := json.Unmarshal(typeDefinition, &dat); marshalError == nil { - schema, _ := json.Marshal(dat["schema"]) - typeInfo := Type{ - TypeId: dat["id"].(string), - DMaaPTopic: dat["dmaapTopic"].(string), - Schema: string(schema), - Jobs: make(map[string]JobInfo), - } - if _, ok := allJobs[typeInfo.TypeId]; !ok { - allJobs[typeInfo.TypeId] = typeInfo - } - return &typeInfo, nil - } else { - return nil, marshalError - } - } else { - return nil, err - } -} - func RunJobs(mRAddress string) { for { pollAndDistributeMessages(mRAddress) @@ -161,9 +149,9 @@ func RunJobs(mRAddress string) { } func pollAndDistributeMessages(mRAddress string) { - for typeId, typeInfo := range allJobs { + for typeId, typeInfo := range allTypes { log.Debugf("Processing jobs for type: %v", typeId) - messagesBody, error := restclient.Get(fmt.Sprintf("%v/events/%v/users/dmaapmediatorproducer", mRAddress, typeInfo.DMaaPTopic)) + messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL)) if error != nil { log.Warnf("Error getting data from MR. Cause: %v", error) continue @@ -172,7 +160,7 @@ func pollAndDistributeMessages(mRAddress string) { } } -func distributeMessages(messages []byte, typeInfo Type) { +func distributeMessages(messages []byte, typeInfo TypeData) { if len(messages) > 2 { mu.Lock() for _, jobInfo := range typeInfo.Jobs { @@ -190,5 +178,5 @@ func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) { } func clearAll() { - allJobs = make(map[string]Type) + allTypes = make(map[string]TypeData) } diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index b53d85e6..3bb25787 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -26,16 +26,15 @@ import ( "net/http" "os" "path/filepath" + "sync" "testing" "time" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" - "oransc.org/nonrtric/dmaapmediatorproducer/mocks" ) -const typeDefinition = `{"id": "type1", "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT", "schema": {"title": "Type 1"}}` +const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}` func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { assertions := require.New(t) @@ -47,19 +46,18 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes os.RemoveAll(typesDir) clearAll() }) - typeDir = typesDir - fname := filepath.Join(typesDir, "type1.json") + fname := filepath.Join(typesDir, "type_config.json") + configFile = fname if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil { - t.Errorf("Unable to create temporary files for types due to: %v", err) + t.Errorf("Unable to create temporary config file for types due to: %v", err) } types, err := GetTypes() - wantedType := Type{ - TypeId: "type1", - DMaaPTopic: "unauthenticated.SEC_FAULT_OUTPUT", - Schema: `{"title":"Type 1"}`, - Jobs: make(map[string]JobInfo), + wantedType := TypeData{ + TypeId: "type1", + DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", + Jobs: make(map[string]JobInfo), } - wantedTypes := []*Type{&wantedType} + wantedTypes := []TypeData{wantedType} assertions.EqualValues(wantedTypes, types) assertions.Nil(err) @@ -77,7 +75,7 @@ func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { InfoJobData: "{}", InfoTypeIdentity: "type1", } - allJobs["type1"] = Type{ + allTypes["type1"] = TypeData{ TypeId: "type1", Jobs: map[string]JobInfo{"job1": wantedJob}, } @@ -87,8 +85,8 @@ func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { err := AddJob(wantedJob) assertions.Nil(err) - assertions.Equal(1, len(allJobs["type1"].Jobs)) - assertions.Equal(wantedJob, allJobs["type1"].Jobs["job1"]) + assertions.Equal(1, len(allTypes["type1"].Jobs)) + assertions.Equal(wantedJob, allTypes["type1"].Jobs["job1"]) } func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { @@ -104,7 +102,7 @@ func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - allJobs["type1"] = Type{ + allTypes["type1"] = TypeData{ TypeId: "type1", } t.Cleanup(func() { @@ -116,12 +114,12 @@ func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { err := AddJob(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required job identity: { type1}", err.Error()) + assertions.Equal("missing required job identity: { type1}", err.Error()) } func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - allJobs["type1"] = Type{ + allTypes["type1"] = TypeData{ TypeId: "type1", } jobInfo := JobInfo{ @@ -131,10 +129,9 @@ func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { err := AddJob(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required target URI: { job1 type1}", err.Error()) + assertions.Equal("missing required target URI: { job1 type1}", err.Error()) clearAll() } - func TestPollAndDistributeMessages(t *testing.T) { assertions := require.New(t) jobInfo := JobInfo{ @@ -142,45 +139,84 @@ func TestPollAndDistributeMessages(t *testing.T) { InfoJobIdentity: "job1", TargetUri: "http://consumerHost/target", } - allJobs["type1"] = Type{ - TypeId: "type1", - DMaaPTopic: "topic", - Jobs: map[string]JobInfo{"job1": jobInfo}, + allTypes["type1"] = TypeData{ + TypeId: "type1", + DMaaPTopicURL: "topicUrl", + Jobs: map[string]JobInfo{"job1": jobInfo}, } t.Cleanup(func() { clearAll() }) - body := ioutil.NopCloser(bytes.NewReader([]byte(`[{"message": {"data": "data"}}]`))) - clientMock := mocks.HTTPClient{} - clientMock.On("Get", mock.Anything).Return(&http.Response{ - StatusCode: http.StatusOK, - Body: body, - }, nil) - - clientMock.On("Do", mock.Anything).Return(&http.Response{ - StatusCode: http.StatusOK, - }, nil) + wg := sync.WaitGroup{} + wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute + messages := `[{"message": {"data": "data"}}]` + clientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://mrAddr/topicUrl" { + assertions.Equal(req.Method, "GET") + wg.Done() + return &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewReader([]byte(messages))), + Header: make(http.Header), // Must be set to non-nil value or it panics + } + } else if req.URL.String() == "http://consumerHost/target" { + assertions.Equal(req.Method, "POST") + assertions.Equal(messages, getBodyAsString(req)) + assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type")) + wg.Done() + return &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), + Header: make(http.Header), // Must be set to non-nil value or it panics + } + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) - restclient.Client = &clientMock + restclient.Client = clientMock pollAndDistributeMessages("http://mrAddr") - time.Sleep(100 * time.Millisecond) - - var actualRequest *http.Request - clientMock.AssertCalled(t, "Get", "http://mrAddr/events/topic/users/dmaapmediatorproducer") - clientMock.AssertNumberOfCalls(t, "Get", 1) - - clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool { - actualRequest = req - return true - })) - assertions.Equal(http.MethodPost, actualRequest.Method) - assertions.Equal("consumerHost", actualRequest.URL.Host) - assertions.Equal("/target", actualRequest.URL.Path) - assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) - actualBody, _ := ioutil.ReadAll(actualRequest.Body) - assertions.Equal([]byte(`[{"message": {"data": "data"}}]`), actualBody) - clientMock.AssertNumberOfCalls(t, "Do", 1) + if waitTimeout(&wg, 100*time.Millisecond) { + t.Error("Not all calls to server were made") + t.Fail() + } +} + +type RoundTripFunc func(req *http.Request) *http.Response + +func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req), nil +} + +//NewTestClient returns *http.Client with Transport replaced to avoid making real calls +func NewTestClient(fn RoundTripFunc) *http.Client { + return &http.Client{ + Transport: RoundTripFunc(fn), + } +} + +// waitTimeout waits for the waitgroup for the specified max timeout. +// Returns true if waiting timed out. +func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + return false // completed normally + case <-time.After(timeout): + return true // timed out + } +} + +func getBodyAsString(req *http.Request) string { + buf := new(bytes.Buffer) + buf.ReadFrom(req.Body) + return buf.String() } diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go index c3a1331c..0b5e5b8c 100644 --- a/dmaap-mediator-producer/internal/server/server.go +++ b/dmaap-mediator-producer/internal/server/server.go @@ -29,8 +29,11 @@ import ( "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" ) +const StatusCallbackPath = "/status" +const JobsCallbackPath = "/jobs" + func StatusHandler(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/" { + if r.URL.Path != StatusCallbackPath { http.Error(w, "404 not found.", http.StatusNotFound) return } @@ -44,7 +47,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { } func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/producer_simulator/info_job" { + if r.URL.Path != JobsCallbackPath { http.Error(w, "404 not found.", http.StatusNotFound) return } @@ -68,14 +71,3 @@ func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest) } } - -func CreateServer(port int, handlerFunc func(http.ResponseWriter, *http.Request)) *http.Server { - - mux := http.NewServeMux() - mux.HandleFunc("/", handlerFunc) - server := http.Server{ - Addr: fmt.Sprintf(":%v", port), // :{port} - Handler: mux, - } - return &server -} diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go index 444deba4..a4b19c43 100644 --- a/dmaap-mediator-producer/internal/server/server_test.go +++ b/dmaap-mediator-producer/internal/server/server_test.go @@ -51,7 +51,7 @@ func TestStatusHandler(t *testing.T) { name: "StatusHandler with correct path and method, should return OK", args: args{ responseRecorder: httptest.NewRecorder(), - r: newRequest("GET", "/", nil, t), + r: newRequest("GET", "/status", nil, t), }, wantedStatus: http.StatusOK, wantedBody: "All is well!", @@ -69,7 +69,7 @@ func TestStatusHandler(t *testing.T) { name: "StatusHandler with incorrect method, should return MethodNotAllowed", args: args{ responseRecorder: httptest.NewRecorder(), - r: newRequest("PUT", "/", nil, t), + r: newRequest("PUT", "/status", nil, t), }, wantedStatus: http.StatusMethodNotAllowed, wantedBody: "Method is not supported.\n", @@ -119,7 +119,7 @@ func TestCreateInfoJobHandler(t *testing.T) { name: "CreateInfoJobHandler with correct path and method, should return OK", args: args{ responseRecorder: httptest.NewRecorder(), - r: newRequest("POST", "/producer_simulator/info_job", &goodJobInfo, t), + r: newRequest("POST", "/jobs", &goodJobInfo, t), }, wantedStatus: http.StatusOK, wantedBody: "", @@ -128,7 +128,7 @@ func TestCreateInfoJobHandler(t *testing.T) { name: "CreateInfoJobHandler with incorrect job info, should return BadRequest", args: args{ responseRecorder: httptest.NewRecorder(), - r: newRequest("POST", "/producer_simulator/info_job", &badJobInfo, t), + r: newRequest("POST", "/jobs", &badJobInfo, t), }, wantedStatus: http.StatusBadRequest, wantedBody: "Invalid job info. Cause: error", @@ -146,7 +146,7 @@ func TestCreateInfoJobHandler(t *testing.T) { name: "CreateInfoJobHandler with incorrect method, should return MethodNotAllowed", args: args{ responseRecorder: httptest.NewRecorder(), - r: newRequest("PUT", "/producer_simulator/info_job", nil, t), + r: newRequest("PUT", "/jobs", nil, t), }, wantedStatus: http.StatusMethodNotAllowed, wantedBody: "Method is not supported.", diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index 47e12e9a..79fcb6b5 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -22,6 +22,7 @@ package main import ( "fmt" + "net/http" "sync" log "github.com/sirupsen/logrus" @@ -31,8 +32,7 @@ import ( ) var configuration *config.Config -var supervisionCallbackAddress string -var jobInfoCallbackAddress string +var callbackAddress string func init() { configuration = config.New() @@ -43,15 +43,10 @@ func init() { } log.Debug("Initializing DMaaP Mediator Producer") - if configuration.InfoProducerSupervisionCallbackHost == "" { + if configuration.InfoProducerHost == "" { log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST") } - supervisionCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerSupervisionCallbackHost, configuration.InfoProducerSupervisionCallbackPort) - - if configuration.InfoJobCallbackHost == "" { - log.Fatal("Missing INFO_JOB_CALLBACK_HOST") - } - jobInfoCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoJobCallbackHost, configuration.InfoJobCallbackPort) + callbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort) registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress) if types, err := jobs.GetTypes(); err == nil { @@ -62,9 +57,9 @@ func init() { log.Fatalf("Unable to get types to register due to: %v", err) } producer := config.ProducerRegistrationInfo{ - InfoProducerSupervisionCallbackUrl: supervisionCallbackAddress, + InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusCallbackPath, SupportedInfoTypes: jobs.GetSupportedTypes(), - InfoJobCallbackUrl: jobInfoCallbackAddress, + InfoJobCallbackUrl: callbackAddress + server.JobsCallbackPath, } if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil { log.Fatalf("Unable to register producer due to: %v", err) @@ -75,19 +70,14 @@ func main() { log.Debug("Starting DMaaP Mediator Producer") wg := new(sync.WaitGroup) - // add two goroutines to `wg` WaitGroup, one for each avilable server - wg.Add(3) - - log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort) - go func() { - server := server.CreateServer(configuration.InfoProducerSupervisionCallbackPort, server.StatusHandler) - log.Warn(server.ListenAndServe()) - wg.Done() - }() + // add two goroutines to `wg` WaitGroup, one for each running go routine + wg.Add(2) + log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort) go func() { - server := server.CreateServer(configuration.InfoJobCallbackPort, server.CreateInfoJobHandler) - log.Warn(server.ListenAndServe()) + http.HandleFunc(server.StatusCallbackPath, server.StatusHandler) + http.HandleFunc(server.JobsCallbackPath, server.CreateInfoJobHandler) + log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), nil)) wg.Done() }() diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumersimulator.go index 25421ae6..144f56f3 100644 --- a/dmaap-mediator-producer/simulator/consumersimulator.go +++ b/dmaap-mediator-producer/simulator/consumersimulator.go @@ -21,20 +21,43 @@ package main import ( + "encoding/json" + "flag" "fmt" "io" http "net/http" + + "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) +func main() { + port := flag.Int("port", 40935, "The port this consumer will listen on") + flag.Parse() + http.HandleFunc("/jobs", handleData) + + fmt.Print("Starting consumer on port: ", *port) + http.ListenAndServe(fmt.Sprintf(":%v", *port), nil) + registerJob(*port) +} + +func registerJob(port int) { + jobInfo := struct { + JobOwner string `json:"job_owner"` + JobResultUri string `json:"job_result_uri"` + InfoTypeId string `json:"info_type_id"` + JobDefinition string `json:"job_definition"` + }{fmt.Sprintf("test%v", port), fmt.Sprintf("http://localhost:%v/jobs", port), "STD_Fault_Messages", "{}"} + fmt.Print("Registering consumer: ", jobInfo) + body, _ := json.Marshal(jobInfo) + putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body) + if putErr != nil { + fmt.Printf("Unable to register consumer: %v", putErr) + } +} + func handleData(w http.ResponseWriter, req *http.Request) { defer req.Body.Close() if reqData, err := io.ReadAll(req.Body); err == nil { - fmt.Printf("Consumer received body: %v\n", string(reqData)) + fmt.Print("Consumer received body: ", string(reqData)) } } - -func main() { - http.HandleFunc("/jobs", handleData) - - http.ListenAndServe(":40935", nil) -} -- 2.16.6