Initial job creation 00/6700/2
authorelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 6 Sep 2021 20:16:24 +0000 (22:16 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 6 Sep 2021 20:32:17 +0000 (22:32 +0200)
Issue-ID: NONRTRIC-585
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I687d3305ba8d152b8c330f57b598298f03934328

dmaap-mediator-producer/internal/config/config.go
dmaap-mediator-producer/internal/config/config_test.go
dmaap-mediator-producer/internal/config/registrator.go
dmaap-mediator-producer/internal/jobs/jobs.go [moved from dmaap-mediator-producer/internal/jobtypes/jobtypes.go with 60% similarity]
dmaap-mediator-producer/internal/jobs/jobs_test.go [moved from dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go with 80% similarity]
dmaap-mediator-producer/internal/server/server.go
dmaap-mediator-producer/internal/server/server_test.go
dmaap-mediator-producer/main.go
dmaap-mediator-producer/mocks/JobHandler.go [new file with mode: 0644]

index 764e89c..3616c58 100644 (file)
@@ -22,14 +22,17 @@ package config
 
 import (
        "os"
+       "strconv"
+
+       log "github.com/sirupsen/logrus"
 )
 
 type Config struct {
        LogLevel                            string
        InfoProducerSupervisionCallbackHost string
-       InfoProducerSupervisionCallbackPort string
+       InfoProducerSupervisionCallbackPort int
        InfoJobCallbackHost                 string
-       InfoJobCallbackPort                 string
+       InfoJobCallbackPort                 int
        InfoCoordinatorAddress              string
 }
 
@@ -43,9 +46,9 @@ func New() *Config {
        return &Config{
                LogLevel:                            getEnv("LOG_LEVEL", "Info"),
                InfoProducerSupervisionCallbackHost: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", ""),
-               InfoProducerSupervisionCallbackPort: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8085"),
+               InfoProducerSupervisionCallbackPort: getEnvAsInt("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", 8085),
                InfoJobCallbackHost:                 getEnv("INFO_JOB_CALLBACK_HOST", ""),
-               InfoJobCallbackPort:                 getEnv("INFO_JOB_CALLBACK_PORT", "8086"),
+               InfoJobCallbackPort:                 getEnvAsInt("INFO_JOB_CALLBACK_PORT", 8086),
                InfoCoordinatorAddress:              getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
        }
 }
@@ -57,3 +60,14 @@ func getEnv(key string, defaultVal string) string {
 
        return defaultVal
 }
+
+func getEnvAsInt(name string, defaultVal int) int {
+       valueStr := getEnv(name, "")
+       if value, err := strconv.Atoi(valueStr); err == nil {
+               return value
+       } else if valueStr != "" {
+               log.Warnf("Invalid int value: %v for variable: %v. Default value: %v will be used", valueStr, name, defaultVal)
+       }
+
+       return defaultVal
+}
index 6b02e42..2322739 100644 (file)
 package config
 
 import (
+       "bytes"
        "os"
        "reflect"
        "testing"
+
+       log "github.com/sirupsen/logrus"
+       "github.com/stretchr/testify/require"
 )
 
 func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
        os.Setenv("LOG_LEVEL", "Debug")
        os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", "supervisionCallbackHost")
-       os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "supervisionCallbackPort")
+       os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8095")
        os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost")
-       os.Setenv("INFO_JOB_CALLBACK_PORT", "jobCallbackPort")
+       os.Setenv("INFO_JOB_CALLBACK_PORT", "8096")
        os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
        defer os.Clearenv()
        wantConfig := Config{
                LogLevel:                            "Debug",
                InfoProducerSupervisionCallbackHost: "supervisionCallbackHost",
-               InfoProducerSupervisionCallbackPort: "supervisionCallbackPort",
+               InfoProducerSupervisionCallbackPort: 8095,
                InfoJobCallbackHost:                 "jobCallbackHost",
-               InfoJobCallbackPort:                 "jobCallbackPort",
+               InfoJobCallbackPort:                 8096,
                InfoCoordinatorAddress:              "infoCoordAddr",
        }
        if got := New(); !reflect.DeepEqual(got, &wantConfig) {
@@ -47,13 +51,40 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
        }
 }
 
+func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T) {
+       os.Clearenv()
+       assertions := require.New(t)
+       var buf bytes.Buffer
+       log.SetOutput(&buf)
+       defer func() {
+               log.SetOutput(os.Stderr)
+       }()
+
+       os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "wrong")
+       defer os.Clearenv()
+       wantConfig := Config{
+               LogLevel:                            "Info",
+               InfoProducerSupervisionCallbackHost: "",
+               InfoProducerSupervisionCallbackPort: 8085,
+               InfoJobCallbackHost:                 "",
+               InfoJobCallbackPort:                 8086,
+               InfoCoordinatorAddress:              "http://enrichmentservice:8083",
+       }
+       if got := New(); !reflect.DeepEqual(got, &wantConfig) {
+               t.Errorf("New() = %v, want %v", got, &wantConfig)
+       }
+       logString := buf.String()
+       assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_SUPERVISION_CALLBACK_PORT. Default value: 8085 will be used")
+}
+
 func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
+       os.Clearenv()
        wantConfig := Config{
                LogLevel:                            "Info",
                InfoProducerSupervisionCallbackHost: "",
-               InfoProducerSupervisionCallbackPort: "8085",
+               InfoProducerSupervisionCallbackPort: 8085,
                InfoJobCallbackHost:                 "",
-               InfoJobCallbackPort:                 "8086",
+               InfoJobCallbackPort:                 8086,
                InfoCoordinatorAddress:              "http://enrichmentservice:8083",
        }
        if got := New(); !reflect.DeepEqual(got, &wantConfig) {
index eaf8752..37225ed 100644 (file)
@@ -27,7 +27,7 @@ import (
 
        log "github.com/sirupsen/logrus"
 
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
@@ -35,7 +35,7 @@ const registerTypePath = "/data-producer/v1/info-types/"
 const registerProducerPath = "/data-producer/v1/info-producers/"
 
 type Registrator interface {
-       RegisterTypes(types []*jobtypes.Type) error
+       RegisterTypes(types []*jobs.Type) error
        RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo)
 }
 
@@ -49,7 +49,7 @@ func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl {
        }
 }
 
-func (r RegistratorImpl) RegisterTypes(jobTypes []*jobtypes.Type) error {
+func (r RegistratorImpl) RegisterTypes(jobTypes []*jobs.Type) error {
        for _, jobType := range jobTypes {
                body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema)
                if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil {
@@ -18,7 +18,7 @@
 //   ========================LICENSE_END===================================
 //
 
-package jobtypes
+package jobs
 
 import (
        "os"
@@ -31,8 +31,45 @@ type Type struct {
        Schema string
 }
 
-var typeDir = "configs"
-var supportedTypes = make([]string, 0)
+type JobInfo struct {
+       Owner            string `json:"owner"`
+       LastUpdated      string `json:"last_updated"`
+       InfoJobIdentity  string `json:"info_job_identity"`
+       TargetUri        string `json:"target_uri"`
+       InfoJobData      string `json:"info_job_data"`
+       InfoTypeIdentity string `json:"info_type_identity"`
+}
+
+type JobHandler interface {
+       AddJob(JobInfo) error
+}
+
+var (
+       typeDir = "configs"
+       Handler JobHandler
+       allJobs = make(map[string]map[string]JobInfo)
+)
+
+func init() {
+       Handler = newJobHandlerImpl()
+}
+
+type jobHandlerImpl struct{}
+
+func newJobHandlerImpl() *jobHandlerImpl {
+       return &jobHandlerImpl{}
+}
+
+func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
+       if jobs, ok := allJobs[ji.InfoTypeIdentity]; ok {
+               if _, ok := jobs[ji.InfoJobIdentity]; ok {
+                       // TODO: Update job
+               } else {
+                       jobs[ji.InfoJobIdentity] = ji
+               }
+       }
+       return nil
+}
 
 func GetTypes() ([]*Type, error) {
        types := make([]*Type, 0, 1)
@@ -55,9 +92,17 @@ func GetTypes() ([]*Type, error) {
 }
 
 func GetSupportedTypes() []string {
+       supportedTypes := []string{}
+       for k := range allJobs {
+               supportedTypes = append(supportedTypes, k)
+       }
        return supportedTypes
 }
 
+func AddJob(job JobInfo) error {
+       return Handler.AddJob(job)
+}
+
 func getType(path string) (*Type, error) {
        fileName := filepath.Base(path)
        typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName))
@@ -67,7 +112,9 @@ func getType(path string) (*Type, error) {
                        TypeId: typeName,
                        Schema: string(typeSchema),
                }
-               supportedTypes = append(supportedTypes, typeName)
+               if _, ok := allJobs[typeName]; !ok {
+                       allJobs[typeName] = make(map[string]JobInfo)
+               }
                return &typeInfo, nil
        } else {
                return nil, err
@@ -18,7 +18,7 @@
 //   ========================LICENSE_END===================================
 //
 
-package jobtypes
+package jobs
 
 import (
        "os"
@@ -54,3 +54,20 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes
        supportedTypes := GetSupportedTypes()
        assertions.EqualValues([]string{"type1"}, supportedTypes)
 }
+
+func TestAddJob_shouldAddJobToAllJobsMap(t *testing.T) {
+       assertions := require.New(t)
+       allJobs["type1"] = make(map[string]JobInfo)
+       jobInfo := JobInfo{
+               Owner:            "owner",
+               LastUpdated:      "now",
+               InfoJobIdentity:  "job1",
+               TargetUri:        "target",
+               InfoJobData:      "{}",
+               InfoTypeIdentity: "type1",
+       }
+
+       err := AddJob(jobInfo)
+       assertions.Nil(err)
+       assertions.Equal(1, len(allJobs["type1"]))
+}
index ca30d73..c3a1331 100644 (file)
 package server
 
 import (
+       "encoding/json"
        "fmt"
+       "io/ioutil"
        "net/http"
+
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
 )
 
 func StatusHandler(w http.ResponseWriter, r *http.Request) {
@@ -32,9 +36,46 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {
        }
 
        if r.Method != "GET" {
-               http.Error(w, "Method is not supported.", http.StatusNotFound)
+               http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed)
                return
        }
 
        fmt.Fprintf(w, "All is well!")
 }
+
+func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) {
+       if r.URL.Path != "/producer_simulator/info_job" {
+               http.Error(w, "404 not found.", http.StatusNotFound)
+               return
+       }
+
+       if r.Method != "POST" {
+               http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed)
+               return
+       }
+
+       b, readErr := ioutil.ReadAll(r.Body)
+       if readErr != nil {
+               http.Error(w, fmt.Sprintf("Unable to read body due to: %v", readErr), http.StatusBadRequest)
+               return
+       }
+       jobInfo := jobs.JobInfo{}
+       if unmarshalErr := json.Unmarshal(b, &jobInfo); unmarshalErr != nil {
+               http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest)
+               return
+       }
+       if err := jobs.AddJob(jobInfo); err != nil {
+               http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest)
+       }
+}
+
+func CreateServer(port int, handlerFunc func(http.ResponseWriter, *http.Request)) *http.Server {
+
+       mux := http.NewServeMux()
+       mux.HandleFunc("/", handlerFunc)
+       server := http.Server{
+               Addr:    fmt.Sprintf(":%v", port), // :{port}
+               Handler: mux,
+       }
+       return &server
+}
index 9213216..d221c93 100644 (file)
 package server
 
 import (
+       "bytes"
+       "encoding/json"
+       "errors"
        "io"
+       "io/ioutil"
        "net/http"
        "net/http/httptest"
        "testing"
 
        "github.com/stretchr/testify/require"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+       "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
 )
 
 func TestStatusHandler(t *testing.T) {
@@ -60,12 +66,12 @@ func TestStatusHandler(t *testing.T) {
                        wantedBody:   "404 not found.\n",
                },
                {
-                       name: "StatusHandler with incorrect method, should return NotFound",
+                       name: "StatusHandler with incorrect method, should return MethodNotAllowed",
                        args: args{
                                responseRecorder: httptest.NewRecorder(),
                                r:                newRequest("PUT", "/", nil, t),
                        },
-                       wantedStatus: http.StatusNotFound,
+                       wantedStatus: http.StatusMethodNotAllowed,
                        wantedBody:   "Method is not supported.\n",
                },
        }
@@ -80,7 +86,89 @@ func TestStatusHandler(t *testing.T) {
        }
 }
 
-func newRequest(method string, url string, body io.Reader, t *testing.T) *http.Request {
+func TestCreateInfoJobHandler(t *testing.T) {
+       assertions := require.New(t)
+       jobHandlerMock := mocks.JobHandler{}
+
+       goodJobInfo := jobs.JobInfo{
+               Owner:            "owner",
+               LastUpdated:      "now",
+               InfoJobIdentity:  "jobId",
+               TargetUri:        "target",
+               InfoJobData:      "{}",
+               InfoTypeIdentity: "type",
+       }
+       badJobInfo := jobs.JobInfo{
+               Owner: "bad",
+       }
+       jobHandlerMock.On("AddJob", goodJobInfo).Return(nil)
+       jobHandlerMock.On("AddJob", badJobInfo).Return(errors.New("error"))
+       jobs.Handler = &jobHandlerMock
+
+       type args struct {
+               responseRecorder *httptest.ResponseRecorder
+               r                *http.Request
+       }
+       tests := []struct {
+               name         string
+               args         args
+               wantedStatus int
+               wantedBody   string
+       }{
+               {
+                       name: "CreateInfoJobHandler with correct path and method, should return OK",
+                       args: args{
+                               responseRecorder: httptest.NewRecorder(),
+                               r:                newRequest("POST", "/producer_simulator/info_job", &goodJobInfo, t),
+                       },
+                       wantedStatus: http.StatusOK,
+                       wantedBody:   "",
+               },
+               {
+                       name: "CreateInfoJobHandler with incorrect job info, should return BadRequest",
+                       args: args{
+                               responseRecorder: httptest.NewRecorder(),
+                               r:                newRequest("POST", "/producer_simulator/info_job", &badJobInfo, t),
+                       },
+                       wantedStatus: http.StatusBadRequest,
+                       wantedBody:   "Invalid job info. Cause: error",
+               },
+               {
+                       name: "CreateInfoJobHandler with incorrect path, should return NotFound",
+                       args: args{
+                               responseRecorder: httptest.NewRecorder(),
+                               r:                newRequest("GET", "/wrong", nil, t),
+                       },
+                       wantedStatus: http.StatusNotFound,
+                       wantedBody:   "404 not found.",
+               },
+               {
+                       name: "CreateInfoJobHandler with incorrect method, should return MethodNotAllowed",
+                       args: args{
+                               responseRecorder: httptest.NewRecorder(),
+                               r:                newRequest("PUT", "/producer_simulator/info_job", nil, t),
+                       },
+                       wantedStatus: http.StatusMethodNotAllowed,
+                       wantedBody:   "Method is not supported.",
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       handler := http.HandlerFunc(CreateInfoJobHandler)
+                       handler.ServeHTTP(tt.args.responseRecorder, tt.args.r)
+                       assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code)
+
+                       assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody)
+               })
+       }
+}
+
+func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
+       var body io.Reader
+       if jobInfo != nil {
+               bodyAsBytes, _ := json.Marshal(jobInfo)
+               body = ioutil.NopCloser(bytes.NewReader(bodyAsBytes))
+       }
        if req, err := http.NewRequest(method, url, body); err == nil {
                return req
        } else {
index b357f69..3fe92dc 100644 (file)
@@ -22,11 +22,11 @@ package main
 
 import (
        "fmt"
-       "net/http"
+       "sync"
 
        log "github.com/sirupsen/logrus"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
 )
 
@@ -54,7 +54,7 @@ func init() {
        jobInfoCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoJobCallbackHost, configuration.InfoJobCallbackPort)
 
        registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
-       if types, err := jobtypes.GetTypes(); err == nil {
+       if types, err := jobs.GetTypes(); err == nil {
                if regErr := registrator.RegisterTypes(types); regErr != nil {
                        log.Fatalf("Unable to register all types due to: %v", regErr)
                }
@@ -63,7 +63,7 @@ func init() {
        }
        producer := config.ProducerRegistrationInfo{
                InfoProducerSupervisionCallbackUrl: supervisionCallbackAddress,
-               SupportedInfoTypes:                 jobtypes.GetSupportedTypes(),
+               SupportedInfoTypes:                 jobs.GetSupportedTypes(),
                InfoJobCallbackUrl:                 jobInfoCallbackAddress,
        }
        if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
@@ -73,11 +73,25 @@ func init() {
 
 func main() {
        log.Debug("Starting DMaaP Mediator Producer")
+       wg := new(sync.WaitGroup)
+
+       // add two goroutines to `wg` WaitGroup, one for each avilable server
+       wg.Add(2)
+
        log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
-       http.HandleFunc("/", server.StatusHandler)
+       go func() {
+               server := server.CreateServer(configuration.InfoProducerSupervisionCallbackPort, server.StatusHandler)
+               log.Warn(server.ListenAndServe())
+               wg.Done()
+       }()
 
-       if err := http.ListenAndServe(":"+configuration.InfoProducerSupervisionCallbackPort, nil); err != nil {
-               log.Fatal(err)
-       }
+       go func() {
+               server := server.CreateServer(configuration.InfoJobCallbackPort, server.CreateInfoJobHandler)
+               log.Warn(server.ListenAndServe())
+               wg.Done()
+       }()
+
+       // wait until WaitGroup is done
+       wg.Wait()
        log.Debug("Stopping DMaaP Mediator Producer")
 }
diff --git a/dmaap-mediator-producer/mocks/JobHandler.go b/dmaap-mediator-producer/mocks/JobHandler.go
new file mode 100644 (file)
index 0000000..4914e4d
--- /dev/null
@@ -0,0 +1,27 @@
+// Code generated by mockery v2.9.3. DO NOT EDIT.
+
+package mocks
+
+import (
+       mock "github.com/stretchr/testify/mock"
+       jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+)
+
+// JobHandler is an autogenerated mock type for the JobHandler type
+type JobHandler struct {
+       mock.Mock
+}
+
+// AddJob provides a mock function with given fields: _a0
+func (_m *JobHandler) AddJob(_a0 jobs.JobInfo) error {
+       ret := _m.Called(_a0)
+
+       var r0 error
+       if rf, ok := ret.Get(0).(func(jobs.JobInfo) error); ok {
+               r0 = rf(_a0)
+       } else {
+               r0 = ret.Error(0)
+       }
+
+       return r0
+}