Callback to delete job 51/6751/4
authorelinuxhenrik <henrik.b.andersson@est.tech>
Fri, 24 Sep 2021 13:08:47 +0000 (15:08 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 27 Sep 2021 08:58:14 +0000 (10:58 +0200)
Issue-ID: NONRTRIC-587
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I2bdca3fc648ea3a909a0a60c43504d5f80ee05bc

dmaap-mediator-producer/go.mod
dmaap-mediator-producer/go.sum
dmaap-mediator-producer/internal/jobs/jobs.go
dmaap-mediator-producer/internal/jobs/jobs_test.go
dmaap-mediator-producer/internal/server/server.go
dmaap-mediator-producer/internal/server/server_test.go
dmaap-mediator-producer/main.go
dmaap-mediator-producer/mocks/jobhandler/JobHandler.go

index 7e0d96e..a47c542 100644 (file)
@@ -9,6 +9,7 @@ require (
 
 require (
        github.com/davecgh/go-spew v1.1.1 // indirect
+       github.com/gorilla/mux v1.8.0 // indirect
        github.com/pmezard/go-difflib v1.0.0 // indirect
        github.com/stretchr/objx v0.1.0 // indirect
        golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect
index 2cf67a6..e43ad87 100644 (file)
@@ -1,6 +1,8 @@
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
+github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
index 09d3891..e5a1070 100644 (file)
@@ -55,6 +55,7 @@ type JobInfo struct {
 
 type JobHandler interface {
        AddJob(JobInfo) error
+       DeleteJob(jobId string)
 }
 
 var (
@@ -87,6 +88,15 @@ func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
        }
 }
 
+func (jh *jobHandlerImpl) DeleteJob(jobId string) {
+       mu.Lock()
+       defer mu.Unlock()
+       for _, typeData := range allTypes {
+               delete(typeData.Jobs, jobId)
+       }
+       log.Debug("Deleted job: ", jobId)
+}
+
 func validateJobInfo(ji JobInfo) error {
        if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
                return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
@@ -142,6 +152,10 @@ func AddJob(job JobInfo) error {
        return Handler.AddJob(job)
 }
 
+func DeleteJob(jobId string) {
+       Handler.DeleteJob(jobId)
+}
+
 func RunJobs(mRAddress string) {
        for {
                pollAndDistributeMessages(mRAddress)
index 3bb2578..6f29227 100644 (file)
@@ -132,6 +132,30 @@ func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
        clearAll()
 }
+
+func TestDeleteJob(t *testing.T) {
+       assertions := require.New(t)
+       jobToKeep := JobInfo{
+               InfoJobIdentity:  "job1",
+               InfoTypeIdentity: "type1",
+       }
+       jobToDelete := JobInfo{
+               InfoJobIdentity:  "job2",
+               InfoTypeIdentity: "type1",
+       }
+       allTypes["type1"] = TypeData{
+               TypeId: "type1",
+               Jobs:   map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete},
+       }
+       t.Cleanup(func() {
+               clearAll()
+       })
+
+       DeleteJob("job2")
+       assertions.Equal(1, len(allTypes["type1"].Jobs))
+       assertions.Equal(jobToKeep, allTypes["type1"].Jobs["job1"])
+}
+
 func TestPollAndDistributeMessages(t *testing.T) {
        assertions := require.New(t)
        jobInfo := JobInfo{
index 0b5e5b8..d07b730 100644 (file)
@@ -26,37 +26,28 @@ import (
        "io/ioutil"
        "net/http"
 
+       "github.com/gorilla/mux"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
 )
 
-const StatusCallbackPath = "/status"
-const JobsCallbackPath = "/jobs"
+const StatusPath = "/status"
+const AddJobPath = "/jobs"
+const jobIdToken = "infoJobId"
+const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}"
 
-func StatusHandler(w http.ResponseWriter, r *http.Request) {
-       if r.URL.Path != StatusCallbackPath {
-               http.Error(w, "404 not found.", http.StatusNotFound)
-               return
-       }
-
-       if r.Method != "GET" {
-               http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed)
-               return
-       }
-
-       fmt.Fprintf(w, "All is well!")
+func NewRouter() *mux.Router {
+       r := mux.NewRouter()
+       r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status")
+       r.HandleFunc(AddJobPath, addInfoJobHandler).Methods(http.MethodPost).Name("add")
+       r.HandleFunc(deleteJobPath, deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete")
+       r.NotFoundHandler = &notFoundHandler{}
+       r.MethodNotAllowedHandler = &methodNotAllowedHandler{}
+       return r
 }
 
-func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) {
-       if r.URL.Path != JobsCallbackPath {
-               http.Error(w, "404 not found.", http.StatusNotFound)
-               return
-       }
-
-       if r.Method != "POST" {
-               http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed)
-               return
-       }
+func statusHandler(w http.ResponseWriter, r *http.Request) {}
 
+func addInfoJobHandler(w http.ResponseWriter, r *http.Request) {
        b, readErr := ioutil.ReadAll(r.Body)
        if readErr != nil {
                http.Error(w, fmt.Sprintf("Unable to read body due to: %v", readErr), http.StatusBadRequest)
@@ -71,3 +62,26 @@ func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) {
                http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest)
        }
 }
+
+func deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) {
+       vars := mux.Vars(r)
+       id, ok := vars[jobIdToken]
+       if !ok {
+               http.Error(w, "Must provide infoJobId.", http.StatusBadRequest)
+               return
+       }
+
+       jobs.DeleteJob(id)
+}
+
+type notFoundHandler struct{}
+
+func (h *notFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       http.Error(w, "404 not found.", http.StatusNotFound)
+}
+
+type methodNotAllowedHandler struct{}
+
+func (h *methodNotAllowedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed)
+}
index a4b19c4..2fe936f 100644 (file)
@@ -30,63 +30,63 @@ import (
        "net/http/httptest"
        "testing"
 
+       "github.com/gorilla/mux"
+       "github.com/stretchr/testify/mock"
        "github.com/stretchr/testify/require"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
        "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobhandler"
 )
 
+func TestNewRouter(t *testing.T) {
+       assertions := require.New(t)
+       r := NewRouter()
+       statusRoute := r.Get("status")
+       assertions.NotNil(statusRoute)
+       supportedMethods, err := statusRoute.GetMethods()
+       assertions.Equal([]string{http.MethodGet}, supportedMethods)
+       assertions.Nil(err)
+
+       addJobRoute := r.Get("add")
+       assertions.NotNil(addJobRoute)
+       supportedMethods, err = addJobRoute.GetMethods()
+       assertions.Equal([]string{http.MethodPost}, supportedMethods)
+       assertions.Nil(err)
+
+       deleteJobRoute := r.Get("delete")
+       assertions.NotNil(deleteJobRoute)
+       supportedMethods, err = deleteJobRoute.GetMethods()
+       assertions.Equal([]string{http.MethodDelete}, supportedMethods)
+       assertions.Nil(err)
+
+       notFoundHandler := r.NotFoundHandler
+       handler := http.HandlerFunc(notFoundHandler.ServeHTTP)
+       responseRecorder := httptest.NewRecorder()
+       handler.ServeHTTP(responseRecorder, newRequest("GET", "/wrong", nil, t))
+       assertions.Equal(http.StatusNotFound, responseRecorder.Code)
+
+       assertions.Contains(responseRecorder.Body.String(), "404 not found.")
+
+       methodNotAllowedHandler := r.MethodNotAllowedHandler
+       handler = http.HandlerFunc(methodNotAllowedHandler.ServeHTTP)
+       responseRecorder = httptest.NewRecorder()
+       handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t))
+       assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code)
+
+       assertions.Contains(responseRecorder.Body.String(), "Method is not supported.")
+}
+
 func TestStatusHandler(t *testing.T) {
        assertions := require.New(t)
-       type args struct {
-               responseRecorder *httptest.ResponseRecorder
-               r                *http.Request
-       }
-       tests := []struct {
-               name         string
-               args         args
-               wantedStatus int
-               wantedBody   string
-       }{
-               {
-                       name: "StatusHandler with correct path and method, should return OK",
-                       args: args{
-                               responseRecorder: httptest.NewRecorder(),
-                               r:                newRequest("GET", "/status", nil, t),
-                       },
-                       wantedStatus: http.StatusOK,
-                       wantedBody:   "All is well!",
-               },
-               {
-                       name: "StatusHandler with incorrect path, should return NotFound",
-                       args: args{
-                               responseRecorder: httptest.NewRecorder(),
-                               r:                newRequest("GET", "/wrong", nil, t),
-                       },
-                       wantedStatus: http.StatusNotFound,
-                       wantedBody:   "404 not found.\n",
-               },
-               {
-                       name: "StatusHandler with incorrect method, should return MethodNotAllowed",
-                       args: args{
-                               responseRecorder: httptest.NewRecorder(),
-                               r:                newRequest("PUT", "/status", nil, t),
-                       },
-                       wantedStatus: http.StatusMethodNotAllowed,
-                       wantedBody:   "Method is not supported.\n",
-               },
-       }
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       handler := http.HandlerFunc(StatusHandler)
-                       handler.ServeHTTP(tt.args.responseRecorder, tt.args.r)
-                       assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code)
+       responseRecorder := httptest.NewRecorder()
+       r := newRequest(http.MethodGet, "/status", nil, t)
+       handler := http.HandlerFunc(statusHandler)
+       handler.ServeHTTP(responseRecorder, r)
+       assertions.Equal(http.StatusOK, responseRecorder.Code)
 
-                       assertions.Equal(tt.wantedBody, tt.args.responseRecorder.Body.String())
-               })
-       }
+       assertions.Equal("", responseRecorder.Body.String())
 }
 
-func TestCreateInfoJobHandler(t *testing.T) {
+func TestAddInfoJobHandler(t *testing.T) {
        assertions := require.New(t)
        jobHandlerMock := jobhandler.JobHandler{}
 
@@ -114,55 +114,65 @@ func TestCreateInfoJobHandler(t *testing.T) {
                args         args
                wantedStatus int
                wantedBody   string
+               assertFunc   assertMockFunk
        }{
                {
-                       name: "CreateInfoJobHandler with correct path and method, should return OK",
+                       name: "AddInfoJobHandler with correct path and method, should return OK",
                        args: args{
                                responseRecorder: httptest.NewRecorder(),
-                               r:                newRequest("POST", "/jobs", &goodJobInfo, t),
+                               r:                newRequest(http.MethodPost, "/jobs", &goodJobInfo, t),
                        },
                        wantedStatus: http.StatusOK,
                        wantedBody:   "",
+                       assertFunc: func(mock *jobhandler.JobHandler) {
+                               mock.AssertCalled(t, "AddJob", goodJobInfo)
+                       },
                },
                {
-                       name: "CreateInfoJobHandler with incorrect job info, should return BadRequest",
+                       name: "AddInfoJobHandler with incorrect job info, should return BadRequest",
                        args: args{
                                responseRecorder: httptest.NewRecorder(),
-                               r:                newRequest("POST", "/jobs", &badJobInfo, t),
+                               r:                newRequest(http.MethodPost, "/jobs", &badJobInfo, t),
                        },
                        wantedStatus: http.StatusBadRequest,
                        wantedBody:   "Invalid job info. Cause: error",
                },
-               {
-                       name: "CreateInfoJobHandler with incorrect path, should return NotFound",
-                       args: args{
-                               responseRecorder: httptest.NewRecorder(),
-                               r:                newRequest("GET", "/wrong", nil, t),
-                       },
-                       wantedStatus: http.StatusNotFound,
-                       wantedBody:   "404 not found.",
-               },
-               {
-                       name: "CreateInfoJobHandler with incorrect method, should return MethodNotAllowed",
-                       args: args{
-                               responseRecorder: httptest.NewRecorder(),
-                               r:                newRequest("PUT", "/jobs", nil, t),
-                       },
-                       wantedStatus: http.StatusMethodNotAllowed,
-                       wantedBody:   "Method is not supported.",
-               },
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       handler := http.HandlerFunc(CreateInfoJobHandler)
+                       handler := http.HandlerFunc(addInfoJobHandler)
                        handler.ServeHTTP(tt.args.responseRecorder, tt.args.r)
-                       assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code)
+                       assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code, tt.name)
 
-                       assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody)
+                       assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody, tt.name)
+
+                       if tt.assertFunc != nil {
+                               tt.assertFunc(&jobHandlerMock)
+                       }
                })
        }
 }
 
+func TestDeleteJob(t *testing.T) {
+       assertions := require.New(t)
+       jobHandlerMock := jobhandler.JobHandler{}
+
+       jobHandlerMock.On("DeleteJob", mock.Anything).Return(nil)
+       jobs.Handler = &jobHandlerMock
+
+       responseRecorder := httptest.NewRecorder()
+       r := mux.SetURLVars(newRequest(http.MethodDelete, "/jobs/", nil, t), map[string]string{"infoJobId": "job1"})
+       handler := http.HandlerFunc(deleteInfoJobHandler)
+       handler.ServeHTTP(responseRecorder, r)
+       assertions.Equal(http.StatusOK, http.StatusOK)
+
+       assertions.Equal("", responseRecorder.Body.String())
+
+       jobHandlerMock.AssertCalled(t, "DeleteJob", "job1")
+}
+
+type assertMockFunk func(mock *jobhandler.JobHandler)
+
 func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
        var body io.Reader
        if jobInfo != nil {
index 79fcb6b..15207ec 100644 (file)
@@ -57,9 +57,9 @@ func init() {
                log.Fatalf("Unable to get types to register due to: %v", err)
        }
        producer := config.ProducerRegistrationInfo{
-               InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusCallbackPath,
+               InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
                SupportedInfoTypes:                 jobs.GetSupportedTypes(),
-               InfoJobCallbackUrl:                 callbackAddress + server.JobsCallbackPath,
+               InfoJobCallbackUrl:                 callbackAddress + server.AddJobPath,
        }
        if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
                log.Fatalf("Unable to register producer due to: %v", err)
@@ -75,9 +75,8 @@ func main() {
 
        log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
        go func() {
-               http.HandleFunc(server.StatusCallbackPath, server.StatusHandler)
-               http.HandleFunc(server.JobsCallbackPath, server.CreateInfoJobHandler)
-               log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), nil))
+               r := server.NewRouter()
+               log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
                wg.Done()
        }()
 
index edb2bf5..8e30b1c 100644 (file)
@@ -4,7 +4,7 @@ package jobhandler
 
 import (
        mock "github.com/stretchr/testify/mock"
-       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+       jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
 )
 
 // JobHandler is an autogenerated mock type for the JobHandler type
@@ -25,3 +25,8 @@ func (_m *JobHandler) AddJob(_a0 jobs.JobInfo) error {
 
        return r0
 }
+
+// DeleteJob provides a mock function with given fields: jobId
+func (_m *JobHandler) DeleteJob(jobId string) {
+       _m.Called(jobId)
+}