From fe61c6191ba72b1d8297264c3d61566ea23a70b6 Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Fri, 24 Sep 2021 15:08:47 +0200 Subject: [PATCH] Callback to delete job Issue-ID: NONRTRIC-587 Signed-off-by: elinuxhenrik Change-Id: I2bdca3fc648ea3a909a0a60c43504d5f80ee05bc --- dmaap-mediator-producer/go.mod | 1 + dmaap-mediator-producer/go.sum | 2 + dmaap-mediator-producer/internal/jobs/jobs.go | 14 ++ dmaap-mediator-producer/internal/jobs/jobs_test.go | 24 ++++ dmaap-mediator-producer/internal/server/server.go | 62 +++++---- .../internal/server/server_test.go | 154 +++++++++++---------- dmaap-mediator-producer/main.go | 9 +- .../mocks/jobhandler/JobHandler.go | 7 +- 8 files changed, 171 insertions(+), 102 deletions(-) diff --git a/dmaap-mediator-producer/go.mod b/dmaap-mediator-producer/go.mod index 7e0d96ee..a47c5425 100644 --- a/dmaap-mediator-producer/go.mod +++ b/dmaap-mediator-producer/go.mod @@ -9,6 +9,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gorilla/mux v1.8.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.1.0 // indirect golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect diff --git a/dmaap-mediator-producer/go.sum b/dmaap-mediator-producer/go.sum index 2cf67a6d..e43ad877 100644 --- a/dmaap-mediator-producer/go.sum +++ b/dmaap-mediator-producer/go.sum @@ -1,6 +1,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index 09d38916..e5a1070b 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -55,6 +55,7 @@ type JobInfo struct { type JobHandler interface { AddJob(JobInfo) error + DeleteJob(jobId string) } var ( @@ -87,6 +88,15 @@ func (jh *jobHandlerImpl) AddJob(ji JobInfo) error { } } +func (jh *jobHandlerImpl) DeleteJob(jobId string) { + mu.Lock() + defer mu.Unlock() + for _, typeData := range allTypes { + delete(typeData.Jobs, jobId) + } + log.Debug("Deleted job: ", jobId) +} + func validateJobInfo(ji JobInfo) error { if _, ok := allTypes[ji.InfoTypeIdentity]; !ok { return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity) @@ -142,6 +152,10 @@ func AddJob(job JobInfo) error { return Handler.AddJob(job) } +func DeleteJob(jobId string) { + Handler.DeleteJob(jobId) +} + func RunJobs(mRAddress string) { for { pollAndDistributeMessages(mRAddress) diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 3bb25787..6f292272 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -132,6 +132,30 @@ func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { assertions.Equal("missing required target URI: { job1 type1}", err.Error()) clearAll() } + +func TestDeleteJob(t *testing.T) { + assertions := require.New(t) + jobToKeep := JobInfo{ + InfoJobIdentity: "job1", + InfoTypeIdentity: "type1", + } + jobToDelete := JobInfo{ + InfoJobIdentity: "job2", + InfoTypeIdentity: "type1", + } + allTypes["type1"] = TypeData{ + TypeId: "type1", + Jobs: map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete}, + } + t.Cleanup(func() { + clearAll() + }) + + DeleteJob("job2") + assertions.Equal(1, len(allTypes["type1"].Jobs)) + assertions.Equal(jobToKeep, allTypes["type1"].Jobs["job1"]) +} + func TestPollAndDistributeMessages(t *testing.T) { assertions := require.New(t) jobInfo := JobInfo{ diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go index 0b5e5b8c..d07b7309 100644 --- a/dmaap-mediator-producer/internal/server/server.go +++ b/dmaap-mediator-producer/internal/server/server.go @@ -26,37 +26,28 @@ import ( "io/ioutil" "net/http" + "github.com/gorilla/mux" "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" ) -const StatusCallbackPath = "/status" -const JobsCallbackPath = "/jobs" +const StatusPath = "/status" +const AddJobPath = "/jobs" +const jobIdToken = "infoJobId" +const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}" -func StatusHandler(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != StatusCallbackPath { - http.Error(w, "404 not found.", http.StatusNotFound) - return - } - - if r.Method != "GET" { - http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed) - return - } - - fmt.Fprintf(w, "All is well!") +func NewRouter() *mux.Router { + r := mux.NewRouter() + r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status") + r.HandleFunc(AddJobPath, addInfoJobHandler).Methods(http.MethodPost).Name("add") + r.HandleFunc(deleteJobPath, deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete") + r.NotFoundHandler = ¬FoundHandler{} + r.MethodNotAllowedHandler = &methodNotAllowedHandler{} + return r } -func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != JobsCallbackPath { - http.Error(w, "404 not found.", http.StatusNotFound) - return - } - - if r.Method != "POST" { - http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed) - return - } +func statusHandler(w http.ResponseWriter, r *http.Request) {} +func addInfoJobHandler(w http.ResponseWriter, r *http.Request) { b, readErr := ioutil.ReadAll(r.Body) if readErr != nil { http.Error(w, fmt.Sprintf("Unable to read body due to: %v", readErr), http.StatusBadRequest) @@ -71,3 +62,26 @@ func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest) } } + +func deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + id, ok := vars[jobIdToken] + if !ok { + http.Error(w, "Must provide infoJobId.", http.StatusBadRequest) + return + } + + jobs.DeleteJob(id) +} + +type notFoundHandler struct{} + +func (h *notFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + http.Error(w, "404 not found.", http.StatusNotFound) +} + +type methodNotAllowedHandler struct{} + +func (h *methodNotAllowedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed) +} diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go index a4b19c43..2fe936fa 100644 --- a/dmaap-mediator-producer/internal/server/server_test.go +++ b/dmaap-mediator-producer/internal/server/server_test.go @@ -30,63 +30,63 @@ import ( "net/http/httptest" "testing" + "github.com/gorilla/mux" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobhandler" ) +func TestNewRouter(t *testing.T) { + assertions := require.New(t) + r := NewRouter() + statusRoute := r.Get("status") + assertions.NotNil(statusRoute) + supportedMethods, err := statusRoute.GetMethods() + assertions.Equal([]string{http.MethodGet}, supportedMethods) + assertions.Nil(err) + + addJobRoute := r.Get("add") + assertions.NotNil(addJobRoute) + supportedMethods, err = addJobRoute.GetMethods() + assertions.Equal([]string{http.MethodPost}, supportedMethods) + assertions.Nil(err) + + deleteJobRoute := r.Get("delete") + assertions.NotNil(deleteJobRoute) + supportedMethods, err = deleteJobRoute.GetMethods() + assertions.Equal([]string{http.MethodDelete}, supportedMethods) + assertions.Nil(err) + + notFoundHandler := r.NotFoundHandler + handler := http.HandlerFunc(notFoundHandler.ServeHTTP) + responseRecorder := httptest.NewRecorder() + handler.ServeHTTP(responseRecorder, newRequest("GET", "/wrong", nil, t)) + assertions.Equal(http.StatusNotFound, responseRecorder.Code) + + assertions.Contains(responseRecorder.Body.String(), "404 not found.") + + methodNotAllowedHandler := r.MethodNotAllowedHandler + handler = http.HandlerFunc(methodNotAllowedHandler.ServeHTTP) + responseRecorder = httptest.NewRecorder() + handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t)) + assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code) + + assertions.Contains(responseRecorder.Body.String(), "Method is not supported.") +} + func TestStatusHandler(t *testing.T) { assertions := require.New(t) - type args struct { - responseRecorder *httptest.ResponseRecorder - r *http.Request - } - tests := []struct { - name string - args args - wantedStatus int - wantedBody string - }{ - { - name: "StatusHandler with correct path and method, should return OK", - args: args{ - responseRecorder: httptest.NewRecorder(), - r: newRequest("GET", "/status", nil, t), - }, - wantedStatus: http.StatusOK, - wantedBody: "All is well!", - }, - { - name: "StatusHandler with incorrect path, should return NotFound", - args: args{ - responseRecorder: httptest.NewRecorder(), - r: newRequest("GET", "/wrong", nil, t), - }, - wantedStatus: http.StatusNotFound, - wantedBody: "404 not found.\n", - }, - { - name: "StatusHandler with incorrect method, should return MethodNotAllowed", - args: args{ - responseRecorder: httptest.NewRecorder(), - r: newRequest("PUT", "/status", nil, t), - }, - wantedStatus: http.StatusMethodNotAllowed, - wantedBody: "Method is not supported.\n", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - handler := http.HandlerFunc(StatusHandler) - handler.ServeHTTP(tt.args.responseRecorder, tt.args.r) - assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code) + responseRecorder := httptest.NewRecorder() + r := newRequest(http.MethodGet, "/status", nil, t) + handler := http.HandlerFunc(statusHandler) + handler.ServeHTTP(responseRecorder, r) + assertions.Equal(http.StatusOK, responseRecorder.Code) - assertions.Equal(tt.wantedBody, tt.args.responseRecorder.Body.String()) - }) - } + assertions.Equal("", responseRecorder.Body.String()) } -func TestCreateInfoJobHandler(t *testing.T) { +func TestAddInfoJobHandler(t *testing.T) { assertions := require.New(t) jobHandlerMock := jobhandler.JobHandler{} @@ -114,55 +114,65 @@ func TestCreateInfoJobHandler(t *testing.T) { args args wantedStatus int wantedBody string + assertFunc assertMockFunk }{ { - name: "CreateInfoJobHandler with correct path and method, should return OK", + name: "AddInfoJobHandler with correct path and method, should return OK", args: args{ responseRecorder: httptest.NewRecorder(), - r: newRequest("POST", "/jobs", &goodJobInfo, t), + r: newRequest(http.MethodPost, "/jobs", &goodJobInfo, t), }, wantedStatus: http.StatusOK, wantedBody: "", + assertFunc: func(mock *jobhandler.JobHandler) { + mock.AssertCalled(t, "AddJob", goodJobInfo) + }, }, { - name: "CreateInfoJobHandler with incorrect job info, should return BadRequest", + name: "AddInfoJobHandler with incorrect job info, should return BadRequest", args: args{ responseRecorder: httptest.NewRecorder(), - r: newRequest("POST", "/jobs", &badJobInfo, t), + r: newRequest(http.MethodPost, "/jobs", &badJobInfo, t), }, wantedStatus: http.StatusBadRequest, wantedBody: "Invalid job info. Cause: error", }, - { - name: "CreateInfoJobHandler with incorrect path, should return NotFound", - args: args{ - responseRecorder: httptest.NewRecorder(), - r: newRequest("GET", "/wrong", nil, t), - }, - wantedStatus: http.StatusNotFound, - wantedBody: "404 not found.", - }, - { - name: "CreateInfoJobHandler with incorrect method, should return MethodNotAllowed", - args: args{ - responseRecorder: httptest.NewRecorder(), - r: newRequest("PUT", "/jobs", nil, t), - }, - wantedStatus: http.StatusMethodNotAllowed, - wantedBody: "Method is not supported.", - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - handler := http.HandlerFunc(CreateInfoJobHandler) + handler := http.HandlerFunc(addInfoJobHandler) handler.ServeHTTP(tt.args.responseRecorder, tt.args.r) - assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code) + assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code, tt.name) - assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody) + assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody, tt.name) + + if tt.assertFunc != nil { + tt.assertFunc(&jobHandlerMock) + } }) } } +func TestDeleteJob(t *testing.T) { + assertions := require.New(t) + jobHandlerMock := jobhandler.JobHandler{} + + jobHandlerMock.On("DeleteJob", mock.Anything).Return(nil) + jobs.Handler = &jobHandlerMock + + responseRecorder := httptest.NewRecorder() + r := mux.SetURLVars(newRequest(http.MethodDelete, "/jobs/", nil, t), map[string]string{"infoJobId": "job1"}) + handler := http.HandlerFunc(deleteInfoJobHandler) + handler.ServeHTTP(responseRecorder, r) + assertions.Equal(http.StatusOK, http.StatusOK) + + assertions.Equal("", responseRecorder.Body.String()) + + jobHandlerMock.AssertCalled(t, "DeleteJob", "job1") +} + +type assertMockFunk func(mock *jobhandler.JobHandler) + func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request { var body io.Reader if jobInfo != nil { diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index 79fcb6b5..15207ec7 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -57,9 +57,9 @@ func init() { log.Fatalf("Unable to get types to register due to: %v", err) } producer := config.ProducerRegistrationInfo{ - InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusCallbackPath, + InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath, SupportedInfoTypes: jobs.GetSupportedTypes(), - InfoJobCallbackUrl: callbackAddress + server.JobsCallbackPath, + InfoJobCallbackUrl: callbackAddress + server.AddJobPath, } if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil { log.Fatalf("Unable to register producer due to: %v", err) @@ -75,9 +75,8 @@ func main() { log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort) go func() { - http.HandleFunc(server.StatusCallbackPath, server.StatusHandler) - http.HandleFunc(server.JobsCallbackPath, server.CreateInfoJobHandler) - log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), nil)) + r := server.NewRouter() + log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r)) wg.Done() }() diff --git a/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go b/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go index edb2bf51..8e30b1c2 100644 --- a/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go +++ b/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go @@ -4,7 +4,7 @@ package jobhandler import ( mock "github.com/stretchr/testify/mock" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" + jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" ) // JobHandler is an autogenerated mock type for the JobHandler type @@ -25,3 +25,8 @@ func (_m *JobHandler) AddJob(_a0 jobs.JobInfo) error { return r0 } + +// DeleteJob provides a mock function with given fields: jobId +func (_m *JobHandler) DeleteJob(jobId string) { + _m.Called(jobId) +} -- 2.16.6