From f4969908aee0d89693e127a242005f88cdabc586 Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Wed, 24 Nov 2021 15:53:24 +0100 Subject: [PATCH] Refactor for better separation DMaaP Mediator Issue-ID: NONRTRIC-639 Signed-off-by: elinuxhenrik Change-Id: I809c40df83a8db39c810ab102e13ab49546fca6d --- dmaap-mediator-producer/internal/config/config.go | 17 +++++++++++ .../internal/config/config_test.go | 34 +++++++++++++++++++--- dmaap-mediator-producer/internal/jobs/jobs.go | 25 ++++------------ dmaap-mediator-producer/internal/jobs/jobs_test.go | 34 ++++++++-------------- dmaap-mediator-producer/main.go | 16 +++++----- 5 files changed, 73 insertions(+), 53 deletions(-) diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index eef1b5f9..34b056d5 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -21,6 +21,7 @@ package config import ( + "encoding/json" "fmt" "os" "strconv" @@ -81,3 +82,19 @@ func getLogLevel() log.Level { return log.InfoLevel } } + +func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) { + typeDefsByte, err := os.ReadFile(configFile) + if err != nil { + return nil, err + } + typeDefs := struct { + Types []TypeDefinition `json:"types"` + }{} + err = json.Unmarshal(typeDefsByte, &typeDefs) + if err != nil { + return nil, err + } + + return typeDefs.Types, nil +} diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go index 90d3c036..293e0d0b 100644 --- a/dmaap-mediator-producer/internal/config/config_test.go +++ b/dmaap-mediator-producer/internal/config/config_test.go @@ -23,7 +23,7 @@ package config import ( "bytes" "os" - "reflect" + "path/filepath" "testing" log "github.com/sirupsen/logrus" @@ -75,9 +75,8 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T ProducerCertPath: "security/producer.crt", ProducerKeyPath: "security/producer.key", } - if got := New(); !reflect.DeepEqual(got, &wantConfig) { - t.Errorf("New() = %v, want %v", got, &wantConfig) - } + got := New() + assertions.Equal(&wantConfig, got) logString := buf.String() assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used") } @@ -109,3 +108,30 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) { logString := buf.String() assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!") } + +const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}` + +func TestGetTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *testing.T) { + assertions := require.New(t) + typesDir, err := os.MkdirTemp("", "configs") + if err != nil { + t.Errorf("Unable to create temporary directory for types due to: %v", err) + } + fname := filepath.Join(typesDir, "type_config.json") + t.Cleanup(func() { + os.RemoveAll(typesDir) + }) + if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil { + t.Errorf("Unable to create temporary config file for types due to: %v", err) + } + + types, err := GetJobTypesFromConfiguration(fname) + + wantedType := TypeDefinition{ + Id: "type1", + DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", + } + wantedTypes := []TypeDefinition{wantedType} + assertions.EqualValues(wantedTypes, types) + assertions.Nil(err) +} diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index 6dad5fd9..b6616a1b 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -21,9 +21,7 @@ package jobs import ( - "encoding/json" "fmt" - "os" "sync" log "github.com/sirupsen/logrus" @@ -47,7 +45,7 @@ type JobInfo struct { } type JobTypesManager interface { - LoadTypesFromConfiguration() ([]config.TypeDefinition, error) + LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition GetSupportedTypes() []string } @@ -57,16 +55,14 @@ type JobsManager interface { } type JobsManagerImpl struct { - configFile string allTypes map[string]TypeData pollClient restclient.HTTPClient mrAddress string distributeClient restclient.HTTPClient } -func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl { +func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl { return &JobsManagerImpl{ - configFile: typeConfigFilePath, allTypes: make(map[string]TypeData), pollClient: pollClient, mrAddress: mrAddr, @@ -107,26 +103,15 @@ func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error { return nil } -func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) { - typeDefsByte, err := os.ReadFile(jm.configFile) - if err != nil { - return nil, err - } - typeDefs := struct { - Types []config.TypeDefinition `json:"types"` - }{} - err = json.Unmarshal(typeDefsByte, &typeDefs) - if err != nil { - return nil, err - } - for _, typeDef := range typeDefs.Types { +func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition { + for _, typeDef := range types { jm.allTypes[typeDef.Id] = TypeData{ TypeId: typeDef.Id, DMaaPTopicURL: typeDef.DmaapTopicURL, jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient), } } - return typeDefs.Types, nil + return types } func (jm *JobsManagerImpl) GetSupportedTypes() []string { diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 552b5fa1..30b4ffd9 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -24,8 +24,6 @@ import ( "bytes" "io/ioutil" "net/http" - "os" - "path/filepath" "sync" "testing" "time" @@ -38,26 +36,18 @@ const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unau func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { assertions := require.New(t) - typesDir, err := os.MkdirTemp("", "configs") - if err != nil { - t.Errorf("Unable to create temporary directory for types due to: %v", err) - } - fname := filepath.Join(typesDir, "type_config.json") - managerUnderTest := NewJobsManagerImpl(fname, nil, "", nil) - t.Cleanup(func() { - os.RemoveAll(typesDir) - }) - if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil { - t.Errorf("Unable to create temporary config file for types due to: %v", err) - } - types, err := managerUnderTest.LoadTypesFromConfiguration() + + managerUnderTest := NewJobsManagerImpl(nil, "", nil) + wantedType := config.TypeDefinition{ Id: "type1", DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", } wantedTypes := []config.TypeDefinition{wantedType} + + types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes) + assertions.EqualValues(wantedTypes, types) - assertions.Nil(err) supportedTypes := managerUnderTest.GetSupportedTypes() assertions.EqualValues([]string{"type1"}, supportedTypes) @@ -65,7 +55,7 @@ func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedT func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", nil) wantedJob := JobInfo{ Owner: "owner", LastUpdated: "now", @@ -93,7 +83,7 @@ func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", nil) jobInfo := JobInfo{ InfoTypeIdentity: "type1", } @@ -105,7 +95,7 @@ func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", nil) managerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", } @@ -120,7 +110,7 @@ func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", nil) managerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", } @@ -136,7 +126,7 @@ func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", nil) jobsHandler := jobsHandler{ deleteJobCh: make(chan string)} managerUnderTest.allTypes["type1"] = TypeData{ @@ -192,7 +182,7 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) }) jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock) - jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock) + jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock) jobsManager.allTypes["type1"] = TypeData{ DMaaPTopicURL: "/topicUrl", TypeId: "type1", diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index 3a4935a3..2d72466b 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -56,7 +56,7 @@ func main() { } retryClient := restclient.CreateRetryClient(cert) - jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second)) + jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second)) if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil { log.Fatalf("Stopping producer due to: %v", err) } @@ -87,13 +87,15 @@ func validateConfiguration(configuration *config.Config) error { } func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error { registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client) - if types, err := jobTypesHandler.LoadTypesFromConfiguration(); err == nil { - if regErr := registrator.RegisterTypes(types); regErr != nil { - return fmt.Errorf("unable to register all types due to: %v", regErr) - } - } else { - return fmt.Errorf("unable to get types to register due to: %v", err) + configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json") + if err != nil { + return fmt.Errorf("unable to register all types due to: %v", err) } + regErr := registrator.RegisterTypes(jobTypesHandler.LoadTypesFromConfiguration(configTypes)) + if regErr != nil { + return fmt.Errorf("unable to register all types due to: %v", regErr) + } + producer := config.ProducerRegistrationInfo{ InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath, SupportedInfoTypes: jobTypesHandler.GetSupportedTypes(), -- 2.16.6