From: Henrik Andersson Date: Thu, 21 Oct 2021 14:36:44 +0000 (+0000) Subject: Merge "NONRTRIC - Enrichment Coordinator Service, Changed error codes" X-Git-Tag: 1.2.0~70 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=f1cee0f81c6bc482f73182c8f4c903e8376381e8;hp=4cd9f067e6985a28c045dd672bd86581b8b5b1a9;p=nonrtric.git Merge "NONRTRIC - Enrichment Coordinator Service, Changed error codes" --- diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index 9b7b1dd1..dfd2505b 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -28,7 +28,7 @@ import ( ) type Config struct { - LogLevel string + LogLevel log.Level InfoProducerHost string InfoProducerPort int InfoCoordinatorAddress string @@ -36,15 +36,9 @@ type Config struct { MRPort int } -type ProducerRegistrationInfo struct { - InfoProducerSupervisionCallbackUrl string `json:"info_producer_supervision_callback_url"` - SupportedInfoTypes []string `json:"supported_info_types"` - InfoJobCallbackUrl string `json:"info_job_callback_url"` -} - func New() *Config { return &Config{ - LogLevel: getEnv("LOG_LEVEL", "Info"), + LogLevel: getLogLevel(), InfoProducerHost: getEnv("INFO_PRODUCER_HOST", ""), InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085), InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), @@ -71,3 +65,13 @@ func getEnvAsInt(name string, defaultVal int) int { return defaultVal } + +func getLogLevel() log.Level { + logLevelStr := getEnv("LOG_LEVEL", "Info") + if loglevel, err := log.ParseLevel(logLevelStr); err == nil { + return loglevel + } else { + log.Warnf("Invalid log level: %v. Log level will be Info!", logLevelStr) + return log.InfoLevel + } +} diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go index 0fcbdd3f..fc64e575 100644 --- a/dmaap-mediator-producer/internal/config/config_test.go +++ b/dmaap-mediator-producer/internal/config/config_test.go @@ -31,6 +31,7 @@ import ( ) func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { + assertions := require.New(t) os.Setenv("LOG_LEVEL", "Debug") os.Setenv("INFO_PRODUCER_HOST", "producerHost") os.Setenv("INFO_PRODUCER_PORT", "8095") @@ -41,16 +42,16 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { os.Clearenv() }) wantConfig := Config{ - LogLevel: "Debug", + LogLevel: log.DebugLevel, InfoProducerHost: "producerHost", InfoProducerPort: 8095, InfoCoordinatorAddress: "infoCoordAddr", MRHost: "mrHost", MRPort: 3908, } - if got := New(); !reflect.DeepEqual(got, &wantConfig) { - t.Errorf("New() = %v, want %v", got, &wantConfig) - } + got := New() + + assertions.Equal(&wantConfig, got) } func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T) { @@ -64,7 +65,7 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T os.Clearenv() }) wantConfig := Config{ - LogLevel: "Info", + LogLevel: log.InfoLevel, InfoProducerHost: "", InfoProducerPort: 8085, InfoCoordinatorAddress: "http://enrichmentservice:8083", @@ -78,16 +79,29 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used") } -func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) { +func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) { + assertions := require.New(t) + var buf bytes.Buffer + log.SetOutput(&buf) + + os.Setenv("LOG_LEVEL", "wrong") + t.Cleanup(func() { + log.SetOutput(os.Stderr) + os.Clearenv() + }) + wantConfig := Config{ - LogLevel: "Info", + LogLevel: log.InfoLevel, InfoProducerHost: "", InfoProducerPort: 8085, InfoCoordinatorAddress: "http://enrichmentservice:8083", MRHost: "http://message-router.onap", MRPort: 3904, } - if got := New(); !reflect.DeepEqual(got, &wantConfig) { - t.Errorf("New() = %v, want %v", got, &wantConfig) - } + + got := New() + + assertions.Equal(&wantConfig, got) + logString := buf.String() + assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!") } diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go index db46c544..83ed43f2 100644 --- a/dmaap-mediator-producer/internal/config/registrator.go +++ b/dmaap-mediator-producer/internal/config/registrator.go @@ -27,7 +27,6 @@ import ( log "github.com/sirupsen/logrus" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) @@ -35,25 +34,38 @@ const registerTypePath = "/data-producer/v1/info-types/" const registerProducerPath = "/data-producer/v1/info-producers/" const typeSchema = `{"type": "object","properties": {},"additionalProperties": false}` +type TypeDefinition struct { + Id string `json:"id"` + DmaapTopicURL string `json:"dmaapTopicUrl"` +} + +type ProducerRegistrationInfo struct { + InfoProducerSupervisionCallbackUrl string `json:"info_producer_supervision_callback_url"` + SupportedInfoTypes []string `json:"supported_info_types"` + InfoJobCallbackUrl string `json:"info_job_callback_url"` +} + type Registrator interface { - RegisterTypes(types []*jobs.TypeData) error + RegisterTypes(types []TypeDefinition) error RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) } type RegistratorImpl struct { infoCoordinatorAddress string + httpClient restclient.HTTPClient } -func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl { +func NewRegistratorImpl(infoCoordAddr string, client restclient.HTTPClient) *RegistratorImpl { return &RegistratorImpl{ infoCoordinatorAddress: infoCoordAddr, + httpClient: client, } } -func (r RegistratorImpl) RegisterTypes(jobTypes []jobs.TypeData) error { +func (r RegistratorImpl) RegisterTypes(jobTypes []TypeDefinition) error { for _, jobType := range jobTypes { body := fmt.Sprintf(`{"info_job_data_schema": %v}`, typeSchema) - if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil { + if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Id), []byte(body), r.httpClient); error != nil { return error } log.Debugf("Registered type: %v", jobType) @@ -63,7 +75,7 @@ func (r RegistratorImpl) RegisterTypes(jobTypes []jobs.TypeData) error { func (r RegistratorImpl) RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) error { if body, marshalErr := json.Marshal(producerInfo); marshalErr == nil { - if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body)); putErr != nil { + if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body), r.httpClient); putErr != nil { return putErr } log.Debugf("Registered producer: %v", producerId) diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go index 353e9de8..2cffa2c3 100644 --- a/dmaap-mediator-producer/internal/config/registrator_test.go +++ b/dmaap-mediator-producer/internal/config/registrator_test.go @@ -27,28 +27,24 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" - "oransc.org/nonrtric/dmaapmediatorproducer/mocks" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks/httpclient" ) func TestRegisterTypes(t *testing.T) { assertions := require.New(t) - clientMock := mocks.HTTPClient{} + clientMock := httpclient.HTTPClient{} clientMock.On("Do", mock.Anything).Return(&http.Response{ StatusCode: http.StatusCreated, }, nil) - restclient.Client = &clientMock - - type1 := jobs.TypeData{ - TypeId: "Type1", + type1 := TypeDefinition{ + Id: "Type1", } - types := []jobs.TypeData{type1} + types := []TypeDefinition{type1} - r := NewRegistratorImpl("http://localhost:9990") + r := NewRegistratorImpl("http://localhost:9990", &clientMock) err := r.RegisterTypes(types) assertions.Nil(err) @@ -71,21 +67,19 @@ func TestRegisterTypes(t *testing.T) { func TestRegisterProducer(t *testing.T) { assertions := require.New(t) - clientMock := mocks.HTTPClient{} + clientMock := httpclient.HTTPClient{} clientMock.On("Do", mock.Anything).Return(&http.Response{ StatusCode: http.StatusCreated, }, nil) - restclient.Client = &clientMock - producer := ProducerRegistrationInfo{ InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl", SupportedInfoTypes: []string{"type1"}, InfoJobCallbackUrl: "jobCallbackUrl", } - r := NewRegistratorImpl("http://localhost:9990") + r := NewRegistratorImpl("http://localhost:9990", &clientMock) err := r.RegisterProducer("Producer1", &producer) assertions.Nil(err) diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index e5a1070b..7b21b002 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -27,17 +27,10 @@ import ( "sync" log "github.com/sirupsen/logrus" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) -type TypeDefinitions struct { - Types []TypeDefinition `json:"types"` -} -type TypeDefinition struct { - Id string `json:"id"` - DmaapTopicURL string `json:"dmaapTopicUrl"` -} - type TypeData struct { TypeId string `json:"id"` DMaaPTopicURL string `json:"dmaapTopicUrl"` @@ -53,33 +46,38 @@ type JobInfo struct { InfoTypeIdentity string `json:"info_type_identity"` } +type JobTypeHandler interface { + GetTypes() ([]config.TypeDefinition, error) + GetSupportedTypes() []string +} + type JobHandler interface { AddJob(JobInfo) error DeleteJob(jobId string) } -var ( - mu sync.Mutex - configFile = "configs/type_config.json" - Handler JobHandler - allTypes = make(map[string]TypeData) -) - -func init() { - Handler = newJobHandlerImpl() +type JobHandlerImpl struct { + mu sync.Mutex + configFile string + allTypes map[string]TypeData + pollClient restclient.HTTPClient + distributeClient restclient.HTTPClient } -type jobHandlerImpl struct{} - -func newJobHandlerImpl() *jobHandlerImpl { - return &jobHandlerImpl{} +func NewJobHandlerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *JobHandlerImpl { + return &JobHandlerImpl{ + configFile: typeConfigFilePath, + allTypes: make(map[string]TypeData), + pollClient: pollClient, + distributeClient: distributeClient, + } } -func (jh *jobHandlerImpl) AddJob(ji JobInfo) error { - mu.Lock() - defer mu.Unlock() - if err := validateJobInfo(ji); err == nil { - jobs := allTypes[ji.InfoTypeIdentity].Jobs +func (jh *JobHandlerImpl) AddJob(ji JobInfo) error { + jh.mu.Lock() + defer jh.mu.Unlock() + if err := jh.validateJobInfo(ji); err == nil { + jobs := jh.allTypes[ji.InfoTypeIdentity].Jobs jobs[ji.InfoJobIdentity] = ji log.Debug("Added job: ", ji) return nil @@ -88,17 +86,17 @@ func (jh *jobHandlerImpl) AddJob(ji JobInfo) error { } } -func (jh *jobHandlerImpl) DeleteJob(jobId string) { - mu.Lock() - defer mu.Unlock() - for _, typeData := range allTypes { +func (jh *JobHandlerImpl) DeleteJob(jobId string) { + jh.mu.Lock() + defer jh.mu.Unlock() + for _, typeData := range jh.allTypes { delete(typeData.Jobs, jobId) } log.Debug("Deleted job: ", jobId) } -func validateJobInfo(ji JobInfo) error { - if _, ok := allTypes[ji.InfoTypeIdentity]; !ok { +func (jh *JobHandlerImpl) validateJobInfo(ji JobInfo) error { + if _, ok := jh.allTypes[ji.InfoTypeIdentity]; !ok { return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity) } if ji.InfoJobIdentity == "" { @@ -111,86 +109,75 @@ func validateJobInfo(ji JobInfo) error { return nil } -func GetTypes() ([]TypeData, error) { - mu.Lock() - defer mu.Unlock() - types := make([]TypeData, 0, 1) - typeDefsByte, err := os.ReadFile(configFile) +func (jh *JobHandlerImpl) GetTypes() ([]config.TypeDefinition, error) { + jh.mu.Lock() + defer jh.mu.Unlock() + typeDefsByte, err := os.ReadFile(jh.configFile) if err != nil { return nil, err } - typeDefs := TypeDefinitions{} + typeDefs := struct { + Types []config.TypeDefinition `json:"types"` + }{} err = json.Unmarshal(typeDefsByte, &typeDefs) if err != nil { return nil, err } for _, typeDef := range typeDefs.Types { - typeInfo := TypeData{ + jh.allTypes[typeDef.Id] = TypeData{ TypeId: typeDef.Id, DMaaPTopicURL: typeDef.DmaapTopicURL, Jobs: make(map[string]JobInfo), } - if _, ok := allTypes[typeInfo.TypeId]; !ok { - allTypes[typeInfo.TypeId] = typeInfo - } - types = append(types, typeInfo) } - return types, nil + return typeDefs.Types, nil } -func GetSupportedTypes() []string { - mu.Lock() - defer mu.Unlock() +func (jh *JobHandlerImpl) GetSupportedTypes() []string { + jh.mu.Lock() + defer jh.mu.Unlock() supportedTypes := []string{} - for k := range allTypes { + for k := range jh.allTypes { supportedTypes = append(supportedTypes, k) } return supportedTypes } -func AddJob(job JobInfo) error { - return Handler.AddJob(job) -} - -func DeleteJob(jobId string) { - Handler.DeleteJob(jobId) -} - -func RunJobs(mRAddress string) { +func (jh *JobHandlerImpl) RunJobs(mRAddress string) { for { - pollAndDistributeMessages(mRAddress) + jh.pollAndDistributeMessages(mRAddress) } } -func pollAndDistributeMessages(mRAddress string) { - for typeId, typeInfo := range allTypes { +func (jh *JobHandlerImpl) pollAndDistributeMessages(mRAddress string) { + jh.mu.Lock() + defer jh.mu.Unlock() + for typeId, typeInfo := range jh.allTypes { log.Debugf("Processing jobs for type: %v", typeId) - messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL)) + messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL), jh.pollClient) if error != nil { log.Warnf("Error getting data from MR. Cause: %v", error) continue } - distributeMessages(messagesBody, typeInfo) + jh.distributeMessages(messagesBody, typeInfo) } } -func distributeMessages(messages []byte, typeInfo TypeData) { +func (jh *JobHandlerImpl) distributeMessages(messages []byte, typeInfo TypeData) { if len(messages) > 2 { - mu.Lock() for _, jobInfo := range typeInfo.Jobs { - go sendMessagesToConsumer(messages, jobInfo) + go jh.sendMessagesToConsumer(messages, jobInfo) } - mu.Unlock() } } -func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) { +func (jh *JobHandlerImpl) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) { log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity) - if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil { + if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil { log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr) } } -func clearAll() { - allTypes = make(map[string]TypeData) +func (jh *JobHandlerImpl) clearAll() { + jh.allTypes = make(map[string]TypeData) } diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 6f292272..6ca39b27 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -31,7 +31,7 @@ import ( "time" "github.com/stretchr/testify/require" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" ) const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}` @@ -42,31 +42,31 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes if err != nil { t.Errorf("Unable to create temporary directory for types due to: %v", err) } + fname := filepath.Join(typesDir, "type_config.json") + handlerUnderTest := NewJobHandlerImpl(fname, nil, nil) t.Cleanup(func() { os.RemoveAll(typesDir) - clearAll() + handlerUnderTest.clearAll() }) - fname := filepath.Join(typesDir, "type_config.json") - configFile = fname if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil { t.Errorf("Unable to create temporary config file for types due to: %v", err) } - types, err := GetTypes() - wantedType := TypeData{ - TypeId: "type1", - DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", - Jobs: make(map[string]JobInfo), + types, err := handlerUnderTest.GetTypes() + wantedType := config.TypeDefinition{ + Id: "type1", + DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", } - wantedTypes := []TypeData{wantedType} + wantedTypes := []config.TypeDefinition{wantedType} assertions.EqualValues(wantedTypes, types) assertions.Nil(err) - supportedTypes := GetSupportedTypes() + supportedTypes := handlerUnderTest.GetSupportedTypes() assertions.EqualValues([]string{"type1"}, supportedTypes) } func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { assertions := require.New(t) + handlerUnderTest := NewJobHandlerImpl("", nil, nil) wantedJob := JobInfo{ Owner: "owner", LastUpdated: "now", @@ -75,66 +75,72 @@ func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { InfoJobData: "{}", InfoTypeIdentity: "type1", } - allTypes["type1"] = TypeData{ + handlerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", Jobs: map[string]JobInfo{"job1": wantedJob}, } t.Cleanup(func() { - clearAll() + handlerUnderTest.clearAll() }) - err := AddJob(wantedJob) + err := handlerUnderTest.AddJob(wantedJob) assertions.Nil(err) - assertions.Equal(1, len(allTypes["type1"].Jobs)) - assertions.Equal(wantedJob, allTypes["type1"].Jobs["job1"]) + assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs)) + assertions.Equal(wantedJob, handlerUnderTest.allTypes["type1"].Jobs["job1"]) } func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { assertions := require.New(t) + handlerUnderTest := NewJobHandlerImpl("", nil, nil) jobInfo := JobInfo{ InfoTypeIdentity: "type1", } - err := AddJob(jobInfo) + err := handlerUnderTest.AddJob(jobInfo) assertions.NotNil(err) assertions.Equal("type not supported: type1", err.Error()) } func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - allTypes["type1"] = TypeData{ + handlerUnderTest := NewJobHandlerImpl("", nil, nil) + handlerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", } t.Cleanup(func() { - clearAll() + handlerUnderTest.clearAll() }) + jobInfo := JobInfo{ InfoTypeIdentity: "type1", } - - err := AddJob(jobInfo) + err := handlerUnderTest.AddJob(jobInfo) assertions.NotNil(err) assertions.Equal("missing required job identity: { type1}", err.Error()) } func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - allTypes["type1"] = TypeData{ + handlerUnderTest := NewJobHandlerImpl("", nil, nil) + handlerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", } + t.Cleanup(func() { + handlerUnderTest.clearAll() + }) + jobInfo := JobInfo{ InfoTypeIdentity: "type1", InfoJobIdentity: "job1", } - - err := AddJob(jobInfo) + err := handlerUnderTest.AddJob(jobInfo) assertions.NotNil(err) assertions.Equal("missing required target URI: { job1 type1}", err.Error()) - clearAll() } func TestDeleteJob(t *testing.T) { assertions := require.New(t) + handlerUnderTest := NewJobHandlerImpl("", nil, nil) jobToKeep := JobInfo{ InfoJobIdentity: "job1", InfoTypeIdentity: "type1", @@ -143,52 +149,44 @@ func TestDeleteJob(t *testing.T) { InfoJobIdentity: "job2", InfoTypeIdentity: "type1", } - allTypes["type1"] = TypeData{ + handlerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", Jobs: map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete}, } t.Cleanup(func() { - clearAll() + handlerUnderTest.clearAll() }) - DeleteJob("job2") - assertions.Equal(1, len(allTypes["type1"].Jobs)) - assertions.Equal(jobToKeep, allTypes["type1"].Jobs["job1"]) + handlerUnderTest.DeleteJob("job2") + assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs)) + assertions.Equal(jobToKeep, handlerUnderTest.allTypes["type1"].Jobs["job1"]) } func TestPollAndDistributeMessages(t *testing.T) { assertions := require.New(t) - jobInfo := JobInfo{ - InfoTypeIdentity: "type1", - InfoJobIdentity: "job1", - TargetUri: "http://consumerHost/target", - } - allTypes["type1"] = TypeData{ - TypeId: "type1", - DMaaPTopicURL: "topicUrl", - Jobs: map[string]JobInfo{"job1": jobInfo}, - } - t.Cleanup(func() { - clearAll() - }) wg := sync.WaitGroup{} - wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute messages := `[{"message": {"data": "data"}}]` - clientMock := NewTestClient(func(req *http.Request) *http.Response { + pollClientMock := NewTestClient(func(req *http.Request) *http.Response { if req.URL.String() == "http://mrAddr/topicUrl" { assertions.Equal(req.Method, "GET") - wg.Done() + wg.Done() // Signal that the poll call has been made return &http.Response{ StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader([]byte(messages))), Header: make(http.Header), // Must be set to non-nil value or it panics } - } else if req.URL.String() == "http://consumerHost/target" { + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/target" { assertions.Equal(req.Method, "POST") assertions.Equal(messages, getBodyAsString(req)) assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type")) - wg.Done() + wg.Done() // Signal that the distribution call has been made return &http.Response{ StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), @@ -199,10 +197,23 @@ func TestPollAndDistributeMessages(t *testing.T) { t.Fail() return nil }) + handlerUnderTest := NewJobHandlerImpl("", pollClientMock, distributeClientMock) + jobInfo := JobInfo{ + InfoTypeIdentity: "type1", + InfoJobIdentity: "job1", + TargetUri: "http://consumerHost/target", + } + handlerUnderTest.allTypes["type1"] = TypeData{ + TypeId: "type1", + DMaaPTopicURL: "topicUrl", + Jobs: map[string]JobInfo{"job1": jobInfo}, + } + t.Cleanup(func() { + handlerUnderTest.clearAll() + }) - restclient.Client = clientMock - - pollAndDistributeMessages("http://mrAddr") + wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute + handlerUnderTest.pollAndDistributeMessages("http://mrAddr") if waitTimeout(&wg, 100*time.Millisecond) { t.Error("Not all calls to server were made") diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go index a783f7e6..2b3a0cf3 100644 --- a/dmaap-mediator-producer/internal/restclient/HTTPClient.go +++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go @@ -43,47 +43,35 @@ func (pe RequestError) Error() string { return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body)) } -var ( - Client HTTPClient -) - -func init() { - Client = &http.Client{} -} - -func Get(url string) ([]byte, error) { - if response, err := Client.Get(url); err == nil { - defer response.Body.Close() - if responseData, err := io.ReadAll(response.Body); err == nil { - if isResponseSuccess(response.StatusCode) { +func Get(url string, client HTTPClient) ([]byte, error) { + if response, err := client.Get(url); err == nil { + if isResponseSuccess(response.StatusCode) { + defer response.Body.Close() + if responseData, err := io.ReadAll(response.Body); err == nil { return responseData, nil } else { - requestError := RequestError{ - StatusCode: response.StatusCode, - Body: responseData, - } - return nil, requestError + return nil, err } } else { - return nil, err + return nil, getRequestError(response) } } else { return nil, err } } -func Put(url string, body []byte) error { - return do(http.MethodPut, url, body) +func Put(url string, body []byte, client HTTPClient) error { + return do(http.MethodPut, url, body, client) } -func Post(url string, body []byte) error { - return do(http.MethodPost, url, body) +func Post(url string, body []byte, client HTTPClient) error { + return do(http.MethodPost, url, body, client) } -func do(method string, url string, body []byte) error { +func do(method string, url string, body []byte, client HTTPClient) error { if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil { req.Header.Set("Content-Type", "application/json; charset=utf-8") - if response, respErr := Client.Do(req); respErr == nil { + if response, respErr := client.Do(req); respErr == nil { if isResponseSuccess(response.StatusCode) { return nil } else { diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go index 3727e6a3..7fe3cc7a 100644 --- a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go +++ b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go @@ -23,58 +23,55 @@ package restclient import ( "bytes" "errors" + "fmt" "io/ioutil" "net/http" - "reflect" "testing" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "oransc.org/nonrtric/dmaapmediatorproducer/mocks" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks/httpclient" ) -func TestGet(t *testing.T) { - clientMock := mocks.HTTPClient{} - - clientMock.On("Get", "http://testOk").Return(&http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(bytes.NewReader([]byte("Response"))), - }, nil) - - clientMock.On("Get", "http://testNotOk").Return(&http.Response{ +func TestRequestError_Error(t *testing.T) { + assertions := require.New(t) + actualError := RequestError{ StatusCode: http.StatusBadRequest, - Body: ioutil.NopCloser(bytes.NewReader([]byte("Bad Response"))), - }, nil) - - clientMock.On("Get", "http://testError").Return(nil, errors.New("Failed Request")) - - Client = &clientMock - + Body: []byte("error"), + } + assertions.Equal("Request failed due to error response with status: 400 and body: error", actualError.Error()) +} +func TestGet(t *testing.T) { + assertions := require.New(t) type args struct { - url string + url string + mockReturnStatus int + mockReturnBody string + mockReturnError error } tests := []struct { name string args args want []byte - wantErr bool wantedError error }{ { name: "Test Get with OK response", args: args{ - url: "http://testOk", + url: "http://testOk", + mockReturnStatus: http.StatusOK, + mockReturnBody: "Response", }, - want: []byte("Response"), - wantErr: false, + want: []byte("Response"), }, { name: "Test Get with Not OK response", args: args{ - url: "http://testNotOk", + url: "http://testNotOk", + mockReturnStatus: http.StatusBadRequest, + mockReturnBody: "Bad Response", }, - want: nil, - wantErr: true, + want: nil, wantedError: RequestError{ StatusCode: http.StatusBadRequest, Body: []byte("Bad Response"), @@ -83,40 +80,38 @@ func TestGet(t *testing.T) { { name: "Test Get with error", args: args{ - url: "http://testError", + url: "http://testError", + mockReturnError: errors.New("Failed Request"), }, want: nil, - wantErr: true, wantedError: errors.New("Failed Request"), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := Get(tt.args.url) - if (err != nil) != tt.wantErr { - t.Errorf("Get() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("Get() = %v, want %v", got, tt.want) - } - if tt.wantErr && err.Error() != tt.wantedError.Error() { - t.Errorf("Get() error = %v, wantedError % v", err, tt.wantedError.Error()) - } + clientMock := httpclient.HTTPClient{} + clientMock.On("Get", tt.args.url).Return(&http.Response{ + StatusCode: tt.args.mockReturnStatus, + Body: ioutil.NopCloser(bytes.NewReader([]byte(tt.args.mockReturnBody))), + }, tt.args.mockReturnError) + + got, err := Get(tt.args.url, &clientMock) + assertions.Equal(tt.wantedError, err, tt.name) + assertions.Equal(tt.want, got, tt.name) + clientMock.AssertCalled(t, "Get", tt.args.url) }) } } func TestPutOk(t *testing.T) { assertions := require.New(t) - clientMock := mocks.HTTPClient{} + clientMock := httpclient.HTTPClient{} clientMock.On("Do", mock.Anything).Return(&http.Response{ StatusCode: http.StatusOK, }, nil) - Client = &clientMock - if err := Put("http://localhost:9990", []byte("body")); err != nil { + if err := Put("http://localhost:9990", []byte("body"), &clientMock); err != nil { t.Errorf("Put() error = %v, did not want error", err) } var actualRequest *http.Request @@ -134,31 +129,76 @@ func TestPutOk(t *testing.T) { clientMock.AssertNumberOfCalls(t, "Do", 1) } -func TestPutBadResponse(t *testing.T) { +func TestPostOk(t *testing.T) { assertions := require.New(t) - clientMock := mocks.HTTPClient{} + clientMock := httpclient.HTTPClient{} clientMock.On("Do", mock.Anything).Return(&http.Response{ - StatusCode: http.StatusBadRequest, - Body: ioutil.NopCloser(bytes.NewReader([]byte("Bad Request"))), + StatusCode: http.StatusOK, }, nil) - Client = &clientMock - err := Put("url", []byte("body")) - assertions.NotNil("Put() error = %v, wanted error", err) - expectedErrorMessage := "Request failed due to error response with status: 400 and body: Bad Request" - assertions.Equal(expectedErrorMessage, err.Error()) + if err := Post("http://localhost:9990", []byte("body"), &clientMock); err != nil { + t.Errorf("Put() error = %v, did not want error", err) + } + var actualRequest *http.Request + clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool { + actualRequest = req + return true + })) + assertions.Equal(http.MethodPost, actualRequest.Method) + assertions.Equal("http", actualRequest.URL.Scheme) + assertions.Equal("localhost:9990", actualRequest.URL.Host) + assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) + body, _ := ioutil.ReadAll(actualRequest.Body) + expectedBody := []byte("body") + assertions.Equal(expectedBody, body) + clientMock.AssertNumberOfCalls(t, "Do", 1) } -func TestPutError(t *testing.T) { +func Test_doErrorCases(t *testing.T) { assertions := require.New(t) - clientMock := mocks.HTTPClient{} - - clientMock.On("Do", mock.Anything).Return(nil, errors.New("Failed Request")) - - Client = &clientMock - err := Put("url", []byte("body")) - assertions.NotNil("Put() error = %v, wanted error", err) - expectedErrorMessage := "Failed Request" - assertions.Equal(expectedErrorMessage, err.Error()) + type args struct { + url string + mockReturnStatus int + mockReturnBody []byte + mockReturnError error + } + tests := []struct { + name string + args args + wantErr error + }{ + { + name: "Bad request should get RequestError", + args: args{ + url: "badRequest", + mockReturnStatus: http.StatusBadRequest, + mockReturnBody: []byte("bad request"), + mockReturnError: nil, + }, + wantErr: RequestError{ + StatusCode: http.StatusBadRequest, + Body: []byte("bad request"), + }, + }, + { + name: "Server unavailable should get error", + args: args{ + url: "serverUnavailable", + mockReturnError: fmt.Errorf("Server unavailable"), + }, + wantErr: fmt.Errorf("Server unavailable"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientMock := httpclient.HTTPClient{} + clientMock.On("Do", mock.Anything).Return(&http.Response{ + StatusCode: tt.args.mockReturnStatus, + Body: ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)), + }, tt.args.mockReturnError) + err := do("PUT", tt.args.url, nil, &clientMock) + assertions.Equal(tt.wantErr, err, tt.name) + }) + } } diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go index d07b7309..5861cbe7 100644 --- a/dmaap-mediator-producer/internal/server/server.go +++ b/dmaap-mediator-producer/internal/server/server.go @@ -35,19 +35,32 @@ const AddJobPath = "/jobs" const jobIdToken = "infoJobId" const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}" -func NewRouter() *mux.Router { +type ProducerCallbackHandler struct { + jobHandler jobs.JobHandler +} + +func NewProducerCallbackHandler(jh jobs.JobHandler) *ProducerCallbackHandler { + return &ProducerCallbackHandler{ + jobHandler: jh, + } +} + +func NewRouter(jh jobs.JobHandler) *mux.Router { + callbackHandler := NewProducerCallbackHandler(jh) r := mux.NewRouter() r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status") - r.HandleFunc(AddJobPath, addInfoJobHandler).Methods(http.MethodPost).Name("add") - r.HandleFunc(deleteJobPath, deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete") + r.HandleFunc(AddJobPath, callbackHandler.addInfoJobHandler).Methods(http.MethodPost).Name("add") + r.HandleFunc(deleteJobPath, callbackHandler.deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete") r.NotFoundHandler = ¬FoundHandler{} r.MethodNotAllowedHandler = &methodNotAllowedHandler{} return r } -func statusHandler(w http.ResponseWriter, r *http.Request) {} +func statusHandler(w http.ResponseWriter, r *http.Request) { + // Just respond OK to show the server is alive for now. Might be extended later. +} -func addInfoJobHandler(w http.ResponseWriter, r *http.Request) { +func (h *ProducerCallbackHandler) addInfoJobHandler(w http.ResponseWriter, r *http.Request) { b, readErr := ioutil.ReadAll(r.Body) if readErr != nil { http.Error(w, fmt.Sprintf("Unable to read body due to: %v", readErr), http.StatusBadRequest) @@ -58,12 +71,12 @@ func addInfoJobHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest) return } - if err := jobs.AddJob(jobInfo); err != nil { + if err := h.jobHandler.AddJob(jobInfo); err != nil { http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest) } } -func deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) { +func (h *ProducerCallbackHandler) deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) id, ok := vars[jobIdToken] if !ok { @@ -71,7 +84,7 @@ func deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) { return } - jobs.DeleteJob(id) + h.jobHandler.DeleteJob(id) } type notFoundHandler struct{} diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go index 59439143..08885077 100644 --- a/dmaap-mediator-producer/internal/server/server_test.go +++ b/dmaap-mediator-producer/internal/server/server_test.go @@ -39,7 +39,7 @@ import ( func TestNewRouter(t *testing.T) { assertions := require.New(t) - r := NewRouter() + r := NewRouter(nil) statusRoute := r.Get("status") assertions.NotNil(statusRoute) supportedMethods, err := statusRoute.GetMethods() @@ -63,7 +63,6 @@ func TestNewRouter(t *testing.T) { responseRecorder := httptest.NewRecorder() handler.ServeHTTP(responseRecorder, newRequest("GET", "/wrong", nil, t)) assertions.Equal(http.StatusNotFound, responseRecorder.Code) - assertions.Contains(responseRecorder.Body.String(), "404 not found.") methodNotAllowedHandler := r.MethodNotAllowedHandler @@ -71,7 +70,6 @@ func TestNewRouter(t *testing.T) { responseRecorder = httptest.NewRecorder() handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t)) assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code) - assertions.Contains(responseRecorder.Body.String(), "Method is not supported.") } @@ -88,51 +86,39 @@ func TestStatusHandler(t *testing.T) { func TestAddInfoJobHandler(t *testing.T) { assertions := require.New(t) - jobHandlerMock := jobhandler.JobHandler{} - - goodJobInfo := jobs.JobInfo{ - Owner: "owner", - LastUpdated: "now", - InfoJobIdentity: "jobId", - TargetUri: "target", - InfoJobData: "{}", - InfoTypeIdentity: "type", - } - badJobInfo := jobs.JobInfo{ - Owner: "bad", - } - jobHandlerMock.On("AddJob", goodJobInfo).Return(nil) - jobHandlerMock.On("AddJob", badJobInfo).Return(errors.New("error")) - jobs.Handler = &jobHandlerMock type args struct { - responseRecorder *httptest.ResponseRecorder - r *http.Request + job jobs.JobInfo + mockReturn error } tests := []struct { name string args args wantedStatus int wantedBody string - assertFunc assertMockFunk }{ { name: "AddInfoJobHandler with correct path and method, should return OK", args: args{ - responseRecorder: httptest.NewRecorder(), - r: newRequest(http.MethodPost, "/jobs", &goodJobInfo, t), + job: jobs.JobInfo{ + Owner: "owner", + LastUpdated: "now", + InfoJobIdentity: "jobId", + TargetUri: "target", + InfoJobData: "{}", + InfoTypeIdentity: "type", + }, }, wantedStatus: http.StatusOK, wantedBody: "", - assertFunc: func(mock *jobhandler.JobHandler) { - mock.AssertCalled(t, "AddJob", goodJobInfo) - }, }, { name: "AddInfoJobHandler with incorrect job info, should return BadRequest", args: args{ - responseRecorder: httptest.NewRecorder(), - r: newRequest(http.MethodPost, "/jobs", &badJobInfo, t), + job: jobs.JobInfo{ + Owner: "bad", + }, + mockReturn: errors.New("error"), }, wantedStatus: http.StatusBadRequest, wantedBody: "Invalid job info. Cause: error", @@ -140,15 +126,19 @@ func TestAddInfoJobHandler(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - handler := http.HandlerFunc(addInfoJobHandler) - handler.ServeHTTP(tt.args.responseRecorder, tt.args.r) - assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code, tt.name) + jobHandlerMock := jobhandler.JobHandler{} + jobHandlerMock.On("AddJob", tt.args.job).Return(tt.args.mockReturn) + callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock) - assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody, tt.name) + handler := http.HandlerFunc(callbackHandlerUnderTest.addInfoJobHandler) + responseRecorder := httptest.NewRecorder() + r := newRequest(http.MethodPost, "/jobs", &tt.args.job, t) - if tt.assertFunc != nil { - tt.assertFunc(&jobHandlerMock) - } + handler.ServeHTTP(responseRecorder, r) + + assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name) + assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name) + jobHandlerMock.AssertCalled(t, "AddJob", tt.args.job) }) } } @@ -158,11 +148,11 @@ func TestDeleteJob(t *testing.T) { jobHandlerMock := jobhandler.JobHandler{} jobHandlerMock.On("DeleteJob", mock.Anything).Return(nil) - jobs.Handler = &jobHandlerMock + callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock) responseRecorder := httptest.NewRecorder() r := mux.SetURLVars(newRequest(http.MethodDelete, "/jobs/", nil, t), map[string]string{"infoJobId": "job1"}) - handler := http.HandlerFunc(deleteInfoJobHandler) + handler := http.HandlerFunc(callbackHandlerUnderTest.deleteInfoJobHandler) handler.ServeHTTP(responseRecorder, r) assertions.Equal(http.StatusOK, responseRecorder.Result().StatusCode) @@ -171,8 +161,6 @@ func TestDeleteJob(t *testing.T) { jobHandlerMock.AssertCalled(t, "DeleteJob", "job1") } -type assertMockFunk func(mock *jobhandler.JobHandler) - func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request { var body io.Reader if jobInfo != nil { diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index 15207ec7..beeb995d 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -24,49 +24,45 @@ import ( "fmt" "net/http" "sync" + "time" log "github.com/sirupsen/logrus" "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" "oransc.org/nonrtric/dmaapmediatorproducer/internal/server" ) +const timeoutHTTPClient = time.Second * 5 +const timeoutPollClient = time.Second * 15 + var configuration *config.Config -var callbackAddress string +var httpClient restclient.HTTPClient +var jobHandler *jobs.JobHandlerImpl func init() { configuration = config.New() - if loglevel, err := log.ParseLevel(configuration.LogLevel); err == nil { - log.SetLevel(loglevel) - } else { - log.Warnf("Invalid log level: %v. Log level will be Info!", configuration.LogLevel) - } +} +func main() { + log.SetLevel(configuration.LogLevel) log.Debug("Initializing DMaaP Mediator Producer") - if configuration.InfoProducerHost == "" { - log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST") + if err := validateConfiguration(configuration); err != nil { + log.Fatalf("Stopping producer due to error: %v", err) } - callbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort) + callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort) - registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress) - if types, err := jobs.GetTypes(); err == nil { - if regErr := registrator.RegisterTypes(types); regErr != nil { - log.Fatalf("Unable to register all types due to: %v", regErr) - } - } else { - log.Fatalf("Unable to get types to register due to: %v", err) + httpClient = &http.Client{ + Timeout: timeoutHTTPClient, } - producer := config.ProducerRegistrationInfo{ - InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath, - SupportedInfoTypes: jobs.GetSupportedTypes(), - InfoJobCallbackUrl: callbackAddress + server.AddJobPath, + pollClient := &http.Client{ + Timeout: timeoutPollClient, } - if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil { - log.Fatalf("Unable to register producer due to: %v", err) + jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", pollClient, httpClient) + if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil { + log.Fatalf("Stopping producer due to: %v", err) } -} -func main() { log.Debug("Starting DMaaP Mediator Producer") wg := new(sync.WaitGroup) @@ -75,13 +71,13 @@ func main() { log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort) go func() { - r := server.NewRouter() + r := server.NewRouter(jobHandler) log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r)) wg.Done() }() go func() { - jobs.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort)) + jobHandler.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort)) wg.Done() }() @@ -89,3 +85,30 @@ func main() { wg.Wait() log.Debug("Stopping DMaaP Mediator Producer") } + +func validateConfiguration(configuration *config.Config) error { + if configuration.InfoProducerHost == "" { + return fmt.Errorf("missing INFO_PRODUCER_HOST") + } + return nil +} + +func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error { + registrator := config.NewRegistratorImpl(infoCoordinatorAddress, httpClient) + if types, err := jobHandler.GetTypes(); err == nil { + if regErr := registrator.RegisterTypes(types); regErr != nil { + return fmt.Errorf("unable to register all types due to: %v", regErr) + } + } else { + return fmt.Errorf("unable to get types to register due to: %v", err) + } + producer := config.ProducerRegistrationInfo{ + InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath, + SupportedInfoTypes: jobHandler.GetSupportedTypes(), + InfoJobCallbackUrl: callbackAddress + server.AddJobPath, + } + if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil { + return fmt.Errorf("unable to register producer due to: %v", err) + } + return nil +} diff --git a/dmaap-mediator-producer/mocks/HTTPClient.go b/dmaap-mediator-producer/mocks/httpclient/HTTPClient.go similarity index 94% rename from dmaap-mediator-producer/mocks/HTTPClient.go rename to dmaap-mediator-producer/mocks/httpclient/HTTPClient.go index 3037798b..ab399dfb 100644 --- a/dmaap-mediator-producer/mocks/HTTPClient.go +++ b/dmaap-mediator-producer/mocks/httpclient/HTTPClient.go @@ -1,6 +1,6 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.9.3. DO NOT EDIT. -package mocks +package httpclient import ( http "net/http" diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumersimulator.go index 144f56f3..03da6f4e 100644 --- a/dmaap-mediator-producer/simulator/consumersimulator.go +++ b/dmaap-mediator-producer/simulator/consumersimulator.go @@ -26,18 +26,25 @@ import ( "fmt" "io" http "net/http" + "time" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) +var httpClient http.Client + func main() { + httpClient = http.Client{ + Timeout: time.Second * 5, + } port := flag.Int("port", 40935, "The port this consumer will listen on") flag.Parse() http.HandleFunc("/jobs", handleData) + registerJob(*port) + fmt.Print("Starting consumer on port: ", *port) http.ListenAndServe(fmt.Sprintf(":%v", *port), nil) - registerJob(*port) } func registerJob(port int) { @@ -49,7 +56,7 @@ func registerJob(port int) { }{fmt.Sprintf("test%v", port), fmt.Sprintf("http://localhost:%v/jobs", port), "STD_Fault_Messages", "{}"} fmt.Print("Registering consumer: ", jobInfo) body, _ := json.Marshal(jobInfo) - putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body) + putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient) if putErr != nil { fmt.Printf("Unable to register consumer: %v", putErr) } diff --git a/onap/oran b/onap/oran index 8efef051..3b916e4d 160000 --- a/onap/oran +++ b/onap/oran @@ -1 +1 @@ -Subproject commit 8efef0513821e7a586de4831a982833050be03e8 +Subproject commit 3b916e4dc5777863cb4ee873b41ee460fb9aec27 diff --git a/test/common/test_env-onap-istanbul.sh b/test/common/test_env-onap-istanbul.sh index 4982e192..829da8fb 100644 --- a/test/common/test_env-onap-istanbul.sh +++ b/test/common/test_env-onap-istanbul.sh @@ -69,17 +69,17 @@ NEXUS_RELEASE_REPO_ONAP=$NEXUS_RELEASE_REPO # Policy Agent image and tags POLICY_AGENT_IMAGE_BASE="onap/ccsdk-oran-a1policymanagementservice" -POLICY_AGENT_IMAGE_TAG_LOCAL="1.2.2-SNAPSHOT" -POLICY_AGENT_IMAGE_TAG_REMOTE_SNAPSHOT="1.2-SNAPSHOT" -POLICY_AGENT_IMAGE_TAG_REMOTE="1.2.2-STAGING-latest" #Will use snapshot repo -POLICY_AGENT_IMAGE_TAG_REMOTE_RELEASE="1.2.1" +POLICY_AGENT_IMAGE_TAG_LOCAL="1.3.0-SNAPSHOT" +POLICY_AGENT_IMAGE_TAG_REMOTE_SNAPSHOT="1.3.0-SNAPSHOT" +POLICY_AGENT_IMAGE_TAG_REMOTE="1.3.0-STAGING-latest" #Will use snapshot repo +POLICY_AGENT_IMAGE_TAG_REMOTE_RELEASE="1.3.0" # SDNC A1 Controller remote image and tag SDNC_A1_CONTROLLER_IMAGE_BASE="onap/sdnc-image" -SDNC_A1_CONTROLLER_IMAGE_TAG_LOCAL="2.2.0-SNAPSHOT" ###CHECK THIS -SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE_SNAPSHOT="2.2.0-STAGING-latest" -SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE="2.2.0-STAGING-latest" #Will use snapshot repo -SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE_RELEASE="2.2.0" +SDNC_A1_CONTROLLER_IMAGE_TAG_LOCAL="2.2.1-SNAPSHOT" ###CHECK THIS +SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE_SNAPSHOT="2.2.1-STAGING-latest" +SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE="2.2.1-STAGING-latest" #Will use snapshot repo +SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE_RELEASE="2.2.1" #SDNC DB remote image and tag #The DB is part of SDNC so handled in the same way as SDNC diff --git a/test/common/test_env-oran-e-release.sh b/test/common/test_env-oran-e-release.sh index 0f6e977b..9b83044f 100755 --- a/test/common/test_env-oran-e-release.sh +++ b/test/common/test_env-oran-e-release.sh @@ -125,9 +125,9 @@ RAPP_CAT_IMAGE_TAG_REMOTE_RELEASE="1.1.0" # Near RT RIC Simulator image and tags - same version as cherry RIC_SIM_IMAGE_BASE="o-ran-sc/a1-simulator" RIC_SIM_IMAGE_TAG_LOCAL="latest" -RIC_SIM_IMAGE_TAG_REMOTE_SNAPSHOT="2.1.0-SNAPSHOT" -RIC_SIM_IMAGE_TAG_REMOTE="2.1.0" -RIC_SIM_IMAGE_TAG_REMOTE_RELEASE="2.1.0" +RIC_SIM_IMAGE_TAG_REMOTE_SNAPSHOT="2.2.0-SNAPSHOT" +RIC_SIM_IMAGE_TAG_REMOTE="2.2.0" +RIC_SIM_IMAGE_TAG_REMOTE_RELEASE="2.2.0" #Consul remote image and tag diff --git a/test/simulator-group/policy_agent/application.yaml b/test/simulator-group/policy_agent/application.yaml index 75e42004..275ad3f2 100644 --- a/test/simulator-group/policy_agent/application.yaml +++ b/test/simulator-group/policy_agent/application.yaml @@ -55,6 +55,8 @@ app: filepath: /opt/app/policy-agent/data/application_configuration.json # path where the service can store data vardata-directory: /var/policy-management-service + # path to json schema for config validation + config-file-schema-path: /application_configuration_schema.json webclient: # Configuration of the trust store used for the HTTP client (outgoing requests) # The file location and the password for the truststore is only relevant if trust-store-used == true diff --git a/test/simulator-group/ric/app.yaml b/test/simulator-group/ric/app.yaml index a4040394..c1397306 100644 --- a/test/simulator-group/ric/app.yaml +++ b/test/simulator-group/ric/app.yaml @@ -28,6 +28,8 @@ spec: value: "1" - name: ALLOW_HTTP value: "true" + - name: DUPLICATE_CHECK + value: "1" imagePullPolicy: $KUBE_IMAGE_PULL_POLICY ports: - name: http diff --git a/test/simulator-group/ric/docker-compose.yml b/test/simulator-group/ric/docker-compose.yml index 5a04ce94..37fad42d 100644 --- a/test/simulator-group/ric/docker-compose.yml +++ b/test/simulator-group/ric/docker-compose.yml @@ -35,6 +35,7 @@ services: - A1_VERSION=${G1_A1_VERSION} - REMOTE_HOSTS_LOGGING=1 - ALLOW_HTTP=true + - DUPLICATE_CHECK=1 volumes: - ${RIC_SIM_CERT_MOUNT_DIR}:/usr/src/app/cert:ro labels: @@ -52,6 +53,7 @@ services: - A1_VERSION=${G2_A1_VERSION} - REMOTE_HOSTS_LOGGING=1 - ALLOW_HTTP=true + - DUPLICATE_CHECK=1 volumes: - ${RIC_SIM_CERT_MOUNT_DIR}:/usr/src/app/cert:ro labels: @@ -69,6 +71,7 @@ services: - A1_VERSION=${G3_A1_VERSION} - REMOTE_HOSTS_LOGGING=1 - ALLOW_HTTP=true + - DUPLICATE_CHECK=1 volumes: - ${RIC_SIM_CERT_MOUNT_DIR}:/usr/src/app/cert:ro labels: diff --git a/test/usecases/oruclosedlooprecovery/goversion/internal/config/config_test.go b/test/usecases/oruclosedlooprecovery/goversion/internal/config/config_test.go index e278e60c..a5b16242 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/internal/config/config_test.go +++ b/test/usecases/oruclosedlooprecovery/goversion/internal/config/config_test.go @@ -23,7 +23,6 @@ package config import ( "bytes" "os" - "reflect" "testing" log "github.com/sirupsen/logrus" @@ -31,6 +30,7 @@ import ( ) func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { + assertions := require.New(t) os.Setenv("LOG_LEVEL", "Debug") os.Setenv("CONSUMER_HOST", "consumerHost") os.Setenv("CONSUMER_PORT", "8095") @@ -54,9 +54,9 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { SDNPassword: "pwd", ORUToODUMapFile: "file", } - if got := New(); !reflect.DeepEqual(got, &wantConfig) { - t.Errorf("New() = %v, want %v", got, &wantConfig) - } + + got := New() + assertions.Equal(&wantConfig, got) } func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T) { @@ -80,9 +80,10 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T SDNPassword: "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U", ORUToODUMapFile: "o-ru-to-o-du-map.csv", } - if got := New(); !reflect.DeepEqual(got, &wantConfig) { - t.Errorf("New() = %v, want %v", got, &wantConfig) - } + + got := New() + assertions.Equal(&wantConfig, got) + logString := buf.String() assertions.Contains(logString, "Invalid int value: wrong for variable: CONSUMER_PORT. Default value: 0 will be used") } @@ -108,12 +109,8 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) { SDNPassword: "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U", ORUToODUMapFile: "o-ru-to-o-du-map.csv", } - if got := New(); !reflect.DeepEqual(got, &wantConfig) { - t.Errorf("New() = %v, want %v", got, &wantConfig) - } - if got := New(); !reflect.DeepEqual(got, &wantConfig) { - t.Errorf("New() = %v, want %v", got, &wantConfig) - } + got := New() + assertions.Equal(&wantConfig, got) logString := buf.String() assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!") } diff --git a/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler.go b/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler.go index c21371a3..558d0d78 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler.go +++ b/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler.go @@ -32,7 +32,6 @@ import ( ) type Configuration struct { - ConsumerAddress string InfoCoordAddress string SDNRAddress string SDNRUser string @@ -40,18 +39,19 @@ type Configuration struct { } const rawSdnrPath = "/rests/data/network-topology:network-topology/topology=topology-netconf/node=[O-DU-ID]/yang-ext:mount/o-ran-sc-du-hello-world:network-function/du-to-ru-connection=[O-RU-ID]" - const unlockMessage = `{"o-ran-sc-du-hello-world:du-to-ru-connection": [{"name":"[O-RU-ID]","administrative-state":"UNLOCKED"}]}` type LinkFailureHandler struct { lookupService repository.LookupService config Configuration + client restclient.HTTPClient } -func NewLinkFailureHandler(ls repository.LookupService, conf Configuration) *LinkFailureHandler { +func NewLinkFailureHandler(ls repository.LookupService, conf Configuration, client restclient.HTTPClient) *LinkFailureHandler { return &LinkFailureHandler{ lookupService: ls, config: conf, + client: client, } } @@ -74,7 +74,7 @@ func (lfh LinkFailureHandler) sendUnlockMessage(oRuId string) { if oDuId, err := lfh.lookupService.GetODuID(oRuId); err == nil { sdnrPath := getSdnrPath(oRuId, oDuId) unlockMessage := lfh.getUnlockMessage(oRuId) - if error := restclient.Put(lfh.config.SDNRAddress+sdnrPath, unlockMessage, lfh.config.SDNRUser, lfh.config.SDNRPassword); error == nil { + if error := restclient.Put(lfh.config.SDNRAddress+sdnrPath, unlockMessage, lfh.client, lfh.config.SDNRUser, lfh.config.SDNRPassword); error == nil { log.Debugf("Sent unlock message for O-RU: %v to O-DU: %v.", oRuId, oDuId) } else { log.Warn(error) diff --git a/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler_test.go b/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler_test.go index 9653c993..a3df7044 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler_test.go +++ b/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler_test.go @@ -34,7 +34,6 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "oransc.org/usecase/oruclosedloop/internal/repository" - "oransc.org/usecase/oruclosedloop/internal/restclient" "oransc.org/usecase/oruclosedloop/internal/ves" "oransc.org/usecase/oruclosedloop/mocks" ) @@ -55,8 +54,6 @@ func Test_MessagesHandlerWithLinkFailure(t *testing.T) { StatusCode: http.StatusOK, }, nil) - restclient.Client = &clientMock - lookupServiceMock := mocks.LookupService{} lookupServiceMock.On("GetODuID", mock.Anything).Return("HCL-O-DU-1122", nil) @@ -65,7 +62,7 @@ func Test_MessagesHandlerWithLinkFailure(t *testing.T) { SDNRAddress: "http://localhost:9990", SDNRUser: "admin", SDNRPassword: "pwd", - }) + }, &clientMock) responseRecorder := httptest.NewRecorder() r := newRequest(http.MethodPost, "/", getFaultMessage("ERICSSON-O-RU-11220", "CRITICAL"), t) @@ -122,7 +119,7 @@ func Test_MessagesHandlerWithClearLinkFailure(t *testing.T) { lookupServiceMock.On("GetODuID", mock.Anything).Return("HCL-O-DU-1122", nil) - handlerUnderTest := NewLinkFailureHandler(&lookupServiceMock, Configuration{}) + handlerUnderTest := NewLinkFailureHandler(&lookupServiceMock, Configuration{}, nil) responseRecorder := httptest.NewRecorder() r := newRequest(http.MethodPost, "/", getFaultMessage("ERICSSON-O-RU-11220", "NORMAL"), t) @@ -151,7 +148,7 @@ func Test_MessagesHandlerWithLinkFailureUnmappedORU(t *testing.T) { Id: "ERICSSON-O-RU-11220", }) - handlerUnderTest := NewLinkFailureHandler(&lookupServiceMock, Configuration{}) + handlerUnderTest := NewLinkFailureHandler(&lookupServiceMock, Configuration{}, nil) responseRecorder := httptest.NewRecorder() r := newRequest(http.MethodPost, "/", getFaultMessage("ERICSSON-O-RU-11220", "CRITICAL"), t) diff --git a/test/usecases/oruclosedlooprecovery/goversion/internal/repository/csvhelp.go b/test/usecases/oruclosedlooprecovery/goversion/internal/repository/csvhelp.go index ccaff8b7..c5afda7d 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/internal/repository/csvhelp.go +++ b/test/usecases/oruclosedlooprecovery/goversion/internal/repository/csvhelp.go @@ -31,11 +31,11 @@ type CsvFileHelper interface { type CsvFileHelperImpl struct{} -func NewCsvFileHelper() CsvFileHelperImpl { +func NewCsvFileHelperImpl() CsvFileHelperImpl { return CsvFileHelperImpl{} } -func (h *CsvFileHelperImpl) GetCsvFromFile(name string) ([][]string, error) { +func (h CsvFileHelperImpl) GetCsvFromFile(name string) ([][]string, error) { if csvFile, err := os.Open(name); err == nil { defer csvFile.Close() reader := csv.NewReader(csvFile) diff --git a/test/usecases/oruclosedlooprecovery/goversion/internal/repository/csvhelp_test.go b/test/usecases/oruclosedlooprecovery/goversion/internal/repository/csvhelp_test.go index dfd29cc9..ddcc229b 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/internal/repository/csvhelp_test.go +++ b/test/usecases/oruclosedlooprecovery/goversion/internal/repository/csvhelp_test.go @@ -22,52 +22,46 @@ package repository import ( "os" - "reflect" "testing" + + "github.com/stretchr/testify/require" ) func TestCsvFileHelperImpl_GetCsvFromFile(t *testing.T) { + assertions := require.New(t) filePath := createTempCsvFile() defer os.Remove(filePath) type args struct { name string } tests := []struct { - name string - fileHelper *CsvFileHelperImpl - args args - want [][]string - wantErr bool + name string + args args + want [][]string + wantErrString string }{ { - name: "Read from file should return array of content", - fileHelper: &CsvFileHelperImpl{}, + name: "Read from file should return array of content", args: args{ name: filePath, }, - want: [][]string{{"O-RU-ID", "O-DU-ID"}}, - wantErr: false, + want: [][]string{{"O-RU-ID", "O-DU-ID"}}, }, { - name: "File missing should return error", - fileHelper: &CsvFileHelperImpl{}, + name: "File missing should return error", args: args{ name: "nofile.csv", }, - want: nil, - wantErr: true, + wantErrString: "open nofile.csv: no such file or directory", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - h := &CsvFileHelperImpl{} + h := NewCsvFileHelperImpl() got, err := h.GetCsvFromFile(tt.args.name) - if (err != nil) != tt.wantErr { - t.Errorf("CsvFileHelperImpl.GetCsvFromFile() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("CsvFileHelperImpl.GetCsvFromFile() = %v, want %v", got, tt.want) + assertions.Equal(tt.want, got) + if tt.wantErrString != "" { + assertions.Contains(err.Error(), tt.wantErrString) } }) } diff --git a/test/usecases/oruclosedlooprecovery/goversion/internal/repository/lookupservice_test.go b/test/usecases/oruclosedlooprecovery/goversion/internal/repository/lookupservice_test.go index 58194569..8cfdbdc4 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/internal/repository/lookupservice_test.go +++ b/test/usecases/oruclosedlooprecovery/goversion/internal/repository/lookupservice_test.go @@ -22,13 +22,23 @@ package repository import ( "errors" - "reflect" "testing" + "github.com/stretchr/testify/require" "oransc.org/usecase/oruclosedloop/mocks" ) +func TestIdNotMappedError(t *testing.T) { + assertions := require.New(t) + + actualError := IdNotMappedError{ + Id: "1", + } + assertions.Equal("O-RU-ID: 1 not mapped.", actualError.Error()) +} + func TestNewLookupServiceImpl(t *testing.T) { + assertions := require.New(t) mockCsvFileHelper := &mocks.CsvFileHelper{} type args struct { fileHelper CsvFileHelper @@ -54,66 +64,61 @@ func TestNewLookupServiceImpl(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := NewLookupServiceImpl(tt.args.fileHelper, tt.args.fileName); !reflect.DeepEqual(got, tt.want) { - t.Errorf("NewLookupServiceImpl() = %v, want %v", got, tt.want) - } + got := NewLookupServiceImpl(tt.args.fileHelper, tt.args.fileName) + assertions.Equal(tt.want, got) }) } } func TestLookupServiceImpl_Init(t *testing.T) { - mockCsvFileHelper := &mocks.CsvFileHelper{} - mockCsvFileHelper.On("GetCsvFromFile", "./map.csv").Return([][]string{{"O-RU-ID", "O-DU-ID"}}, nil).Once() - mockCsvFileHelper.On("GetCsvFromFile", "foo.csv").Return(nil, errors.New("Error")).Once() - type fields struct { - csvFileHelper CsvFileHelper + assertions := require.New(t) + type args struct { csvFileName string - oRuIdToODuIdMap map[string]string + mockReturn [][]string + mockReturnError error } tests := []struct { - name string - fields fields - wantErr bool + name string + args args + wantedORuIdToODuIdMap map[string]string + wantErr error }{ { - name: "Init with proper csv file should not return error", - fields: fields{ - csvFileHelper: mockCsvFileHelper, - csvFileName: "./map.csv", - oRuIdToODuIdMap: map[string]string{}}, - wantErr: false, + name: "Init with proper csv file should not return error and map should be initialized", + args: args{ + csvFileName: "./map.csv", + mockReturn: [][]string{{"O-RU-ID", "O-DU-ID"}}, + }, + wantedORuIdToODuIdMap: map[string]string{"O-RU-ID": "O-DU-ID"}, }, { - name: "Init with missing file should return error", - fields: fields{ - csvFileHelper: mockCsvFileHelper, + name: "Init with missing file should return error and map should not be initialized", + args: args{ csvFileName: "foo.csv", - oRuIdToODuIdMap: map[string]string{}, + mockReturnError: errors.New("Error"), }, - wantErr: true, + wantedORuIdToODuIdMap: map[string]string{}, + wantErr: errors.New("Error"), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := LookupServiceImpl{ - csvFileHelper: tt.fields.csvFileHelper, - csvFileName: tt.fields.csvFileName, - oRuIdToODuIdMap: tt.fields.oRuIdToODuIdMap, - } - if err := s.Init(); (err != nil) != tt.wantErr { - t.Errorf("LookupServiceImpl.Init() error = %v, wantErr %v", err, tt.wantErr) - } else if !tt.wantErr { - wantedMap := map[string]string{"O-RU-ID": "O-DU-ID"} - if !reflect.DeepEqual(wantedMap, s.oRuIdToODuIdMap) { - t.Errorf("LookupServiceImpl.Init() map not initialized, wanted map: %v, got map: %v", wantedMap, s.oRuIdToODuIdMap) - } - } + mockCsvFileHelper := &mocks.CsvFileHelper{} + mockCsvFileHelper.On("GetCsvFromFile", tt.args.csvFileName).Return(tt.args.mockReturn, tt.args.mockReturnError) + + s := NewLookupServiceImpl(mockCsvFileHelper, tt.args.csvFileName) + + err := s.Init() + + assertions.Equal(tt.wantErr, err, tt.name) + assertions.Equal(tt.wantedORuIdToODuIdMap, s.oRuIdToODuIdMap) + mockCsvFileHelper.AssertNumberOfCalls(t, "GetCsvFromFile", 1) }) } - mockCsvFileHelper.AssertNumberOfCalls(t, "GetCsvFromFile", 2) } func TestLookupServiceImpl_GetODuID(t *testing.T) { + assertions := require.New(t) type fields struct { csvFileHelper CsvFileHelper csvFileName string @@ -139,8 +144,7 @@ func TestLookupServiceImpl_GetODuID(t *testing.T) { args: args{ oRuId: "O-RU-ID", }, - want: "O-DU-ID", - wantErr: nil, + want: "O-DU-ID", }, { name: "Id not mapped should return IdNotMappedError", @@ -152,7 +156,6 @@ func TestLookupServiceImpl_GetODuID(t *testing.T) { args: args{ oRuId: "O-RU-ID", }, - want: "", wantErr: IdNotMappedError{Id: "O-RU-ID"}, }, } @@ -163,14 +166,11 @@ func TestLookupServiceImpl_GetODuID(t *testing.T) { csvFileName: tt.fields.csvFileName, oRuIdToODuIdMap: tt.fields.oRuIdToODuIdMap, } + got, err := s.GetODuID(tt.args.oRuId) - if err != tt.wantErr { - t.Errorf("LookupServiceImpl.GetODuID() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("LookupServiceImpl.GetODuID() = %v, want %v", got, tt.want) - } + + assertions.Equal(tt.wantErr, err, tt.name) + assertions.Equal(tt.want, got, tt.name) }) } } diff --git a/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client.go b/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client.go index 310a4493..7932bfac 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client.go +++ b/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client.go @@ -25,7 +25,6 @@ import ( "fmt" "io" "net/http" - "time" ) type RequestError struct { @@ -33,10 +32,6 @@ type RequestError struct { Body []byte } -func (pe RequestError) Error() string { - return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body)) -} - // HTTPClient interface type HTTPClient interface { Get(url string) (*http.Response, error) @@ -44,46 +39,23 @@ type HTTPClient interface { Do(*http.Request) (*http.Response, error) } -var ( - Client HTTPClient -) - -func init() { - Client = &http.Client{ - Timeout: time.Second * 5, - } -} - -func Get(url string) ([]byte, error) { - if response, err := Client.Get(url); err == nil { - if isResponseSuccess(response.StatusCode) { - defer response.Body.Close() - if responseData, err := io.ReadAll(response.Body); err == nil { - return responseData, nil - } else { - return nil, err - } - } else { - return nil, getResponseError(response) - } - } else { - return nil, err - } +func (pe RequestError) Error() string { + return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body)) } -func PutWithoutAuth(url string, body []byte) error { - return do(http.MethodPut, url, body) +func PutWithoutAuth(url string, body []byte, client HTTPClient) error { + return do(http.MethodPut, url, body, client) } -func Put(url string, body string, userName string, password string) error { - return do(http.MethodPut, url, []byte(body), userName, password) +func Put(url string, body string, client HTTPClient, userName string, password string) error { + return do(http.MethodPut, url, []byte(body), client, userName, password) } -func Delete(url string) error { - return do(http.MethodDelete, url, nil) +func Delete(url string, client HTTPClient) error { + return do(http.MethodDelete, url, nil, client) } -func do(method string, url string, body []byte, userInfo ...string) error { +func do(method string, url string, body []byte, client HTTPClient, userInfo ...string) error { if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil { if body != nil { req.Header.Set("Content-Type", "application/json; charset=utf-8") @@ -91,7 +63,7 @@ func do(method string, url string, body []byte, userInfo ...string) error { if len(userInfo) > 0 { req.SetBasicAuth(userInfo[0], userInfo[1]) } - if response, respErr := Client.Do(req); respErr == nil { + if response, respErr := client.Do(req); respErr == nil { if isResponseSuccess(response.StatusCode) { return nil } else { diff --git a/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client_test.go b/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client_test.go index 9b482b52..4b2f7fe4 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client_test.go +++ b/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client_test.go @@ -41,73 +41,6 @@ func TestRequestError_Error(t *testing.T) { assertions.Equal("Request failed due to error response with status: 400 and body: error", actualError.Error()) } -func TestGet(t *testing.T) { - assertions := require.New(t) - - type args struct { - url string - mockReturnStatus int - mockReturnBody []byte - mockReturnError error - } - tests := []struct { - name string - args args - want []byte - wantErr error - }{ - { - name: "Ok response", - args: args{ - url: "ok", - mockReturnStatus: http.StatusOK, - mockReturnBody: []byte("body"), - mockReturnError: nil, - }, - want: []byte("body"), - wantErr: nil, - }, - { - name: "Bad request should get RequestError", - args: args{ - url: "badRequest", - mockReturnStatus: http.StatusBadRequest, - mockReturnBody: []byte("bad request"), - mockReturnError: nil, - }, - want: nil, - wantErr: RequestError{ - StatusCode: http.StatusBadRequest, - Body: []byte("bad request"), - }, - }, - { - name: "Server unavailable should get error", - args: args{ - url: "serverUnavailable", - mockReturnError: fmt.Errorf("Server unavailable"), - }, - want: nil, - wantErr: fmt.Errorf("Server unavailable"), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - clientMock := mocks.HTTPClient{} - clientMock.On("Get", tt.args.url).Return(&http.Response{ - StatusCode: tt.args.mockReturnStatus, - Body: ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)), - }, tt.args.mockReturnError) - Client = &clientMock - - got, err := Get(tt.args.url) - assertions.Equal(tt.wantErr, err, tt.name) - assertions.Equal(tt.want, got, tt.name) - clientMock.AssertCalled(t, "Get", tt.args.url) - }) - } -} - func TestPutWithoutAuth(t *testing.T) { assertions := require.New(t) @@ -115,9 +48,8 @@ func TestPutWithoutAuth(t *testing.T) { clientMock.On("Do", mock.Anything).Return(&http.Response{ StatusCode: http.StatusOK, }, nil) - Client = &clientMock - error := PutWithoutAuth("url", []byte("body")) + error := PutWithoutAuth("url", []byte("body"), &clientMock) assertions.Nil(error) var actualRequest *http.Request @@ -142,9 +74,8 @@ func TestPut(t *testing.T) { clientMock.On("Do", mock.Anything).Return(&http.Response{ StatusCode: http.StatusOK, }, nil) - Client = &clientMock - error := Put("url", "body", "admin", "pwd") + error := Put("url", "body", &clientMock, "admin", "pwd") assertions.Nil(error) var actualRequest *http.Request @@ -171,9 +102,8 @@ func TestDelete(t *testing.T) { clientMock.On("Do", mock.Anything).Return(&http.Response{ StatusCode: http.StatusOK, }, nil) - Client = &clientMock - error := Delete("url") + error := Delete("url", &clientMock) assertions.Nil(error) var actualRequest *http.Request @@ -232,9 +162,8 @@ func Test_doErrorCases(t *testing.T) { StatusCode: tt.args.mockReturnStatus, Body: ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)), }, tt.args.mockReturnError) - Client = &clientMock - err := do("PUT", tt.args.url, nil) + err := do("PUT", tt.args.url, nil, &clientMock) assertions.Equal(tt.wantErr, err, tt.name) }) } diff --git a/test/usecases/oruclosedlooprecovery/goversion/main.go b/test/usecases/oruclosedlooprecovery/goversion/main.go index 5574f8cc..86da4d48 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/main.go +++ b/test/usecases/oruclosedlooprecovery/goversion/main.go @@ -24,6 +24,7 @@ import ( "encoding/json" "fmt" "net/http" + "time" "github.com/gorilla/mux" log "github.com/sirupsen/logrus" @@ -33,40 +34,63 @@ import ( "oransc.org/usecase/oruclosedloop/internal/restclient" ) -var consumerConfig linkfailure.Configuration +const timeoutHTTPClient = time.Second * 5 +const jobId = "14e7bb84-a44d-44c1-90b7-6995a92ad43c" + +var infoCoordAddress string +var linkfailureConfig linkfailure.Configuration var lookupService repository.LookupService var host string var port string - -const jobId = "14e7bb84-a44d-44c1-90b7-6995a92ad43c" +var client restclient.HTTPClient func init() { configuration := config.New() + client = &http.Client{ + Timeout: timeoutHTTPClient, + } + log.SetLevel(configuration.LogLevel) - if configuration.ConsumerHost == "" || configuration.ConsumerPort == 0 { - log.Fatal("Consumer host and port must be provided!") + if err := validateConfiguration(configuration); err != nil { + log.Fatalf("Unable to start consumer due to: %v", err) } host = configuration.ConsumerHost port = fmt.Sprint(configuration.ConsumerPort) - csvFileHelper := repository.NewCsvFileHelper() - lookupService = repository.NewLookupServiceImpl(&csvFileHelper, configuration.ORUToODUMapFile) - if initErr := lookupService.Init(); initErr != nil { + csvFileHelper := repository.NewCsvFileHelperImpl() + if initErr := initializeLookupService(csvFileHelper, configuration); initErr != nil { log.Fatalf("Unable to create LookupService due to inability to get O-RU-ID to O-DU-ID map. Cause: %v", initErr) } - consumerConfig = linkfailure.Configuration{ - InfoCoordAddress: configuration.InfoCoordinatorAddress, - SDNRAddress: configuration.SDNRHost + ":" + fmt.Sprint(configuration.SDNRPort), - SDNRUser: configuration.SDNRUser, - SDNRPassword: configuration.SDNPassword, + + infoCoordAddress = configuration.InfoCoordinatorAddress + + linkfailureConfig = linkfailure.Configuration{ + SDNRAddress: configuration.SDNRHost + ":" + fmt.Sprint(configuration.SDNRPort), + SDNRUser: configuration.SDNRUser, + SDNRPassword: configuration.SDNPassword, + } +} + +func validateConfiguration(configuration *config.Config) error { + if configuration.ConsumerHost == "" || configuration.ConsumerPort == 0 { + return fmt.Errorf("consumer host and port must be provided") + } + return nil +} + +func initializeLookupService(csvFileHelper repository.CsvFileHelper, configuration *config.Config) error { + lookupService = repository.NewLookupServiceImpl(csvFileHelper, configuration.ORUToODUMapFile) + if initErr := lookupService.Init(); initErr != nil { + return initErr } + return nil } func main() { defer deleteJob() - messageHandler := linkfailure.NewLinkFailureHandler(lookupService, consumerConfig) + messageHandler := linkfailure.NewLinkFailureHandler(lookupService, linkfailureConfig, client) r := mux.NewRouter() r.HandleFunc("/", messageHandler.MessagesHandler).Methods(http.MethodPost) r.HandleFunc("/admin/start", startHandler).Methods(http.MethodPost) @@ -87,7 +111,7 @@ func startHandler(w http.ResponseWriter, r *http.Request) { JobDefinition: "{}", } body, _ := json.Marshal(jobRegistrationInfo) - putErr := restclient.PutWithoutAuth(consumerConfig.InfoCoordAddress+"/data-consumer/v1/info-jobs/"+jobId, body) + putErr := restclient.PutWithoutAuth(infoCoordAddress+"/data-consumer/v1/info-jobs/"+jobId, body, client) if putErr != nil { http.Error(w, fmt.Sprintf("Unable to register consumer job: %v", putErr), http.StatusBadRequest) return @@ -105,5 +129,5 @@ func stopHandler(w http.ResponseWriter, r *http.Request) { } func deleteJob() error { - return restclient.Delete(consumerConfig.InfoCoordAddress + "/data-consumer/v1/info-jobs/" + jobId) + return restclient.Delete(infoCoordAddress+"/data-consumer/v1/info-jobs/"+jobId, client) }