From 65a53d2388547247222d144b365401056bbdffc5 Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Wed, 29 Sep 2021 15:41:26 +0200 Subject: [PATCH] Refactor Go code Issue-ID: NONRTRIC-606 Signed-off-by: elinuxhenrik Change-Id: Icbd97b8dbf0c3b015e2c864d4d4dd3581d5ade9b --- dmaap-mediator-producer/internal/config/config.go | 20 ++- .../internal/config/config_test.go | 34 +++-- .../internal/config/registrator.go | 24 ++- .../internal/config/registrator_test.go | 22 +-- dmaap-mediator-producer/internal/jobs/jobs.go | 127 ++++++++-------- dmaap-mediator-producer/internal/jobs/jobs_test.go | 112 +++++++------- .../internal/restclient/HTTPClient.go | 38 ++--- .../internal/restclient/HTTPClient_test.go | 162 +++++++++++++-------- dmaap-mediator-producer/internal/server/server.go | 29 +++- .../internal/server/server_test.go | 68 ++++----- dmaap-mediator-producer/main.go | 75 ++++++---- .../mocks/{ => httpclient}/HTTPClient.go | 4 +- .../simulator/consumersimulator.go | 11 +- 13 files changed, 403 insertions(+), 323 deletions(-) rename dmaap-mediator-producer/mocks/{ => httpclient}/HTTPClient.go (94%) diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index 9b7b1dd1..dfd2505b 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -28,7 +28,7 @@ import ( ) type Config struct { - LogLevel string + LogLevel log.Level InfoProducerHost string InfoProducerPort int InfoCoordinatorAddress string @@ -36,15 +36,9 @@ type Config struct { MRPort int } -type ProducerRegistrationInfo struct { - InfoProducerSupervisionCallbackUrl string `json:"info_producer_supervision_callback_url"` - SupportedInfoTypes []string `json:"supported_info_types"` - InfoJobCallbackUrl string `json:"info_job_callback_url"` -} - func New() *Config { return &Config{ - LogLevel: getEnv("LOG_LEVEL", "Info"), + LogLevel: getLogLevel(), InfoProducerHost: getEnv("INFO_PRODUCER_HOST", ""), InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085), InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), @@ -71,3 +65,13 @@ func getEnvAsInt(name string, defaultVal int) int { return defaultVal } + +func getLogLevel() log.Level { + logLevelStr := getEnv("LOG_LEVEL", "Info") + if loglevel, err := log.ParseLevel(logLevelStr); err == nil { + return loglevel + } else { + log.Warnf("Invalid log level: %v. Log level will be Info!", logLevelStr) + return log.InfoLevel + } +} diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go index 0fcbdd3f..fc64e575 100644 --- a/dmaap-mediator-producer/internal/config/config_test.go +++ b/dmaap-mediator-producer/internal/config/config_test.go @@ -31,6 +31,7 @@ import ( ) func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { + assertions := require.New(t) os.Setenv("LOG_LEVEL", "Debug") os.Setenv("INFO_PRODUCER_HOST", "producerHost") os.Setenv("INFO_PRODUCER_PORT", "8095") @@ -41,16 +42,16 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { os.Clearenv() }) wantConfig := Config{ - LogLevel: "Debug", + LogLevel: log.DebugLevel, InfoProducerHost: "producerHost", InfoProducerPort: 8095, InfoCoordinatorAddress: "infoCoordAddr", MRHost: "mrHost", MRPort: 3908, } - if got := New(); !reflect.DeepEqual(got, &wantConfig) { - t.Errorf("New() = %v, want %v", got, &wantConfig) - } + got := New() + + assertions.Equal(&wantConfig, got) } func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T) { @@ -64,7 +65,7 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T os.Clearenv() }) wantConfig := Config{ - LogLevel: "Info", + LogLevel: log.InfoLevel, InfoProducerHost: "", InfoProducerPort: 8085, InfoCoordinatorAddress: "http://enrichmentservice:8083", @@ -78,16 +79,29 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used") } -func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) { +func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) { + assertions := require.New(t) + var buf bytes.Buffer + log.SetOutput(&buf) + + os.Setenv("LOG_LEVEL", "wrong") + t.Cleanup(func() { + log.SetOutput(os.Stderr) + os.Clearenv() + }) + wantConfig := Config{ - LogLevel: "Info", + LogLevel: log.InfoLevel, InfoProducerHost: "", InfoProducerPort: 8085, InfoCoordinatorAddress: "http://enrichmentservice:8083", MRHost: "http://message-router.onap", MRPort: 3904, } - if got := New(); !reflect.DeepEqual(got, &wantConfig) { - t.Errorf("New() = %v, want %v", got, &wantConfig) - } + + got := New() + + assertions.Equal(&wantConfig, got) + logString := buf.String() + assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!") } diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go index db46c544..83ed43f2 100644 --- a/dmaap-mediator-producer/internal/config/registrator.go +++ b/dmaap-mediator-producer/internal/config/registrator.go @@ -27,7 +27,6 @@ import ( log "github.com/sirupsen/logrus" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) @@ -35,25 +34,38 @@ const registerTypePath = "/data-producer/v1/info-types/" const registerProducerPath = "/data-producer/v1/info-producers/" const typeSchema = `{"type": "object","properties": {},"additionalProperties": false}` +type TypeDefinition struct { + Id string `json:"id"` + DmaapTopicURL string `json:"dmaapTopicUrl"` +} + +type ProducerRegistrationInfo struct { + InfoProducerSupervisionCallbackUrl string `json:"info_producer_supervision_callback_url"` + SupportedInfoTypes []string `json:"supported_info_types"` + InfoJobCallbackUrl string `json:"info_job_callback_url"` +} + type Registrator interface { - RegisterTypes(types []*jobs.TypeData) error + RegisterTypes(types []TypeDefinition) error RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) } type RegistratorImpl struct { infoCoordinatorAddress string + httpClient restclient.HTTPClient } -func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl { +func NewRegistratorImpl(infoCoordAddr string, client restclient.HTTPClient) *RegistratorImpl { return &RegistratorImpl{ infoCoordinatorAddress: infoCoordAddr, + httpClient: client, } } -func (r RegistratorImpl) RegisterTypes(jobTypes []jobs.TypeData) error { +func (r RegistratorImpl) RegisterTypes(jobTypes []TypeDefinition) error { for _, jobType := range jobTypes { body := fmt.Sprintf(`{"info_job_data_schema": %v}`, typeSchema) - if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil { + if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Id), []byte(body), r.httpClient); error != nil { return error } log.Debugf("Registered type: %v", jobType) @@ -63,7 +75,7 @@ func (r RegistratorImpl) RegisterTypes(jobTypes []jobs.TypeData) error { func (r RegistratorImpl) RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) error { if body, marshalErr := json.Marshal(producerInfo); marshalErr == nil { - if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body)); putErr != nil { + if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body), r.httpClient); putErr != nil { return putErr } log.Debugf("Registered producer: %v", producerId) diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go index 353e9de8..2cffa2c3 100644 --- a/dmaap-mediator-producer/internal/config/registrator_test.go +++ b/dmaap-mediator-producer/internal/config/registrator_test.go @@ -27,28 +27,24 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" - "oransc.org/nonrtric/dmaapmediatorproducer/mocks" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks/httpclient" ) func TestRegisterTypes(t *testing.T) { assertions := require.New(t) - clientMock := mocks.HTTPClient{} + clientMock := httpclient.HTTPClient{} clientMock.On("Do", mock.Anything).Return(&http.Response{ StatusCode: http.StatusCreated, }, nil) - restclient.Client = &clientMock - - type1 := jobs.TypeData{ - TypeId: "Type1", + type1 := TypeDefinition{ + Id: "Type1", } - types := []jobs.TypeData{type1} + types := []TypeDefinition{type1} - r := NewRegistratorImpl("http://localhost:9990") + r := NewRegistratorImpl("http://localhost:9990", &clientMock) err := r.RegisterTypes(types) assertions.Nil(err) @@ -71,21 +67,19 @@ func TestRegisterTypes(t *testing.T) { func TestRegisterProducer(t *testing.T) { assertions := require.New(t) - clientMock := mocks.HTTPClient{} + clientMock := httpclient.HTTPClient{} clientMock.On("Do", mock.Anything).Return(&http.Response{ StatusCode: http.StatusCreated, }, nil) - restclient.Client = &clientMock - producer := ProducerRegistrationInfo{ InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl", SupportedInfoTypes: []string{"type1"}, InfoJobCallbackUrl: "jobCallbackUrl", } - r := NewRegistratorImpl("http://localhost:9990") + r := NewRegistratorImpl("http://localhost:9990", &clientMock) err := r.RegisterProducer("Producer1", &producer) assertions.Nil(err) diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index e5a1070b..7b21b002 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -27,17 +27,10 @@ import ( "sync" log "github.com/sirupsen/logrus" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) -type TypeDefinitions struct { - Types []TypeDefinition `json:"types"` -} -type TypeDefinition struct { - Id string `json:"id"` - DmaapTopicURL string `json:"dmaapTopicUrl"` -} - type TypeData struct { TypeId string `json:"id"` DMaaPTopicURL string `json:"dmaapTopicUrl"` @@ -53,33 +46,38 @@ type JobInfo struct { InfoTypeIdentity string `json:"info_type_identity"` } +type JobTypeHandler interface { + GetTypes() ([]config.TypeDefinition, error) + GetSupportedTypes() []string +} + type JobHandler interface { AddJob(JobInfo) error DeleteJob(jobId string) } -var ( - mu sync.Mutex - configFile = "configs/type_config.json" - Handler JobHandler - allTypes = make(map[string]TypeData) -) - -func init() { - Handler = newJobHandlerImpl() +type JobHandlerImpl struct { + mu sync.Mutex + configFile string + allTypes map[string]TypeData + pollClient restclient.HTTPClient + distributeClient restclient.HTTPClient } -type jobHandlerImpl struct{} - -func newJobHandlerImpl() *jobHandlerImpl { - return &jobHandlerImpl{} +func NewJobHandlerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *JobHandlerImpl { + return &JobHandlerImpl{ + configFile: typeConfigFilePath, + allTypes: make(map[string]TypeData), + pollClient: pollClient, + distributeClient: distributeClient, + } } -func (jh *jobHandlerImpl) AddJob(ji JobInfo) error { - mu.Lock() - defer mu.Unlock() - if err := validateJobInfo(ji); err == nil { - jobs := allTypes[ji.InfoTypeIdentity].Jobs +func (jh *JobHandlerImpl) AddJob(ji JobInfo) error { + jh.mu.Lock() + defer jh.mu.Unlock() + if err := jh.validateJobInfo(ji); err == nil { + jobs := jh.allTypes[ji.InfoTypeIdentity].Jobs jobs[ji.InfoJobIdentity] = ji log.Debug("Added job: ", ji) return nil @@ -88,17 +86,17 @@ func (jh *jobHandlerImpl) AddJob(ji JobInfo) error { } } -func (jh *jobHandlerImpl) DeleteJob(jobId string) { - mu.Lock() - defer mu.Unlock() - for _, typeData := range allTypes { +func (jh *JobHandlerImpl) DeleteJob(jobId string) { + jh.mu.Lock() + defer jh.mu.Unlock() + for _, typeData := range jh.allTypes { delete(typeData.Jobs, jobId) } log.Debug("Deleted job: ", jobId) } -func validateJobInfo(ji JobInfo) error { - if _, ok := allTypes[ji.InfoTypeIdentity]; !ok { +func (jh *JobHandlerImpl) validateJobInfo(ji JobInfo) error { + if _, ok := jh.allTypes[ji.InfoTypeIdentity]; !ok { return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity) } if ji.InfoJobIdentity == "" { @@ -111,86 +109,75 @@ func validateJobInfo(ji JobInfo) error { return nil } -func GetTypes() ([]TypeData, error) { - mu.Lock() - defer mu.Unlock() - types := make([]TypeData, 0, 1) - typeDefsByte, err := os.ReadFile(configFile) +func (jh *JobHandlerImpl) GetTypes() ([]config.TypeDefinition, error) { + jh.mu.Lock() + defer jh.mu.Unlock() + typeDefsByte, err := os.ReadFile(jh.configFile) if err != nil { return nil, err } - typeDefs := TypeDefinitions{} + typeDefs := struct { + Types []config.TypeDefinition `json:"types"` + }{} err = json.Unmarshal(typeDefsByte, &typeDefs) if err != nil { return nil, err } for _, typeDef := range typeDefs.Types { - typeInfo := TypeData{ + jh.allTypes[typeDef.Id] = TypeData{ TypeId: typeDef.Id, DMaaPTopicURL: typeDef.DmaapTopicURL, Jobs: make(map[string]JobInfo), } - if _, ok := allTypes[typeInfo.TypeId]; !ok { - allTypes[typeInfo.TypeId] = typeInfo - } - types = append(types, typeInfo) } - return types, nil + return typeDefs.Types, nil } -func GetSupportedTypes() []string { - mu.Lock() - defer mu.Unlock() +func (jh *JobHandlerImpl) GetSupportedTypes() []string { + jh.mu.Lock() + defer jh.mu.Unlock() supportedTypes := []string{} - for k := range allTypes { + for k := range jh.allTypes { supportedTypes = append(supportedTypes, k) } return supportedTypes } -func AddJob(job JobInfo) error { - return Handler.AddJob(job) -} - -func DeleteJob(jobId string) { - Handler.DeleteJob(jobId) -} - -func RunJobs(mRAddress string) { +func (jh *JobHandlerImpl) RunJobs(mRAddress string) { for { - pollAndDistributeMessages(mRAddress) + jh.pollAndDistributeMessages(mRAddress) } } -func pollAndDistributeMessages(mRAddress string) { - for typeId, typeInfo := range allTypes { +func (jh *JobHandlerImpl) pollAndDistributeMessages(mRAddress string) { + jh.mu.Lock() + defer jh.mu.Unlock() + for typeId, typeInfo := range jh.allTypes { log.Debugf("Processing jobs for type: %v", typeId) - messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL)) + messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL), jh.pollClient) if error != nil { log.Warnf("Error getting data from MR. Cause: %v", error) continue } - distributeMessages(messagesBody, typeInfo) + jh.distributeMessages(messagesBody, typeInfo) } } -func distributeMessages(messages []byte, typeInfo TypeData) { +func (jh *JobHandlerImpl) distributeMessages(messages []byte, typeInfo TypeData) { if len(messages) > 2 { - mu.Lock() for _, jobInfo := range typeInfo.Jobs { - go sendMessagesToConsumer(messages, jobInfo) + go jh.sendMessagesToConsumer(messages, jobInfo) } - mu.Unlock() } } -func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) { +func (jh *JobHandlerImpl) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) { log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity) - if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil { + if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil { log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr) } } -func clearAll() { - allTypes = make(map[string]TypeData) +func (jh *JobHandlerImpl) clearAll() { + jh.allTypes = make(map[string]TypeData) } diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 6f292272..b301c2f8 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -31,7 +31,6 @@ import ( "time" "github.com/stretchr/testify/require" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}` @@ -42,31 +41,31 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes if err != nil { t.Errorf("Unable to create temporary directory for types due to: %v", err) } + fname := filepath.Join(typesDir, "type_config.json") + handlerUnderTest := NewJobHandlerImpl(fname, nil, nil) t.Cleanup(func() { os.RemoveAll(typesDir) - clearAll() + handlerUnderTest.clearAll() }) - fname := filepath.Join(typesDir, "type_config.json") - configFile = fname if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil { t.Errorf("Unable to create temporary config file for types due to: %v", err) } - types, err := GetTypes() - wantedType := TypeData{ - TypeId: "type1", - DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", - Jobs: make(map[string]JobInfo), + types, err := handlerUnderTest.GetTypes() + wantedType := TypeDefinition{ + Id: "type1", + DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", } - wantedTypes := []TypeData{wantedType} + wantedTypes := []TypeDefinition{wantedType} assertions.EqualValues(wantedTypes, types) assertions.Nil(err) - supportedTypes := GetSupportedTypes() + supportedTypes := handlerUnderTest.GetSupportedTypes() assertions.EqualValues([]string{"type1"}, supportedTypes) } func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { assertions := require.New(t) + handlerUnderTest := NewJobHandlerImpl("", nil, nil) wantedJob := JobInfo{ Owner: "owner", LastUpdated: "now", @@ -75,66 +74,72 @@ func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { InfoJobData: "{}", InfoTypeIdentity: "type1", } - allTypes["type1"] = TypeData{ + handlerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", Jobs: map[string]JobInfo{"job1": wantedJob}, } t.Cleanup(func() { - clearAll() + handlerUnderTest.clearAll() }) - err := AddJob(wantedJob) + err := handlerUnderTest.AddJob(wantedJob) assertions.Nil(err) - assertions.Equal(1, len(allTypes["type1"].Jobs)) - assertions.Equal(wantedJob, allTypes["type1"].Jobs["job1"]) + assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs)) + assertions.Equal(wantedJob, handlerUnderTest.allTypes["type1"].Jobs["job1"]) } func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { assertions := require.New(t) + handlerUnderTest := NewJobHandlerImpl("", nil, nil) jobInfo := JobInfo{ InfoTypeIdentity: "type1", } - err := AddJob(jobInfo) + err := handlerUnderTest.AddJob(jobInfo) assertions.NotNil(err) assertions.Equal("type not supported: type1", err.Error()) } func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - allTypes["type1"] = TypeData{ + handlerUnderTest := NewJobHandlerImpl("", nil, nil) + handlerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", } t.Cleanup(func() { - clearAll() + handlerUnderTest.clearAll() }) + jobInfo := JobInfo{ InfoTypeIdentity: "type1", } - - err := AddJob(jobInfo) + err := handlerUnderTest.AddJob(jobInfo) assertions.NotNil(err) assertions.Equal("missing required job identity: { type1}", err.Error()) } func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - allTypes["type1"] = TypeData{ + handlerUnderTest := NewJobHandlerImpl("", nil, nil) + handlerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", } + t.Cleanup(func() { + handlerUnderTest.clearAll() + }) + jobInfo := JobInfo{ InfoTypeIdentity: "type1", InfoJobIdentity: "job1", } - - err := AddJob(jobInfo) + err := handlerUnderTest.AddJob(jobInfo) assertions.NotNil(err) assertions.Equal("missing required target URI: { job1 type1}", err.Error()) - clearAll() } func TestDeleteJob(t *testing.T) { assertions := require.New(t) + handlerUnderTest := NewJobHandlerImpl("", nil, nil) jobToKeep := JobInfo{ InfoJobIdentity: "job1", InfoTypeIdentity: "type1", @@ -143,52 +148,44 @@ func TestDeleteJob(t *testing.T) { InfoJobIdentity: "job2", InfoTypeIdentity: "type1", } - allTypes["type1"] = TypeData{ + handlerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", Jobs: map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete}, } t.Cleanup(func() { - clearAll() + handlerUnderTest.clearAll() }) - DeleteJob("job2") - assertions.Equal(1, len(allTypes["type1"].Jobs)) - assertions.Equal(jobToKeep, allTypes["type1"].Jobs["job1"]) + handlerUnderTest.DeleteJob("job2") + assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs)) + assertions.Equal(jobToKeep, handlerUnderTest.allTypes["type1"].Jobs["job1"]) } func TestPollAndDistributeMessages(t *testing.T) { assertions := require.New(t) - jobInfo := JobInfo{ - InfoTypeIdentity: "type1", - InfoJobIdentity: "job1", - TargetUri: "http://consumerHost/target", - } - allTypes["type1"] = TypeData{ - TypeId: "type1", - DMaaPTopicURL: "topicUrl", - Jobs: map[string]JobInfo{"job1": jobInfo}, - } - t.Cleanup(func() { - clearAll() - }) wg := sync.WaitGroup{} - wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute messages := `[{"message": {"data": "data"}}]` - clientMock := NewTestClient(func(req *http.Request) *http.Response { + pollClientMock := NewTestClient(func(req *http.Request) *http.Response { if req.URL.String() == "http://mrAddr/topicUrl" { assertions.Equal(req.Method, "GET") - wg.Done() + wg.Done() // Signal that the poll call has been made return &http.Response{ StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader([]byte(messages))), Header: make(http.Header), // Must be set to non-nil value or it panics } - } else if req.URL.String() == "http://consumerHost/target" { + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/target" { assertions.Equal(req.Method, "POST") assertions.Equal(messages, getBodyAsString(req)) assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type")) - wg.Done() + wg.Done() // Signal that the distribution call has been made return &http.Response{ StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), @@ -199,10 +196,23 @@ func TestPollAndDistributeMessages(t *testing.T) { t.Fail() return nil }) + handlerUnderTest := NewJobHandlerImpl("", pollClientMock, distributeClientMock) + jobInfo := JobInfo{ + InfoTypeIdentity: "type1", + InfoJobIdentity: "job1", + TargetUri: "http://consumerHost/target", + } + handlerUnderTest.allTypes["type1"] = TypeData{ + TypeId: "type1", + DMaaPTopicURL: "topicUrl", + Jobs: map[string]JobInfo{"job1": jobInfo}, + } + t.Cleanup(func() { + handlerUnderTest.clearAll() + }) - restclient.Client = clientMock - - pollAndDistributeMessages("http://mrAddr") + wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute + handlerUnderTest.pollAndDistributeMessages("http://mrAddr") if waitTimeout(&wg, 100*time.Millisecond) { t.Error("Not all calls to server were made") diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go index a783f7e6..2b3a0cf3 100644 --- a/dmaap-mediator-producer/internal/restclient/HTTPClient.go +++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go @@ -43,47 +43,35 @@ func (pe RequestError) Error() string { return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body)) } -var ( - Client HTTPClient -) - -func init() { - Client = &http.Client{} -} - -func Get(url string) ([]byte, error) { - if response, err := Client.Get(url); err == nil { - defer response.Body.Close() - if responseData, err := io.ReadAll(response.Body); err == nil { - if isResponseSuccess(response.StatusCode) { +func Get(url string, client HTTPClient) ([]byte, error) { + if response, err := client.Get(url); err == nil { + if isResponseSuccess(response.StatusCode) { + defer response.Body.Close() + if responseData, err := io.ReadAll(response.Body); err == nil { return responseData, nil } else { - requestError := RequestError{ - StatusCode: response.StatusCode, - Body: responseData, - } - return nil, requestError + return nil, err } } else { - return nil, err + return nil, getRequestError(response) } } else { return nil, err } } -func Put(url string, body []byte) error { - return do(http.MethodPut, url, body) +func Put(url string, body []byte, client HTTPClient) error { + return do(http.MethodPut, url, body, client) } -func Post(url string, body []byte) error { - return do(http.MethodPost, url, body) +func Post(url string, body []byte, client HTTPClient) error { + return do(http.MethodPost, url, body, client) } -func do(method string, url string, body []byte) error { +func do(method string, url string, body []byte, client HTTPClient) error { if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil { req.Header.Set("Content-Type", "application/json; charset=utf-8") - if response, respErr := Client.Do(req); respErr == nil { + if response, respErr := client.Do(req); respErr == nil { if isResponseSuccess(response.StatusCode) { return nil } else { diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go index 3727e6a3..7fe3cc7a 100644 --- a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go +++ b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go @@ -23,58 +23,55 @@ package restclient import ( "bytes" "errors" + "fmt" "io/ioutil" "net/http" - "reflect" "testing" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "oransc.org/nonrtric/dmaapmediatorproducer/mocks" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks/httpclient" ) -func TestGet(t *testing.T) { - clientMock := mocks.HTTPClient{} - - clientMock.On("Get", "http://testOk").Return(&http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(bytes.NewReader([]byte("Response"))), - }, nil) - - clientMock.On("Get", "http://testNotOk").Return(&http.Response{ +func TestRequestError_Error(t *testing.T) { + assertions := require.New(t) + actualError := RequestError{ StatusCode: http.StatusBadRequest, - Body: ioutil.NopCloser(bytes.NewReader([]byte("Bad Response"))), - }, nil) - - clientMock.On("Get", "http://testError").Return(nil, errors.New("Failed Request")) - - Client = &clientMock - + Body: []byte("error"), + } + assertions.Equal("Request failed due to error response with status: 400 and body: error", actualError.Error()) +} +func TestGet(t *testing.T) { + assertions := require.New(t) type args struct { - url string + url string + mockReturnStatus int + mockReturnBody string + mockReturnError error } tests := []struct { name string args args want []byte - wantErr bool wantedError error }{ { name: "Test Get with OK response", args: args{ - url: "http://testOk", + url: "http://testOk", + mockReturnStatus: http.StatusOK, + mockReturnBody: "Response", }, - want: []byte("Response"), - wantErr: false, + want: []byte("Response"), }, { name: "Test Get with Not OK response", args: args{ - url: "http://testNotOk", + url: "http://testNotOk", + mockReturnStatus: http.StatusBadRequest, + mockReturnBody: "Bad Response", }, - want: nil, - wantErr: true, + want: nil, wantedError: RequestError{ StatusCode: http.StatusBadRequest, Body: []byte("Bad Response"), @@ -83,40 +80,38 @@ func TestGet(t *testing.T) { { name: "Test Get with error", args: args{ - url: "http://testError", + url: "http://testError", + mockReturnError: errors.New("Failed Request"), }, want: nil, - wantErr: true, wantedError: errors.New("Failed Request"), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := Get(tt.args.url) - if (err != nil) != tt.wantErr { - t.Errorf("Get() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("Get() = %v, want %v", got, tt.want) - } - if tt.wantErr && err.Error() != tt.wantedError.Error() { - t.Errorf("Get() error = %v, wantedError % v", err, tt.wantedError.Error()) - } + clientMock := httpclient.HTTPClient{} + clientMock.On("Get", tt.args.url).Return(&http.Response{ + StatusCode: tt.args.mockReturnStatus, + Body: ioutil.NopCloser(bytes.NewReader([]byte(tt.args.mockReturnBody))), + }, tt.args.mockReturnError) + + got, err := Get(tt.args.url, &clientMock) + assertions.Equal(tt.wantedError, err, tt.name) + assertions.Equal(tt.want, got, tt.name) + clientMock.AssertCalled(t, "Get", tt.args.url) }) } } func TestPutOk(t *testing.T) { assertions := require.New(t) - clientMock := mocks.HTTPClient{} + clientMock := httpclient.HTTPClient{} clientMock.On("Do", mock.Anything).Return(&http.Response{ StatusCode: http.StatusOK, }, nil) - Client = &clientMock - if err := Put("http://localhost:9990", []byte("body")); err != nil { + if err := Put("http://localhost:9990", []byte("body"), &clientMock); err != nil { t.Errorf("Put() error = %v, did not want error", err) } var actualRequest *http.Request @@ -134,31 +129,76 @@ func TestPutOk(t *testing.T) { clientMock.AssertNumberOfCalls(t, "Do", 1) } -func TestPutBadResponse(t *testing.T) { +func TestPostOk(t *testing.T) { assertions := require.New(t) - clientMock := mocks.HTTPClient{} + clientMock := httpclient.HTTPClient{} clientMock.On("Do", mock.Anything).Return(&http.Response{ - StatusCode: http.StatusBadRequest, - Body: ioutil.NopCloser(bytes.NewReader([]byte("Bad Request"))), + StatusCode: http.StatusOK, }, nil) - Client = &clientMock - err := Put("url", []byte("body")) - assertions.NotNil("Put() error = %v, wanted error", err) - expectedErrorMessage := "Request failed due to error response with status: 400 and body: Bad Request" - assertions.Equal(expectedErrorMessage, err.Error()) + if err := Post("http://localhost:9990", []byte("body"), &clientMock); err != nil { + t.Errorf("Put() error = %v, did not want error", err) + } + var actualRequest *http.Request + clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool { + actualRequest = req + return true + })) + assertions.Equal(http.MethodPost, actualRequest.Method) + assertions.Equal("http", actualRequest.URL.Scheme) + assertions.Equal("localhost:9990", actualRequest.URL.Host) + assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) + body, _ := ioutil.ReadAll(actualRequest.Body) + expectedBody := []byte("body") + assertions.Equal(expectedBody, body) + clientMock.AssertNumberOfCalls(t, "Do", 1) } -func TestPutError(t *testing.T) { +func Test_doErrorCases(t *testing.T) { assertions := require.New(t) - clientMock := mocks.HTTPClient{} - - clientMock.On("Do", mock.Anything).Return(nil, errors.New("Failed Request")) - - Client = &clientMock - err := Put("url", []byte("body")) - assertions.NotNil("Put() error = %v, wanted error", err) - expectedErrorMessage := "Failed Request" - assertions.Equal(expectedErrorMessage, err.Error()) + type args struct { + url string + mockReturnStatus int + mockReturnBody []byte + mockReturnError error + } + tests := []struct { + name string + args args + wantErr error + }{ + { + name: "Bad request should get RequestError", + args: args{ + url: "badRequest", + mockReturnStatus: http.StatusBadRequest, + mockReturnBody: []byte("bad request"), + mockReturnError: nil, + }, + wantErr: RequestError{ + StatusCode: http.StatusBadRequest, + Body: []byte("bad request"), + }, + }, + { + name: "Server unavailable should get error", + args: args{ + url: "serverUnavailable", + mockReturnError: fmt.Errorf("Server unavailable"), + }, + wantErr: fmt.Errorf("Server unavailable"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientMock := httpclient.HTTPClient{} + clientMock.On("Do", mock.Anything).Return(&http.Response{ + StatusCode: tt.args.mockReturnStatus, + Body: ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)), + }, tt.args.mockReturnError) + err := do("PUT", tt.args.url, nil, &clientMock) + assertions.Equal(tt.wantErr, err, tt.name) + }) + } } diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go index d07b7309..5861cbe7 100644 --- a/dmaap-mediator-producer/internal/server/server.go +++ b/dmaap-mediator-producer/internal/server/server.go @@ -35,19 +35,32 @@ const AddJobPath = "/jobs" const jobIdToken = "infoJobId" const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}" -func NewRouter() *mux.Router { +type ProducerCallbackHandler struct { + jobHandler jobs.JobHandler +} + +func NewProducerCallbackHandler(jh jobs.JobHandler) *ProducerCallbackHandler { + return &ProducerCallbackHandler{ + jobHandler: jh, + } +} + +func NewRouter(jh jobs.JobHandler) *mux.Router { + callbackHandler := NewProducerCallbackHandler(jh) r := mux.NewRouter() r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status") - r.HandleFunc(AddJobPath, addInfoJobHandler).Methods(http.MethodPost).Name("add") - r.HandleFunc(deleteJobPath, deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete") + r.HandleFunc(AddJobPath, callbackHandler.addInfoJobHandler).Methods(http.MethodPost).Name("add") + r.HandleFunc(deleteJobPath, callbackHandler.deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete") r.NotFoundHandler = ¬FoundHandler{} r.MethodNotAllowedHandler = &methodNotAllowedHandler{} return r } -func statusHandler(w http.ResponseWriter, r *http.Request) {} +func statusHandler(w http.ResponseWriter, r *http.Request) { + // Just respond OK to show the server is alive for now. Might be extended later. +} -func addInfoJobHandler(w http.ResponseWriter, r *http.Request) { +func (h *ProducerCallbackHandler) addInfoJobHandler(w http.ResponseWriter, r *http.Request) { b, readErr := ioutil.ReadAll(r.Body) if readErr != nil { http.Error(w, fmt.Sprintf("Unable to read body due to: %v", readErr), http.StatusBadRequest) @@ -58,12 +71,12 @@ func addInfoJobHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest) return } - if err := jobs.AddJob(jobInfo); err != nil { + if err := h.jobHandler.AddJob(jobInfo); err != nil { http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest) } } -func deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) { +func (h *ProducerCallbackHandler) deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) id, ok := vars[jobIdToken] if !ok { @@ -71,7 +84,7 @@ func deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) { return } - jobs.DeleteJob(id) + h.jobHandler.DeleteJob(id) } type notFoundHandler struct{} diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go index 59439143..08885077 100644 --- a/dmaap-mediator-producer/internal/server/server_test.go +++ b/dmaap-mediator-producer/internal/server/server_test.go @@ -39,7 +39,7 @@ import ( func TestNewRouter(t *testing.T) { assertions := require.New(t) - r := NewRouter() + r := NewRouter(nil) statusRoute := r.Get("status") assertions.NotNil(statusRoute) supportedMethods, err := statusRoute.GetMethods() @@ -63,7 +63,6 @@ func TestNewRouter(t *testing.T) { responseRecorder := httptest.NewRecorder() handler.ServeHTTP(responseRecorder, newRequest("GET", "/wrong", nil, t)) assertions.Equal(http.StatusNotFound, responseRecorder.Code) - assertions.Contains(responseRecorder.Body.String(), "404 not found.") methodNotAllowedHandler := r.MethodNotAllowedHandler @@ -71,7 +70,6 @@ func TestNewRouter(t *testing.T) { responseRecorder = httptest.NewRecorder() handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t)) assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code) - assertions.Contains(responseRecorder.Body.String(), "Method is not supported.") } @@ -88,51 +86,39 @@ func TestStatusHandler(t *testing.T) { func TestAddInfoJobHandler(t *testing.T) { assertions := require.New(t) - jobHandlerMock := jobhandler.JobHandler{} - - goodJobInfo := jobs.JobInfo{ - Owner: "owner", - LastUpdated: "now", - InfoJobIdentity: "jobId", - TargetUri: "target", - InfoJobData: "{}", - InfoTypeIdentity: "type", - } - badJobInfo := jobs.JobInfo{ - Owner: "bad", - } - jobHandlerMock.On("AddJob", goodJobInfo).Return(nil) - jobHandlerMock.On("AddJob", badJobInfo).Return(errors.New("error")) - jobs.Handler = &jobHandlerMock type args struct { - responseRecorder *httptest.ResponseRecorder - r *http.Request + job jobs.JobInfo + mockReturn error } tests := []struct { name string args args wantedStatus int wantedBody string - assertFunc assertMockFunk }{ { name: "AddInfoJobHandler with correct path and method, should return OK", args: args{ - responseRecorder: httptest.NewRecorder(), - r: newRequest(http.MethodPost, "/jobs", &goodJobInfo, t), + job: jobs.JobInfo{ + Owner: "owner", + LastUpdated: "now", + InfoJobIdentity: "jobId", + TargetUri: "target", + InfoJobData: "{}", + InfoTypeIdentity: "type", + }, }, wantedStatus: http.StatusOK, wantedBody: "", - assertFunc: func(mock *jobhandler.JobHandler) { - mock.AssertCalled(t, "AddJob", goodJobInfo) - }, }, { name: "AddInfoJobHandler with incorrect job info, should return BadRequest", args: args{ - responseRecorder: httptest.NewRecorder(), - r: newRequest(http.MethodPost, "/jobs", &badJobInfo, t), + job: jobs.JobInfo{ + Owner: "bad", + }, + mockReturn: errors.New("error"), }, wantedStatus: http.StatusBadRequest, wantedBody: "Invalid job info. Cause: error", @@ -140,15 +126,19 @@ func TestAddInfoJobHandler(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - handler := http.HandlerFunc(addInfoJobHandler) - handler.ServeHTTP(tt.args.responseRecorder, tt.args.r) - assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code, tt.name) + jobHandlerMock := jobhandler.JobHandler{} + jobHandlerMock.On("AddJob", tt.args.job).Return(tt.args.mockReturn) + callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock) - assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody, tt.name) + handler := http.HandlerFunc(callbackHandlerUnderTest.addInfoJobHandler) + responseRecorder := httptest.NewRecorder() + r := newRequest(http.MethodPost, "/jobs", &tt.args.job, t) - if tt.assertFunc != nil { - tt.assertFunc(&jobHandlerMock) - } + handler.ServeHTTP(responseRecorder, r) + + assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name) + assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name) + jobHandlerMock.AssertCalled(t, "AddJob", tt.args.job) }) } } @@ -158,11 +148,11 @@ func TestDeleteJob(t *testing.T) { jobHandlerMock := jobhandler.JobHandler{} jobHandlerMock.On("DeleteJob", mock.Anything).Return(nil) - jobs.Handler = &jobHandlerMock + callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock) responseRecorder := httptest.NewRecorder() r := mux.SetURLVars(newRequest(http.MethodDelete, "/jobs/", nil, t), map[string]string{"infoJobId": "job1"}) - handler := http.HandlerFunc(deleteInfoJobHandler) + handler := http.HandlerFunc(callbackHandlerUnderTest.deleteInfoJobHandler) handler.ServeHTTP(responseRecorder, r) assertions.Equal(http.StatusOK, responseRecorder.Result().StatusCode) @@ -171,8 +161,6 @@ func TestDeleteJob(t *testing.T) { jobHandlerMock.AssertCalled(t, "DeleteJob", "job1") } -type assertMockFunk func(mock *jobhandler.JobHandler) - func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request { var body io.Reader if jobInfo != nil { diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index 15207ec7..beeb995d 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -24,49 +24,45 @@ import ( "fmt" "net/http" "sync" + "time" log "github.com/sirupsen/logrus" "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" "oransc.org/nonrtric/dmaapmediatorproducer/internal/server" ) +const timeoutHTTPClient = time.Second * 5 +const timeoutPollClient = time.Second * 15 + var configuration *config.Config -var callbackAddress string +var httpClient restclient.HTTPClient +var jobHandler *jobs.JobHandlerImpl func init() { configuration = config.New() - if loglevel, err := log.ParseLevel(configuration.LogLevel); err == nil { - log.SetLevel(loglevel) - } else { - log.Warnf("Invalid log level: %v. Log level will be Info!", configuration.LogLevel) - } +} +func main() { + log.SetLevel(configuration.LogLevel) log.Debug("Initializing DMaaP Mediator Producer") - if configuration.InfoProducerHost == "" { - log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST") + if err := validateConfiguration(configuration); err != nil { + log.Fatalf("Stopping producer due to error: %v", err) } - callbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort) + callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort) - registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress) - if types, err := jobs.GetTypes(); err == nil { - if regErr := registrator.RegisterTypes(types); regErr != nil { - log.Fatalf("Unable to register all types due to: %v", regErr) - } - } else { - log.Fatalf("Unable to get types to register due to: %v", err) + httpClient = &http.Client{ + Timeout: timeoutHTTPClient, } - producer := config.ProducerRegistrationInfo{ - InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath, - SupportedInfoTypes: jobs.GetSupportedTypes(), - InfoJobCallbackUrl: callbackAddress + server.AddJobPath, + pollClient := &http.Client{ + Timeout: timeoutPollClient, } - if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil { - log.Fatalf("Unable to register producer due to: %v", err) + jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", pollClient, httpClient) + if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil { + log.Fatalf("Stopping producer due to: %v", err) } -} -func main() { log.Debug("Starting DMaaP Mediator Producer") wg := new(sync.WaitGroup) @@ -75,13 +71,13 @@ func main() { log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort) go func() { - r := server.NewRouter() + r := server.NewRouter(jobHandler) log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r)) wg.Done() }() go func() { - jobs.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort)) + jobHandler.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort)) wg.Done() }() @@ -89,3 +85,30 @@ func main() { wg.Wait() log.Debug("Stopping DMaaP Mediator Producer") } + +func validateConfiguration(configuration *config.Config) error { + if configuration.InfoProducerHost == "" { + return fmt.Errorf("missing INFO_PRODUCER_HOST") + } + return nil +} + +func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error { + registrator := config.NewRegistratorImpl(infoCoordinatorAddress, httpClient) + if types, err := jobHandler.GetTypes(); err == nil { + if regErr := registrator.RegisterTypes(types); regErr != nil { + return fmt.Errorf("unable to register all types due to: %v", regErr) + } + } else { + return fmt.Errorf("unable to get types to register due to: %v", err) + } + producer := config.ProducerRegistrationInfo{ + InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath, + SupportedInfoTypes: jobHandler.GetSupportedTypes(), + InfoJobCallbackUrl: callbackAddress + server.AddJobPath, + } + if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil { + return fmt.Errorf("unable to register producer due to: %v", err) + } + return nil +} diff --git a/dmaap-mediator-producer/mocks/HTTPClient.go b/dmaap-mediator-producer/mocks/httpclient/HTTPClient.go similarity index 94% rename from dmaap-mediator-producer/mocks/HTTPClient.go rename to dmaap-mediator-producer/mocks/httpclient/HTTPClient.go index 3037798b..ab399dfb 100644 --- a/dmaap-mediator-producer/mocks/HTTPClient.go +++ b/dmaap-mediator-producer/mocks/httpclient/HTTPClient.go @@ -1,6 +1,6 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.9.3. DO NOT EDIT. -package mocks +package httpclient import ( http "net/http" diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumersimulator.go index 144f56f3..03da6f4e 100644 --- a/dmaap-mediator-producer/simulator/consumersimulator.go +++ b/dmaap-mediator-producer/simulator/consumersimulator.go @@ -26,18 +26,25 @@ import ( "fmt" "io" http "net/http" + "time" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) +var httpClient http.Client + func main() { + httpClient = http.Client{ + Timeout: time.Second * 5, + } port := flag.Int("port", 40935, "The port this consumer will listen on") flag.Parse() http.HandleFunc("/jobs", handleData) + registerJob(*port) + fmt.Print("Starting consumer on port: ", *port) http.ListenAndServe(fmt.Sprintf(":%v", *port), nil) - registerJob(*port) } func registerJob(port int) { @@ -49,7 +56,7 @@ func registerJob(port int) { }{fmt.Sprintf("test%v", port), fmt.Sprintf("http://localhost:%v/jobs", port), "STD_Fault_Messages", "{}"} fmt.Print("Registering consumer: ", jobInfo) body, _ := json.Marshal(jobInfo) - putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body) + putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient) if putErr != nil { fmt.Printf("Unable to register consumer: %v", putErr) } -- 2.16.6