Register producer DMaaP mediator producer 95/6695/1
authorelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 6 Sep 2021 08:56:21 +0000 (10:56 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 6 Sep 2021 09:02:51 +0000 (11:02 +0200)
Issue-ID: NONRTRIC-584
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I7f9d8e6a2f68d13e91706722d171b5f6874bae78

dmaap-mediator-producer/.gitignore
dmaap-mediator-producer/internal/config/config.go
dmaap-mediator-producer/internal/config/config_test.go
dmaap-mediator-producer/internal/config/registrator.go
dmaap-mediator-producer/internal/config/registrator_test.go
dmaap-mediator-producer/internal/jobtypes/jobtypes.go
dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go
dmaap-mediator-producer/main.go

index 567963e..0d08f66 100644 (file)
@@ -3,3 +3,4 @@
 coverage.*
 main
 dmaapmediatorproducer
+__debug_bin*
index 6969f9f..a3e3a11 100644 (file)
@@ -25,16 +25,24 @@ import (
 )
 
 type Config struct {
-       LogLevel               string
-       JobResultUri           string
-       InfoCoordinatorAddress string
+       LogLevel                           string
+       InfoJobCallbackUrl                 string
+       InfoCoordinatorAddress             string
+       InfoProducerSupervisionCallbackUrl string
+}
+
+type ProducerRegistrationInfo struct {
+       InfoProducerSupervisionCallbackUrl string   `json:"info_producer_supervision_callback_url"`
+       SupportedInfoTypes                 []string `json:"supported_info_types"`
+       InfoJobCallbackUrl                 string   `json:"info_job_callback_url"`
 }
 
 func New() *Config {
        return &Config{
-               LogLevel:               getEnv("LOG_LEVEL", "Info"),
-               JobResultUri:           getEnv("JOB_RESULT_URI", ""),
-               InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+               LogLevel:                           getEnv("LOG_LEVEL", "Info"),
+               InfoJobCallbackUrl:                 getEnv("INFO_JOB_CALLBACK_URL", ""),
+               InfoCoordinatorAddress:             getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+               InfoProducerSupervisionCallbackUrl: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", ""),
        }
 }
 
index f0106d0..88ba1d8 100644 (file)
@@ -28,13 +28,15 @@ import (
 
 func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
        os.Setenv("LOG_LEVEL", "Debug")
-       os.Setenv("JOB_RESULT_URI", "testUrl")
-       os.Setenv("INFO_COORD_ADDR", "testAddr")
+       os.Setenv("INFO_JOB_CALLBACK_URL", "jobCallbackUrl")
+       os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
+       os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", "supervisionCallbackUrl")
        defer os.Clearenv()
        wantConfig := Config{
-               LogLevel:               "Debug",
-               JobResultUri:           "testUrl",
-               InfoCoordinatorAddress: "testAddr",
+               LogLevel:                           "Debug",
+               InfoJobCallbackUrl:                 "jobCallbackUrl",
+               InfoCoordinatorAddress:             "infoCoordAddr",
+               InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
        }
        if got := New(); !reflect.DeepEqual(got, &wantConfig) {
                t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -43,9 +45,10 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
 
 func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
        wantConfig := Config{
-               LogLevel:               "Info",
-               JobResultUri:           "",
-               InfoCoordinatorAddress: "http://enrichmentservice:8083",
+               LogLevel:                           "Info",
+               InfoJobCallbackUrl:                 "",
+               InfoCoordinatorAddress:             "http://enrichmentservice:8083",
+               InfoProducerSupervisionCallbackUrl: "",
        }
        if got := New(); !reflect.DeepEqual(got, &wantConfig) {
                t.Errorf("New() = %v, want %v", got, &wantConfig)
index f846f9f..eaf8752 100644 (file)
@@ -21,6 +21,7 @@
 package config
 
 import (
+       "encoding/json"
        "fmt"
        "net/url"
 
@@ -31,9 +32,11 @@ import (
 )
 
 const registerTypePath = "/data-producer/v1/info-types/"
+const registerProducerPath = "/data-producer/v1/info-producers/"
 
 type Registrator interface {
        RegisterTypes(types []*jobtypes.Type) error
+       RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo)
 }
 
 type RegistratorImpl struct {
@@ -49,10 +52,22 @@ func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl {
 func (r RegistratorImpl) RegisterTypes(jobTypes []*jobtypes.Type) error {
        for _, jobType := range jobTypes {
                body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema)
-               if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Name), []byte(body)); error != nil {
+               if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil {
                        return error
                }
                log.Debugf("Registered type: %v", jobType)
        }
        return nil
 }
+
+func (r RegistratorImpl) RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) error {
+       if body, marshalErr := json.Marshal(producerInfo); marshalErr == nil {
+               if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body)); putErr != nil {
+                       return putErr
+               }
+               log.Debugf("Registered producer: %v", producerId)
+               return nil
+       } else {
+               return marshalErr
+       }
+}
index d3dd3a0..94dc684 100644 (file)
@@ -44,7 +44,7 @@ func TestRegisterTypes(t *testing.T) {
        restclient.Client = &clientMock
 
        type1 := jobtypes.Type{
-               Name:   "Type1",
+               TypeId: "Type1",
                Schema: `{"title": "Type 1"}`,
        }
        types := []*jobtypes.Type{&type1}
@@ -68,3 +68,40 @@ func TestRegisterTypes(t *testing.T) {
        assertions.Equal(expectedBody, body)
        clientMock.AssertNumberOfCalls(t, "Do", 1)
 }
+
+func TestRegisterProducer(t *testing.T) {
+       assertions := require.New(t)
+
+       clientMock := mocks.HTTPClient{}
+
+       clientMock.On("Do", mock.Anything).Return(&http.Response{
+               StatusCode: http.StatusCreated,
+       }, nil)
+
+       restclient.Client = &clientMock
+
+       producer := ProducerRegistrationInfo{
+               InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
+               SupportedInfoTypes:                 []string{"type1"},
+               InfoJobCallbackUrl:                 "jobCallbackUrl",
+       }
+
+       r := NewRegistratorImpl("http://localhost:9990")
+       err := r.RegisterProducer("Producer1", &producer)
+
+       assertions.Nil(err)
+       var actualRequest *http.Request
+       clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+               actualRequest = req
+               return true
+       }))
+       assertions.Equal(http.MethodPut, actualRequest.Method)
+       assertions.Equal("http", actualRequest.URL.Scheme)
+       assertions.Equal("localhost:9990", actualRequest.URL.Host)
+       assertions.Equal("/data-producer/v1/info-producers/Producer1", actualRequest.URL.Path)
+       assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+       body, _ := ioutil.ReadAll(actualRequest.Body)
+       expectedBody := []byte(`{"info_producer_supervision_callback_url":"supervisionCallbackUrl","supported_info_types":["type1"],"info_job_callback_url":"jobCallbackUrl"}`)
+       assertions.Equal(expectedBody, body)
+       clientMock.AssertNumberOfCalls(t, "Do", 1)
+}
index 14d837d..894c586 100644 (file)
@@ -27,11 +27,12 @@ import (
 )
 
 type Type struct {
-       Name   string
+       TypeId string
        Schema string
 }
 
 var typeDir = "configs"
+var supportedTypes = make([]string, 0)
 
 func GetTypes() ([]*Type, error) {
        types := make([]*Type, 0, 1)
@@ -53,15 +54,21 @@ func GetTypes() ([]*Type, error) {
        return types, nil
 }
 
+func GetSupportedTypes() []string {
+       return supportedTypes
+}
+
 func getType(path string) (*Type, error) {
        fileName := filepath.Base(path)
        typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName))
 
        if typeSchema, err := os.ReadFile(path); err == nil {
-               return &Type{
-                       Name:   typeName,
+               typeInfo := Type{
+                       TypeId: typeName,
                        Schema: string(typeSchema),
-               }, nil
+               }
+               supportedTypes = append(supportedTypes, typeName)
+               return &typeInfo, nil
        } else {
                return nil, err
        }
index 5fdc378..195fc4c 100644 (file)
@@ -30,7 +30,7 @@ import (
 
 const type1Schema = `{"title": "Type 1"}`
 
-func TestGetTypes_filesOkShouldReturnSliceOfTypes(t *testing.T) {
+func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
        assertions := require.New(t)
        typesDir, err := os.MkdirTemp("", "configs")
        if err != nil {
@@ -44,10 +44,13 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypes(t *testing.T) {
        }
        types, err := GetTypes()
        wantedType := Type{
-               Name:   "type1",
+               TypeId: "type1",
                Schema: type1Schema,
        }
        wantedTypes := []*Type{&wantedType}
        assertions.EqualValues(wantedTypes, types)
        assertions.Nil(err)
+
+       supportedTypes := GetSupportedTypes()
+       assertions.EqualValues([]string{"type1"}, supportedTypes)
 }
index 240bdbd..d38496f 100644 (file)
@@ -39,8 +39,11 @@ func init() {
        }
 
        log.Debug("Initializing DMaaP Mediator Producer")
-       if configuration.JobResultUri == "" {
-               log.Fatal("Missing JOB_RESULT_URI")
+       if configuration.InfoJobCallbackUrl == "" {
+               log.Fatal("Missing INFO_JOB_CALLBACK_URL")
+       }
+       if configuration.InfoProducerSupervisionCallbackUrl == "" {
+               log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_URL")
        }
 
        registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
@@ -51,6 +54,14 @@ func init() {
        } else {
                log.Fatalf("Unable to get types to register due to: %v", err)
        }
+       producer := config.ProducerRegistrationInfo{
+               InfoProducerSupervisionCallbackUrl: configuration.InfoProducerSupervisionCallbackUrl,
+               SupportedInfoTypes:                 jobtypes.GetSupportedTypes(),
+               InfoJobCallbackUrl:                 configuration.InfoJobCallbackUrl,
+       }
+       if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
+               log.Fatalf("Unable to register producer due to: %v", err)
+       }
 }
 
 func main() {