Refactor for better separation DMaaP Mediator 19/7119/1
authorelinuxhenrik <henrik.b.andersson@est.tech>
Wed, 24 Nov 2021 14:53:24 +0000 (15:53 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Wed, 24 Nov 2021 15:31:07 +0000 (16:31 +0100)
Issue-ID: NONRTRIC-639
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I809c40df83a8db39c810ab102e13ab49546fca6d

dmaap-mediator-producer/internal/config/config.go
dmaap-mediator-producer/internal/config/config_test.go
dmaap-mediator-producer/internal/jobs/jobs.go
dmaap-mediator-producer/internal/jobs/jobs_test.go
dmaap-mediator-producer/main.go

index eef1b5f..34b056d 100644 (file)
@@ -21,6 +21,7 @@
 package config
 
 import (
+       "encoding/json"
        "fmt"
        "os"
        "strconv"
@@ -81,3 +82,19 @@ func getLogLevel() log.Level {
                return log.InfoLevel
        }
 }
+
+func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) {
+       typeDefsByte, err := os.ReadFile(configFile)
+       if err != nil {
+               return nil, err
+       }
+       typeDefs := struct {
+               Types []TypeDefinition `json:"types"`
+       }{}
+       err = json.Unmarshal(typeDefsByte, &typeDefs)
+       if err != nil {
+               return nil, err
+       }
+
+       return typeDefs.Types, nil
+}
index 90d3c03..293e0d0 100644 (file)
@@ -23,7 +23,7 @@ package config
 import (
        "bytes"
        "os"
-       "reflect"
+       "path/filepath"
        "testing"
 
        log "github.com/sirupsen/logrus"
@@ -75,9 +75,8 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T
                ProducerCertPath:       "security/producer.crt",
                ProducerKeyPath:        "security/producer.key",
        }
-       if got := New(); !reflect.DeepEqual(got, &wantConfig) {
-               t.Errorf("New() = %v, want %v", got, &wantConfig)
-       }
+       got := New()
+       assertions.Equal(&wantConfig, got)
        logString := buf.String()
        assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used")
 }
@@ -109,3 +108,30 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) {
        logString := buf.String()
        assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!")
 }
+
+const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
+
+func TestGetTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *testing.T) {
+       assertions := require.New(t)
+       typesDir, err := os.MkdirTemp("", "configs")
+       if err != nil {
+               t.Errorf("Unable to create temporary directory for types due to: %v", err)
+       }
+       fname := filepath.Join(typesDir, "type_config.json")
+       t.Cleanup(func() {
+               os.RemoveAll(typesDir)
+       })
+       if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
+               t.Errorf("Unable to create temporary config file for types due to: %v", err)
+       }
+
+       types, err := GetJobTypesFromConfiguration(fname)
+
+       wantedType := TypeDefinition{
+               Id:            "type1",
+               DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+       }
+       wantedTypes := []TypeDefinition{wantedType}
+       assertions.EqualValues(wantedTypes, types)
+       assertions.Nil(err)
+}
index 6dad5fd..b6616a1 100644 (file)
@@ -21,9 +21,7 @@
 package jobs
 
 import (
-       "encoding/json"
        "fmt"
-       "os"
        "sync"
 
        log "github.com/sirupsen/logrus"
@@ -47,7 +45,7 @@ type JobInfo struct {
 }
 
 type JobTypesManager interface {
-       LoadTypesFromConfiguration() ([]config.TypeDefinition, error)
+       LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
        GetSupportedTypes() []string
 }
 
@@ -57,16 +55,14 @@ type JobsManager interface {
 }
 
 type JobsManagerImpl struct {
-       configFile       string
        allTypes         map[string]TypeData
        pollClient       restclient.HTTPClient
        mrAddress        string
        distributeClient restclient.HTTPClient
 }
 
-func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
+func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
        return &JobsManagerImpl{
-               configFile:       typeConfigFilePath,
                allTypes:         make(map[string]TypeData),
                pollClient:       pollClient,
                mrAddress:        mrAddr,
@@ -107,26 +103,15 @@ func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
        return nil
 }
 
-func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) {
-       typeDefsByte, err := os.ReadFile(jm.configFile)
-       if err != nil {
-               return nil, err
-       }
-       typeDefs := struct {
-               Types []config.TypeDefinition `json:"types"`
-       }{}
-       err = json.Unmarshal(typeDefsByte, &typeDefs)
-       if err != nil {
-               return nil, err
-       }
-       for _, typeDef := range typeDefs.Types {
+func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
+       for _, typeDef := range types {
                jm.allTypes[typeDef.Id] = TypeData{
                        TypeId:        typeDef.Id,
                        DMaaPTopicURL: typeDef.DmaapTopicURL,
                        jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
                }
        }
-       return typeDefs.Types, nil
+       return types
 }
 
 func (jm *JobsManagerImpl) GetSupportedTypes() []string {
index 552b5fa..30b4ffd 100644 (file)
@@ -24,8 +24,6 @@ import (
        "bytes"
        "io/ioutil"
        "net/http"
-       "os"
-       "path/filepath"
        "sync"
        "testing"
        "time"
@@ -38,26 +36,18 @@ const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unau
 
 func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
        assertions := require.New(t)
-       typesDir, err := os.MkdirTemp("", "configs")
-       if err != nil {
-               t.Errorf("Unable to create temporary directory for types due to: %v", err)
-       }
-       fname := filepath.Join(typesDir, "type_config.json")
-       managerUnderTest := NewJobsManagerImpl(fname, nil, "", nil)
-       t.Cleanup(func() {
-               os.RemoveAll(typesDir)
-       })
-       if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
-               t.Errorf("Unable to create temporary config file for types due to: %v", err)
-       }
-       types, err := managerUnderTest.LoadTypesFromConfiguration()
+
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+
        wantedType := config.TypeDefinition{
                Id:            "type1",
                DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
        }
        wantedTypes := []config.TypeDefinition{wantedType}
+
+       types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
+
        assertions.EqualValues(wantedTypes, types)
-       assertions.Nil(err)
 
        supportedTypes := managerUnderTest.GetSupportedTypes()
        assertions.EqualValues([]string{"type1"}, supportedTypes)
@@ -65,7 +55,7 @@ func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedT
 
 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
        wantedJob := JobInfo{
                Owner:            "owner",
                LastUpdated:      "now",
@@ -93,7 +83,7 @@ func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T
 
 func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
        jobInfo := JobInfo{
                InfoTypeIdentity: "type1",
        }
@@ -105,7 +95,7 @@ func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T)
 
 func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
        managerUnderTest.allTypes["type1"] = TypeData{
                TypeId: "type1",
        }
@@ -120,7 +110,7 @@ func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
 
 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
        managerUnderTest.allTypes["type1"] = TypeData{
                TypeId: "type1",
        }
@@ -136,7 +126,7 @@ func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
 
 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
        jobsHandler := jobsHandler{
                deleteJobCh: make(chan string)}
        managerUnderTest.allTypes["type1"] = TypeData{
@@ -192,7 +182,7 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T)
        })
        jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
 
-       jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
+       jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock)
        jobsManager.allTypes["type1"] = TypeData{
                DMaaPTopicURL: "/topicUrl",
                TypeId:        "type1",
index 3a4935a..2d72466 100644 (file)
@@ -56,7 +56,7 @@ func main() {
        }
        retryClient := restclient.CreateRetryClient(cert)
 
-       jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
+       jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
        if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
                log.Fatalf("Stopping producer due to: %v", err)
        }
@@ -87,13 +87,15 @@ func validateConfiguration(configuration *config.Config) error {
 }
 func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
        registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
-       if types, err := jobTypesHandler.LoadTypesFromConfiguration(); err == nil {
-               if regErr := registrator.RegisterTypes(types); regErr != nil {
-                       return fmt.Errorf("unable to register all types due to: %v", regErr)
-               }
-       } else {
-               return fmt.Errorf("unable to get types to register due to: %v", err)
+       configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json")
+       if err != nil {
+               return fmt.Errorf("unable to register all types due to: %v", err)
        }
+       regErr := registrator.RegisterTypes(jobTypesHandler.LoadTypesFromConfiguration(configTypes))
+       if regErr != nil {
+               return fmt.Errorf("unable to register all types due to: %v", regErr)
+       }
+
        producer := config.ProducerRegistrationInfo{
                InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
                SupportedInfoTypes:                 jobTypesHandler.GetSupportedTypes(),