From 63a42cacf9c52b7dff64431a3354f55c49bd6e4b Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Mon, 6 Sep 2021 22:16:24 +0200 Subject: [PATCH] Initial job creation Issue-ID: NONRTRIC-585 Signed-off-by: elinuxhenrik Change-Id: I687d3305ba8d152b8c330f57b598298f03934328 --- dmaap-mediator-producer/internal/config/config.go | 22 ++++- .../internal/config/config_test.go | 43 ++++++++-- .../internal/config/registrator.go | 6 +- .../{jobtypes/jobtypes.go => jobs/jobs.go} | 55 ++++++++++++- .../jobtypes_test.go => jobs/jobs_test.go} | 19 ++++- dmaap-mediator-producer/internal/server/server.go | 43 +++++++++- .../internal/server/server_test.go | 94 +++++++++++++++++++++- dmaap-mediator-producer/main.go | 30 +++++-- dmaap-mediator-producer/mocks/JobHandler.go | 27 +++++++ 9 files changed, 309 insertions(+), 30 deletions(-) rename dmaap-mediator-producer/internal/{jobtypes/jobtypes.go => jobs/jobs.go} (60%) rename dmaap-mediator-producer/internal/{jobtypes/jobtypes_test.go => jobs/jobs_test.go} (80%) create mode 100644 dmaap-mediator-producer/mocks/JobHandler.go diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index 764e89c2..3616c584 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -22,14 +22,17 @@ package config import ( "os" + "strconv" + + log "github.com/sirupsen/logrus" ) type Config struct { LogLevel string InfoProducerSupervisionCallbackHost string - InfoProducerSupervisionCallbackPort string + InfoProducerSupervisionCallbackPort int InfoJobCallbackHost string - InfoJobCallbackPort string + InfoJobCallbackPort int InfoCoordinatorAddress string } @@ -43,9 +46,9 @@ func New() *Config { return &Config{ LogLevel: getEnv("LOG_LEVEL", "Info"), InfoProducerSupervisionCallbackHost: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", ""), - InfoProducerSupervisionCallbackPort: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8085"), + InfoProducerSupervisionCallbackPort: getEnvAsInt("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", 8085), InfoJobCallbackHost: getEnv("INFO_JOB_CALLBACK_HOST", ""), - InfoJobCallbackPort: getEnv("INFO_JOB_CALLBACK_PORT", "8086"), + InfoJobCallbackPort: getEnvAsInt("INFO_JOB_CALLBACK_PORT", 8086), InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"), } } @@ -57,3 +60,14 @@ func getEnv(key string, defaultVal string) string { return defaultVal } + +func getEnvAsInt(name string, defaultVal int) int { + valueStr := getEnv(name, "") + if value, err := strconv.Atoi(valueStr); err == nil { + return value + } else if valueStr != "" { + log.Warnf("Invalid int value: %v for variable: %v. Default value: %v will be used", valueStr, name, defaultVal) + } + + return defaultVal +} diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go index 6b02e420..23227399 100644 --- a/dmaap-mediator-producer/internal/config/config_test.go +++ b/dmaap-mediator-producer/internal/config/config_test.go @@ -21,25 +21,29 @@ package config import ( + "bytes" "os" "reflect" "testing" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" ) func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { os.Setenv("LOG_LEVEL", "Debug") os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", "supervisionCallbackHost") - os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "supervisionCallbackPort") + os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8095") os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost") - os.Setenv("INFO_JOB_CALLBACK_PORT", "jobCallbackPort") + os.Setenv("INFO_JOB_CALLBACK_PORT", "8096") os.Setenv("INFO_COORD_ADDR", "infoCoordAddr") defer os.Clearenv() wantConfig := Config{ LogLevel: "Debug", InfoProducerSupervisionCallbackHost: "supervisionCallbackHost", - InfoProducerSupervisionCallbackPort: "supervisionCallbackPort", + InfoProducerSupervisionCallbackPort: 8095, InfoJobCallbackHost: "jobCallbackHost", - InfoJobCallbackPort: "jobCallbackPort", + InfoJobCallbackPort: 8096, InfoCoordinatorAddress: "infoCoordAddr", } if got := New(); !reflect.DeepEqual(got, &wantConfig) { @@ -47,13 +51,40 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { } } +func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T) { + os.Clearenv() + assertions := require.New(t) + var buf bytes.Buffer + log.SetOutput(&buf) + defer func() { + log.SetOutput(os.Stderr) + }() + + os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "wrong") + defer os.Clearenv() + wantConfig := Config{ + LogLevel: "Info", + InfoProducerSupervisionCallbackHost: "", + InfoProducerSupervisionCallbackPort: 8085, + InfoJobCallbackHost: "", + InfoJobCallbackPort: 8086, + InfoCoordinatorAddress: "http://enrichmentservice:8083", + } + if got := New(); !reflect.DeepEqual(got, &wantConfig) { + t.Errorf("New() = %v, want %v", got, &wantConfig) + } + logString := buf.String() + assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_SUPERVISION_CALLBACK_PORT. Default value: 8085 will be used") +} + func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) { + os.Clearenv() wantConfig := Config{ LogLevel: "Info", InfoProducerSupervisionCallbackHost: "", - InfoProducerSupervisionCallbackPort: "8085", + InfoProducerSupervisionCallbackPort: 8085, InfoJobCallbackHost: "", - InfoJobCallbackPort: "8086", + InfoJobCallbackPort: 8086, InfoCoordinatorAddress: "http://enrichmentservice:8083", } if got := New(); !reflect.DeepEqual(got, &wantConfig) { diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go index eaf8752f..37225eda 100644 --- a/dmaap-mediator-producer/internal/config/registrator.go +++ b/dmaap-mediator-producer/internal/config/registrator.go @@ -27,7 +27,7 @@ import ( log "github.com/sirupsen/logrus" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) @@ -35,7 +35,7 @@ const registerTypePath = "/data-producer/v1/info-types/" const registerProducerPath = "/data-producer/v1/info-producers/" type Registrator interface { - RegisterTypes(types []*jobtypes.Type) error + RegisterTypes(types []*jobs.Type) error RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) } @@ -49,7 +49,7 @@ func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl { } } -func (r RegistratorImpl) RegisterTypes(jobTypes []*jobtypes.Type) error { +func (r RegistratorImpl) RegisterTypes(jobTypes []*jobs.Type) error { for _, jobType := range jobTypes { body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema) if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil { diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes.go b/dmaap-mediator-producer/internal/jobs/jobs.go similarity index 60% rename from dmaap-mediator-producer/internal/jobtypes/jobtypes.go rename to dmaap-mediator-producer/internal/jobs/jobs.go index 894c586e..10eaf680 100644 --- a/dmaap-mediator-producer/internal/jobtypes/jobtypes.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -18,7 +18,7 @@ // ========================LICENSE_END=================================== // -package jobtypes +package jobs import ( "os" @@ -31,8 +31,45 @@ type Type struct { Schema string } -var typeDir = "configs" -var supportedTypes = make([]string, 0) +type JobInfo struct { + Owner string `json:"owner"` + LastUpdated string `json:"last_updated"` + InfoJobIdentity string `json:"info_job_identity"` + TargetUri string `json:"target_uri"` + InfoJobData string `json:"info_job_data"` + InfoTypeIdentity string `json:"info_type_identity"` +} + +type JobHandler interface { + AddJob(JobInfo) error +} + +var ( + typeDir = "configs" + Handler JobHandler + allJobs = make(map[string]map[string]JobInfo) +) + +func init() { + Handler = newJobHandlerImpl() +} + +type jobHandlerImpl struct{} + +func newJobHandlerImpl() *jobHandlerImpl { + return &jobHandlerImpl{} +} + +func (jh *jobHandlerImpl) AddJob(ji JobInfo) error { + if jobs, ok := allJobs[ji.InfoTypeIdentity]; ok { + if _, ok := jobs[ji.InfoJobIdentity]; ok { + // TODO: Update job + } else { + jobs[ji.InfoJobIdentity] = ji + } + } + return nil +} func GetTypes() ([]*Type, error) { types := make([]*Type, 0, 1) @@ -55,9 +92,17 @@ func GetTypes() ([]*Type, error) { } func GetSupportedTypes() []string { + supportedTypes := []string{} + for k := range allJobs { + supportedTypes = append(supportedTypes, k) + } return supportedTypes } +func AddJob(job JobInfo) error { + return Handler.AddJob(job) +} + func getType(path string) (*Type, error) { fileName := filepath.Base(path) typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName)) @@ -67,7 +112,9 @@ func getType(path string) (*Type, error) { TypeId: typeName, Schema: string(typeSchema), } - supportedTypes = append(supportedTypes, typeName) + if _, ok := allJobs[typeName]; !ok { + allJobs[typeName] = make(map[string]JobInfo) + } return &typeInfo, nil } else { return nil, err diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go similarity index 80% rename from dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go rename to dmaap-mediator-producer/internal/jobs/jobs_test.go index 195fc4cf..42fc3e23 100644 --- a/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -18,7 +18,7 @@ // ========================LICENSE_END=================================== // -package jobtypes +package jobs import ( "os" @@ -54,3 +54,20 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes supportedTypes := GetSupportedTypes() assertions.EqualValues([]string{"type1"}, supportedTypes) } + +func TestAddJob_shouldAddJobToAllJobsMap(t *testing.T) { + assertions := require.New(t) + allJobs["type1"] = make(map[string]JobInfo) + jobInfo := JobInfo{ + Owner: "owner", + LastUpdated: "now", + InfoJobIdentity: "job1", + TargetUri: "target", + InfoJobData: "{}", + InfoTypeIdentity: "type1", + } + + err := AddJob(jobInfo) + assertions.Nil(err) + assertions.Equal(1, len(allJobs["type1"])) +} diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go index ca30d73e..c3a1331c 100644 --- a/dmaap-mediator-producer/internal/server/server.go +++ b/dmaap-mediator-producer/internal/server/server.go @@ -21,8 +21,12 @@ package server import ( + "encoding/json" "fmt" + "io/ioutil" "net/http" + + "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" ) func StatusHandler(w http.ResponseWriter, r *http.Request) { @@ -32,9 +36,46 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { } if r.Method != "GET" { - http.Error(w, "Method is not supported.", http.StatusNotFound) + http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed) return } fmt.Fprintf(w, "All is well!") } + +func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/producer_simulator/info_job" { + http.Error(w, "404 not found.", http.StatusNotFound) + return + } + + if r.Method != "POST" { + http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed) + return + } + + b, readErr := ioutil.ReadAll(r.Body) + if readErr != nil { + http.Error(w, fmt.Sprintf("Unable to read body due to: %v", readErr), http.StatusBadRequest) + return + } + jobInfo := jobs.JobInfo{} + if unmarshalErr := json.Unmarshal(b, &jobInfo); unmarshalErr != nil { + http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest) + return + } + if err := jobs.AddJob(jobInfo); err != nil { + http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest) + } +} + +func CreateServer(port int, handlerFunc func(http.ResponseWriter, *http.Request)) *http.Server { + + mux := http.NewServeMux() + mux.HandleFunc("/", handlerFunc) + server := http.Server{ + Addr: fmt.Sprintf(":%v", port), // :{port} + Handler: mux, + } + return &server +} diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go index 92132165..d221c933 100644 --- a/dmaap-mediator-producer/internal/server/server_test.go +++ b/dmaap-mediator-producer/internal/server/server_test.go @@ -21,12 +21,18 @@ package server import ( + "bytes" + "encoding/json" + "errors" "io" + "io/ioutil" "net/http" "net/http/httptest" "testing" "github.com/stretchr/testify/require" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks" ) func TestStatusHandler(t *testing.T) { @@ -60,12 +66,12 @@ func TestStatusHandler(t *testing.T) { wantedBody: "404 not found.\n", }, { - name: "StatusHandler with incorrect method, should return NotFound", + name: "StatusHandler with incorrect method, should return MethodNotAllowed", args: args{ responseRecorder: httptest.NewRecorder(), r: newRequest("PUT", "/", nil, t), }, - wantedStatus: http.StatusNotFound, + wantedStatus: http.StatusMethodNotAllowed, wantedBody: "Method is not supported.\n", }, } @@ -80,7 +86,89 @@ func TestStatusHandler(t *testing.T) { } } -func newRequest(method string, url string, body io.Reader, t *testing.T) *http.Request { +func TestCreateInfoJobHandler(t *testing.T) { + assertions := require.New(t) + jobHandlerMock := mocks.JobHandler{} + + goodJobInfo := jobs.JobInfo{ + Owner: "owner", + LastUpdated: "now", + InfoJobIdentity: "jobId", + TargetUri: "target", + InfoJobData: "{}", + InfoTypeIdentity: "type", + } + badJobInfo := jobs.JobInfo{ + Owner: "bad", + } + jobHandlerMock.On("AddJob", goodJobInfo).Return(nil) + jobHandlerMock.On("AddJob", badJobInfo).Return(errors.New("error")) + jobs.Handler = &jobHandlerMock + + type args struct { + responseRecorder *httptest.ResponseRecorder + r *http.Request + } + tests := []struct { + name string + args args + wantedStatus int + wantedBody string + }{ + { + name: "CreateInfoJobHandler with correct path and method, should return OK", + args: args{ + responseRecorder: httptest.NewRecorder(), + r: newRequest("POST", "/producer_simulator/info_job", &goodJobInfo, t), + }, + wantedStatus: http.StatusOK, + wantedBody: "", + }, + { + name: "CreateInfoJobHandler with incorrect job info, should return BadRequest", + args: args{ + responseRecorder: httptest.NewRecorder(), + r: newRequest("POST", "/producer_simulator/info_job", &badJobInfo, t), + }, + wantedStatus: http.StatusBadRequest, + wantedBody: "Invalid job info. Cause: error", + }, + { + name: "CreateInfoJobHandler with incorrect path, should return NotFound", + args: args{ + responseRecorder: httptest.NewRecorder(), + r: newRequest("GET", "/wrong", nil, t), + }, + wantedStatus: http.StatusNotFound, + wantedBody: "404 not found.", + }, + { + name: "CreateInfoJobHandler with incorrect method, should return MethodNotAllowed", + args: args{ + responseRecorder: httptest.NewRecorder(), + r: newRequest("PUT", "/producer_simulator/info_job", nil, t), + }, + wantedStatus: http.StatusMethodNotAllowed, + wantedBody: "Method is not supported.", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(CreateInfoJobHandler) + handler.ServeHTTP(tt.args.responseRecorder, tt.args.r) + assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code) + + assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody) + }) + } +} + +func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request { + var body io.Reader + if jobInfo != nil { + bodyAsBytes, _ := json.Marshal(jobInfo) + body = ioutil.NopCloser(bytes.NewReader(bodyAsBytes)) + } if req, err := http.NewRequest(method, url, body); err == nil { return req } else { diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index b357f696..3fe92dca 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -22,11 +22,11 @@ package main import ( "fmt" - "net/http" + "sync" log "github.com/sirupsen/logrus" "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" "oransc.org/nonrtric/dmaapmediatorproducer/internal/server" ) @@ -54,7 +54,7 @@ func init() { jobInfoCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoJobCallbackHost, configuration.InfoJobCallbackPort) registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress) - if types, err := jobtypes.GetTypes(); err == nil { + if types, err := jobs.GetTypes(); err == nil { if regErr := registrator.RegisterTypes(types); regErr != nil { log.Fatalf("Unable to register all types due to: %v", regErr) } @@ -63,7 +63,7 @@ func init() { } producer := config.ProducerRegistrationInfo{ InfoProducerSupervisionCallbackUrl: supervisionCallbackAddress, - SupportedInfoTypes: jobtypes.GetSupportedTypes(), + SupportedInfoTypes: jobs.GetSupportedTypes(), InfoJobCallbackUrl: jobInfoCallbackAddress, } if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil { @@ -73,11 +73,25 @@ func init() { func main() { log.Debug("Starting DMaaP Mediator Producer") + wg := new(sync.WaitGroup) + + // add two goroutines to `wg` WaitGroup, one for each avilable server + wg.Add(2) + log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort) - http.HandleFunc("/", server.StatusHandler) + go func() { + server := server.CreateServer(configuration.InfoProducerSupervisionCallbackPort, server.StatusHandler) + log.Warn(server.ListenAndServe()) + wg.Done() + }() - if err := http.ListenAndServe(":"+configuration.InfoProducerSupervisionCallbackPort, nil); err != nil { - log.Fatal(err) - } + go func() { + server := server.CreateServer(configuration.InfoJobCallbackPort, server.CreateInfoJobHandler) + log.Warn(server.ListenAndServe()) + wg.Done() + }() + + // wait until WaitGroup is done + wg.Wait() log.Debug("Stopping DMaaP Mediator Producer") } diff --git a/dmaap-mediator-producer/mocks/JobHandler.go b/dmaap-mediator-producer/mocks/JobHandler.go new file mode 100644 index 00000000..4914e4d7 --- /dev/null +++ b/dmaap-mediator-producer/mocks/JobHandler.go @@ -0,0 +1,27 @@ +// Code generated by mockery v2.9.3. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" +) + +// JobHandler is an autogenerated mock type for the JobHandler type +type JobHandler struct { + mock.Mock +} + +// AddJob provides a mock function with given fields: _a0 +func (_m *JobHandler) AddJob(_a0 jobs.JobInfo) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(jobs.JobInfo) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} -- 2.16.6