From a77cd650f99b29be6c53a690157529e7e158d70e Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Mon, 6 Sep 2021 10:56:21 +0200 Subject: [PATCH] Register producer DMaaP mediator producer Issue-ID: NONRTRIC-584 Signed-off-by: elinuxhenrik Change-Id: I7f9d8e6a2f68d13e91706722d171b5f6874bae78 --- dmaap-mediator-producer/.gitignore | 1 + dmaap-mediator-producer/internal/config/config.go | 20 +++++++---- .../internal/config/config_test.go | 19 ++++++----- .../internal/config/registrator.go | 17 +++++++++- .../internal/config/registrator_test.go | 39 +++++++++++++++++++++- .../internal/jobtypes/jobtypes.go | 15 ++++++--- .../internal/jobtypes/jobtypes_test.go | 7 ++-- dmaap-mediator-producer/main.go | 15 +++++++-- 8 files changed, 109 insertions(+), 24 deletions(-) diff --git a/dmaap-mediator-producer/.gitignore b/dmaap-mediator-producer/.gitignore index 567963e6..0d08f66c 100644 --- a/dmaap-mediator-producer/.gitignore +++ b/dmaap-mediator-producer/.gitignore @@ -3,3 +3,4 @@ coverage.* main dmaapmediatorproducer +__debug_bin* diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index 6969f9fa..a3e3a119 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -25,16 +25,24 @@ import ( ) type Config struct { - LogLevel string - JobResultUri string - InfoCoordinatorAddress string + LogLevel string + InfoJobCallbackUrl string + InfoCoordinatorAddress string + InfoProducerSupervisionCallbackUrl string +} + +type ProducerRegistrationInfo struct { + InfoProducerSupervisionCallbackUrl string `json:"info_producer_supervision_callback_url"` + SupportedInfoTypes []string `json:"supported_info_types"` + InfoJobCallbackUrl string `json:"info_job_callback_url"` } func New() *Config { return &Config{ - LogLevel: getEnv("LOG_LEVEL", "Info"), - JobResultUri: getEnv("JOB_RESULT_URI", ""), - InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), + LogLevel: getEnv("LOG_LEVEL", "Info"), + InfoJobCallbackUrl: getEnv("INFO_JOB_CALLBACK_URL", ""), + InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), + InfoProducerSupervisionCallbackUrl: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", ""), } } diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go index f0106d0f..88ba1d8b 100644 --- a/dmaap-mediator-producer/internal/config/config_test.go +++ b/dmaap-mediator-producer/internal/config/config_test.go @@ -28,13 +28,15 @@ import ( func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { os.Setenv("LOG_LEVEL", "Debug") - os.Setenv("JOB_RESULT_URI", "testUrl") - os.Setenv("INFO_COORD_ADDR", "testAddr") + os.Setenv("INFO_JOB_CALLBACK_URL", "jobCallbackUrl") + os.Setenv("INFO_COORD_ADDR", "infoCoordAddr") + os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", "supervisionCallbackUrl") defer os.Clearenv() wantConfig := Config{ - LogLevel: "Debug", - JobResultUri: "testUrl", - InfoCoordinatorAddress: "testAddr", + LogLevel: "Debug", + InfoJobCallbackUrl: "jobCallbackUrl", + InfoCoordinatorAddress: "infoCoordAddr", + InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl", } if got := New(); !reflect.DeepEqual(got, &wantConfig) { t.Errorf("New() = %v, want %v", got, &wantConfig) @@ -43,9 +45,10 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) { wantConfig := Config{ - LogLevel: "Info", - JobResultUri: "", - InfoCoordinatorAddress: "http://enrichmentservice:8083", + LogLevel: "Info", + InfoJobCallbackUrl: "", + InfoCoordinatorAddress: "http://enrichmentservice:8083", + InfoProducerSupervisionCallbackUrl: "", } if got := New(); !reflect.DeepEqual(got, &wantConfig) { t.Errorf("New() = %v, want %v", got, &wantConfig) diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go index f846f9f5..eaf8752f 100644 --- a/dmaap-mediator-producer/internal/config/registrator.go +++ b/dmaap-mediator-producer/internal/config/registrator.go @@ -21,6 +21,7 @@ package config import ( + "encoding/json" "fmt" "net/url" @@ -31,9 +32,11 @@ import ( ) const registerTypePath = "/data-producer/v1/info-types/" +const registerProducerPath = "/data-producer/v1/info-producers/" type Registrator interface { RegisterTypes(types []*jobtypes.Type) error + RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) } type RegistratorImpl struct { @@ -49,10 +52,22 @@ func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl { func (r RegistratorImpl) RegisterTypes(jobTypes []*jobtypes.Type) error { for _, jobType := range jobTypes { body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema) - if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Name), []byte(body)); error != nil { + if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil { return error } log.Debugf("Registered type: %v", jobType) } return nil } + +func (r RegistratorImpl) RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) error { + if body, marshalErr := json.Marshal(producerInfo); marshalErr == nil { + if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body)); putErr != nil { + return putErr + } + log.Debugf("Registered producer: %v", producerId) + return nil + } else { + return marshalErr + } +} diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go index d3dd3a0c..94dc6848 100644 --- a/dmaap-mediator-producer/internal/config/registrator_test.go +++ b/dmaap-mediator-producer/internal/config/registrator_test.go @@ -44,7 +44,7 @@ func TestRegisterTypes(t *testing.T) { restclient.Client = &clientMock type1 := jobtypes.Type{ - Name: "Type1", + TypeId: "Type1", Schema: `{"title": "Type 1"}`, } types := []*jobtypes.Type{&type1} @@ -68,3 +68,40 @@ func TestRegisterTypes(t *testing.T) { assertions.Equal(expectedBody, body) clientMock.AssertNumberOfCalls(t, "Do", 1) } + +func TestRegisterProducer(t *testing.T) { + assertions := require.New(t) + + clientMock := mocks.HTTPClient{} + + clientMock.On("Do", mock.Anything).Return(&http.Response{ + StatusCode: http.StatusCreated, + }, nil) + + restclient.Client = &clientMock + + producer := ProducerRegistrationInfo{ + InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl", + SupportedInfoTypes: []string{"type1"}, + InfoJobCallbackUrl: "jobCallbackUrl", + } + + r := NewRegistratorImpl("http://localhost:9990") + err := r.RegisterProducer("Producer1", &producer) + + assertions.Nil(err) + var actualRequest *http.Request + clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool { + actualRequest = req + return true + })) + assertions.Equal(http.MethodPut, actualRequest.Method) + assertions.Equal("http", actualRequest.URL.Scheme) + assertions.Equal("localhost:9990", actualRequest.URL.Host) + assertions.Equal("/data-producer/v1/info-producers/Producer1", actualRequest.URL.Path) + assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) + body, _ := ioutil.ReadAll(actualRequest.Body) + expectedBody := []byte(`{"info_producer_supervision_callback_url":"supervisionCallbackUrl","supported_info_types":["type1"],"info_job_callback_url":"jobCallbackUrl"}`) + assertions.Equal(expectedBody, body) + clientMock.AssertNumberOfCalls(t, "Do", 1) +} diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes.go b/dmaap-mediator-producer/internal/jobtypes/jobtypes.go index 14d837de..894c586e 100644 --- a/dmaap-mediator-producer/internal/jobtypes/jobtypes.go +++ b/dmaap-mediator-producer/internal/jobtypes/jobtypes.go @@ -27,11 +27,12 @@ import ( ) type Type struct { - Name string + TypeId string Schema string } var typeDir = "configs" +var supportedTypes = make([]string, 0) func GetTypes() ([]*Type, error) { types := make([]*Type, 0, 1) @@ -53,15 +54,21 @@ func GetTypes() ([]*Type, error) { return types, nil } +func GetSupportedTypes() []string { + return supportedTypes +} + func getType(path string) (*Type, error) { fileName := filepath.Base(path) typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName)) if typeSchema, err := os.ReadFile(path); err == nil { - return &Type{ - Name: typeName, + typeInfo := Type{ + TypeId: typeName, Schema: string(typeSchema), - }, nil + } + supportedTypes = append(supportedTypes, typeName) + return &typeInfo, nil } else { return nil, err } diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go b/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go index 5fdc378b..195fc4cf 100644 --- a/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go +++ b/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go @@ -30,7 +30,7 @@ import ( const type1Schema = `{"title": "Type 1"}` -func TestGetTypes_filesOkShouldReturnSliceOfTypes(t *testing.T) { +func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { assertions := require.New(t) typesDir, err := os.MkdirTemp("", "configs") if err != nil { @@ -44,10 +44,13 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypes(t *testing.T) { } types, err := GetTypes() wantedType := Type{ - Name: "type1", + TypeId: "type1", Schema: type1Schema, } wantedTypes := []*Type{&wantedType} assertions.EqualValues(wantedTypes, types) assertions.Nil(err) + + supportedTypes := GetSupportedTypes() + assertions.EqualValues([]string{"type1"}, supportedTypes) } diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index 240bdbd5..d38496ff 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -39,8 +39,11 @@ func init() { } log.Debug("Initializing DMaaP Mediator Producer") - if configuration.JobResultUri == "" { - log.Fatal("Missing JOB_RESULT_URI") + if configuration.InfoJobCallbackUrl == "" { + log.Fatal("Missing INFO_JOB_CALLBACK_URL") + } + if configuration.InfoProducerSupervisionCallbackUrl == "" { + log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_URL") } registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress) @@ -51,6 +54,14 @@ func init() { } else { log.Fatalf("Unable to get types to register due to: %v", err) } + producer := config.ProducerRegistrationInfo{ + InfoProducerSupervisionCallbackUrl: configuration.InfoProducerSupervisionCallbackUrl, + SupportedInfoTypes: jobtypes.GetSupportedTypes(), + InfoJobCallbackUrl: configuration.InfoJobCallbackUrl, + } + if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil { + log.Fatalf("Unable to register producer due to: %v", err) + } } func main() { -- 2.16.6