From: elinuxhenrik Date: Fri, 22 Oct 2021 11:40:15 +0000 (+0200) Subject: Implement graceful shutdown of consumer X-Git-Tag: 1.2.0~68^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=c658d388f93467cb1a1229d11a0e01616cd97e3c;hp=--cc;p=nonrtric.git Implement graceful shutdown of consumer Also refactored and added tests for the main package. Some minor corrections in the producer also sneaked themselves in. Issue-ID: NONRTRIC-612 Signed-off-by: elinuxhenrik Change-Id: I2e81f0c0b028dc69b691b4968f6f00191ab2dce3 --- c658d388f93467cb1a1229d11a0e01616cd97e3c diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 6ca39b27..9fe27c3e 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -165,7 +165,6 @@ func TestDeleteJob(t *testing.T) { func TestPollAndDistributeMessages(t *testing.T) { assertions := require.New(t) - wg := sync.WaitGroup{} messages := `[{"message": {"data": "data"}}]` pollClientMock := NewTestClient(func(req *http.Request) *http.Response { if req.URL.String() == "http://mrAddr/topicUrl" { @@ -212,6 +211,7 @@ func TestPollAndDistributeMessages(t *testing.T) { handlerUnderTest.clearAll() }) + wg := sync.WaitGroup{} wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute handlerUnderTest.pollAndDistributeMessages("http://mrAddr") diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go index 08885077..5c2027aa 100644 --- a/dmaap-mediator-producer/internal/server/server_test.go +++ b/dmaap-mediator-producer/internal/server/server_test.go @@ -39,24 +39,31 @@ import ( func TestNewRouter(t *testing.T) { assertions := require.New(t) + r := NewRouter(nil) statusRoute := r.Get("status") assertions.NotNil(statusRoute) supportedMethods, err := statusRoute.GetMethods() assertions.Equal([]string{http.MethodGet}, supportedMethods) assertions.Nil(err) + path, _ := statusRoute.GetPathTemplate() + assertions.Equal("/status", path) addJobRoute := r.Get("add") assertions.NotNil(addJobRoute) supportedMethods, err = addJobRoute.GetMethods() assertions.Equal([]string{http.MethodPost}, supportedMethods) assertions.Nil(err) + path, _ = addJobRoute.GetPathTemplate() + assertions.Equal("/jobs", path) deleteJobRoute := r.Get("delete") assertions.NotNil(deleteJobRoute) supportedMethods, err = deleteJobRoute.GetMethods() assertions.Equal([]string{http.MethodDelete}, supportedMethods) assertions.Nil(err) + path, _ = deleteJobRoute.GetPathTemplate() + assertions.Equal("/jobs/{infoJobId}", path) notFoundHandler := r.NotFoundHandler handler := http.HandlerFunc(notFoundHandler.ServeHTTP) @@ -75,12 +82,14 @@ func TestNewRouter(t *testing.T) { func TestStatusHandler(t *testing.T) { assertions := require.New(t) + + handler := http.HandlerFunc(statusHandler) responseRecorder := httptest.NewRecorder() r := newRequest(http.MethodGet, "/status", nil, t) - handler := http.HandlerFunc(statusHandler) + handler.ServeHTTP(responseRecorder, r) - assertions.Equal(http.StatusOK, responseRecorder.Code) + assertions.Equal(http.StatusOK, responseRecorder.Code) assertions.Equal("", responseRecorder.Body.String()) } @@ -98,7 +107,7 @@ func TestAddInfoJobHandler(t *testing.T) { wantedBody string }{ { - name: "AddInfoJobHandler with correct path and method, should return OK", + name: "AddInfoJobHandler with correct job, should return OK", args: args{ job: jobs.JobInfo{ Owner: "owner", @@ -128,6 +137,7 @@ func TestAddInfoJobHandler(t *testing.T) { t.Run(tt.name, func(t *testing.T) { jobHandlerMock := jobhandler.JobHandler{} jobHandlerMock.On("AddJob", tt.args.job).Return(tt.args.mockReturn) + callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock) handler := http.HandlerFunc(callbackHandlerUnderTest.addInfoJobHandler) @@ -146,8 +156,8 @@ func TestAddInfoJobHandler(t *testing.T) { func TestDeleteJob(t *testing.T) { assertions := require.New(t) jobHandlerMock := jobhandler.JobHandler{} - jobHandlerMock.On("DeleteJob", mock.Anything).Return(nil) + callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock) responseRecorder := httptest.NewRecorder() diff --git a/test/usecases/oruclosedlooprecovery/goversion/.gitignore b/test/usecases/oruclosedlooprecovery/goversion/.gitignore index 06758a79..f564804c 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/.gitignore +++ b/test/usecases/oruclosedlooprecovery/goversion/.gitignore @@ -2,3 +2,4 @@ .history oruclosedloop +simulator diff --git a/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler.go b/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler.go index 558d0d78..01f121a9 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler.go +++ b/test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler.go @@ -32,10 +32,9 @@ import ( ) type Configuration struct { - InfoCoordAddress string - SDNRAddress string - SDNRUser string - SDNRPassword string + SDNRAddress string + SDNRUser string + SDNRPassword string } const rawSdnrPath = "/rests/data/network-topology:network-topology/topology=topology-netconf/node=[O-DU-ID]/yang-ext:mount/o-ran-sc-du-hello-world:network-function/du-to-ru-connection=[O-RU-ID]" diff --git a/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client.go b/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client.go index 7932bfac..036819a0 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client.go +++ b/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client.go @@ -32,6 +32,10 @@ type RequestError struct { Body []byte } +func (e RequestError) Error() string { + return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", e.StatusCode, string(e.Body)) +} + // HTTPClient interface type HTTPClient interface { Get(url string) (*http.Response, error) @@ -39,10 +43,6 @@ type HTTPClient interface { Do(*http.Request) (*http.Response, error) } -func (pe RequestError) Error() string { - return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body)) -} - func PutWithoutAuth(url string, body []byte, client HTTPClient) error { return do(http.MethodPut, url, body, client) } diff --git a/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client_test.go b/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client_test.go index 4b2f7fe4..8271fd07 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client_test.go +++ b/test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client_test.go @@ -139,7 +139,6 @@ func Test_doErrorCases(t *testing.T) { url: "badRequest", mockReturnStatus: http.StatusBadRequest, mockReturnBody: []byte("bad request"), - mockReturnError: nil, }, wantErr: RequestError{ StatusCode: http.StatusBadRequest, diff --git a/test/usecases/oruclosedlooprecovery/goversion/main.go b/test/usecases/oruclosedlooprecovery/goversion/main.go index 86da4d48..ebd4dce1 100644 --- a/test/usecases/oruclosedlooprecovery/goversion/main.go +++ b/test/usecases/oruclosedlooprecovery/goversion/main.go @@ -24,6 +24,9 @@ import ( "encoding/json" "fmt" "net/http" + "os" + "os/signal" + "syscall" "time" "github.com/gorilla/mux" @@ -34,43 +37,78 @@ import ( "oransc.org/usecase/oruclosedloop/internal/restclient" ) +type Server interface { + ListenAndServe() error +} + const timeoutHTTPClient = time.Second * 5 const jobId = "14e7bb84-a44d-44c1-90b7-6995a92ad43c" -var infoCoordAddress string +var jobRegistrationInfo = struct { + InfoTypeId string `json:"info_type_id"` + JobResultUri string `json:"job_result_uri"` + JobOwner string `json:"job_owner"` + JobDefinition interface{} `json:"job_definition"` +}{ + InfoTypeId: "STD_Fault_Messages", + JobResultUri: "", + JobOwner: "O-RU Closed Loop Usecase", + JobDefinition: "{}", +} + +var client restclient.HTTPClient +var configuration *config.Config var linkfailureConfig linkfailure.Configuration var lookupService repository.LookupService -var host string -var port string -var client restclient.HTTPClient +var consumerPort string func init() { - configuration := config.New() + doInit() +} + +func doInit() { + configuration = config.New() + + log.SetLevel(configuration.LogLevel) client = &http.Client{ Timeout: timeoutHTTPClient, } - log.SetLevel(configuration.LogLevel) + consumerPort = fmt.Sprint(configuration.ConsumerPort) + jobRegistrationInfo.JobResultUri = configuration.ConsumerHost + ":" + consumerPort + + linkfailureConfig = linkfailure.Configuration{ + SDNRAddress: configuration.SDNRHost + ":" + fmt.Sprint(configuration.SDNRPort), + SDNRUser: configuration.SDNRUser, + SDNRPassword: configuration.SDNPassword, + } +} +func main() { if err := validateConfiguration(configuration); err != nil { - log.Fatalf("Unable to start consumer due to: %v", err) + log.Fatalf("Unable to start consumer due to configuration error: %v", err) } - host = configuration.ConsumerHost - port = fmt.Sprint(configuration.ConsumerPort) csvFileHelper := repository.NewCsvFileHelperImpl() - if initErr := initializeLookupService(csvFileHelper, configuration); initErr != nil { + if initErr := initializeLookupService(csvFileHelper, configuration.ORUToODUMapFile); initErr != nil { log.Fatalf("Unable to create LookupService due to inability to get O-RU-ID to O-DU-ID map. Cause: %v", initErr) } - infoCoordAddress = configuration.InfoCoordinatorAddress + go func() { + startServer(&http.Server{ + Addr: ":" + consumerPort, + Handler: getRouter(), + }) + os.Exit(1) // If the startServer function exits, it is because there has been a failure in the server, so we exit. + }() - linkfailureConfig = linkfailure.Configuration{ - SDNRAddress: configuration.SDNRHost + ":" + fmt.Sprint(configuration.SDNRPort), - SDNRUser: configuration.SDNRUser, - SDNRPassword: configuration.SDNPassword, - } + go func() { + deleteOnShutdown(make(chan os.Signal, 1)) + os.Exit(0) + }() + + keepConsumerAlive() } func validateConfiguration(configuration *config.Config) error { @@ -80,40 +118,41 @@ func validateConfiguration(configuration *config.Config) error { return nil } -func initializeLookupService(csvFileHelper repository.CsvFileHelper, configuration *config.Config) error { - lookupService = repository.NewLookupServiceImpl(csvFileHelper, configuration.ORUToODUMapFile) - if initErr := lookupService.Init(); initErr != nil { - return initErr - } - return nil +func initializeLookupService(csvFileHelper repository.CsvFileHelper, csvFile string) error { + lookupService = repository.NewLookupServiceImpl(csvFileHelper, csvFile) + return lookupService.Init() } -func main() { - defer deleteJob() +func getRouter() *mux.Router { messageHandler := linkfailure.NewLinkFailureHandler(lookupService, linkfailureConfig, client) + r := mux.NewRouter() - r.HandleFunc("/", messageHandler.MessagesHandler).Methods(http.MethodPost) - r.HandleFunc("/admin/start", startHandler).Methods(http.MethodPost) - r.HandleFunc("/admin/stop", stopHandler).Methods(http.MethodPost) - log.Error(http.ListenAndServe(":"+port, r)) + r.HandleFunc("/", messageHandler.MessagesHandler).Methods(http.MethodPost).Name("messageHandler") + r.HandleFunc("/admin/start", startHandler).Methods(http.MethodPost).Name("start") + r.HandleFunc("/admin/stop", stopHandler).Methods(http.MethodPost).Name("stop") + + return r } -func startHandler(w http.ResponseWriter, r *http.Request) { - jobRegistrationInfo := struct { - InfoTypeId string `json:"info_type_id"` - JobResultUri string `json:"job_result_uri"` - JobOwner string `json:"job_owner"` - JobDefinition interface{} `json:"job_definition"` - }{ - InfoTypeId: "STD_Fault_Messages", - JobResultUri: host + ":" + port, - JobOwner: "O-RU Closed Loop Usecase", - JobDefinition: "{}", +func startServer(server Server) { + if err := server.ListenAndServe(); err != nil { + log.Errorf("Server stopped unintentionally due to: %v. Deleteing job.", err) + if deleteErr := deleteJob(); deleteErr != nil { + log.Error(fmt.Sprintf("Unable to delete consumer job due to: %v. Please remove job %v manually.", deleteErr, jobId)) + } } +} + +func keepConsumerAlive() { + forever := make(chan int) + <-forever +} + +func startHandler(w http.ResponseWriter, r *http.Request) { body, _ := json.Marshal(jobRegistrationInfo) - putErr := restclient.PutWithoutAuth(infoCoordAddress+"/data-consumer/v1/info-jobs/"+jobId, body, client) + putErr := restclient.PutWithoutAuth(configuration.InfoCoordinatorAddress+"/data-consumer/v1/info-jobs/"+jobId, body, client) if putErr != nil { - http.Error(w, fmt.Sprintf("Unable to register consumer job: %v", putErr), http.StatusBadRequest) + http.Error(w, fmt.Sprintf("Unable to register consumer job due to: %v.", putErr), http.StatusBadRequest) return } log.Debug("Registered job.") @@ -122,12 +161,22 @@ func startHandler(w http.ResponseWriter, r *http.Request) { func stopHandler(w http.ResponseWriter, r *http.Request) { deleteErr := deleteJob() if deleteErr != nil { - http.Error(w, fmt.Sprintf("Unable to delete consumer job: %v", deleteErr), http.StatusBadRequest) + http.Error(w, fmt.Sprintf("Unable to delete consumer job due to: %v. Please remove job %v manually.", deleteErr, jobId), http.StatusBadRequest) return } log.Debug("Deleted job.") } +func deleteOnShutdown(s chan os.Signal) { + signal.Notify(s, os.Interrupt) + signal.Notify(s, syscall.SIGTERM) + <-s + log.Info("Shutting down gracefully.") + if err := deleteJob(); err != nil { + log.Error(fmt.Sprintf("Unable to delete job on shutdown due to: %v. Please remove job %v manually.", err, jobId)) + } +} + func deleteJob() error { - return restclient.Delete(infoCoordAddress+"/data-consumer/v1/info-jobs/"+jobId, client) + return restclient.Delete(configuration.InfoCoordinatorAddress+"/data-consumer/v1/info-jobs/"+jobId, client) } diff --git a/test/usecases/oruclosedlooprecovery/goversion/main_test.go b/test/usecases/oruclosedlooprecovery/goversion/main_test.go new file mode 100644 index 00000000..99419bf7 --- /dev/null +++ b/test/usecases/oruclosedlooprecovery/goversion/main_test.go @@ -0,0 +1,440 @@ +// - +// ========================LICENSE_START================================= +// O-RAN-SC +// %% +// Copyright (C) 2021: Nordix Foundation +// %% +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package main + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "sync" + "syscall" + "testing" + "time" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "oransc.org/usecase/oruclosedloop/internal/config" + "oransc.org/usecase/oruclosedloop/internal/linkfailure" + "oransc.org/usecase/oruclosedloop/mocks" +) + +func Test_init(t *testing.T) { + assertions := require.New(t) + + os.Setenv("CONSUMER_HOST", "consumerHost") + os.Setenv("CONSUMER_PORT", "8095") + t.Cleanup(func() { + os.Clearenv() + }) + + doInit() + + wantedConfiguration := &config.Config{ + LogLevel: log.InfoLevel, + ConsumerHost: "consumerHost", + ConsumerPort: 8095, + InfoCoordinatorAddress: "http://enrichmentservice:8083", + SDNRHost: "http://localhost", + SDNRPort: 3904, + SDNRUser: "admin", + SDNPassword: "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U", + ORUToODUMapFile: "o-ru-to-o-du-map.csv", + } + assertions.Equal(wantedConfiguration, configuration) + + assertions.Equal(fmt.Sprint(wantedConfiguration.ConsumerPort), consumerPort) + assertions.Equal(wantedConfiguration.ConsumerHost+":"+fmt.Sprint(wantedConfiguration.ConsumerPort), jobRegistrationInfo.JobResultUri) + + wantedLinkFailureConfig := linkfailure.Configuration{ + SDNRAddress: wantedConfiguration.SDNRHost + ":" + fmt.Sprint(wantedConfiguration.SDNRPort), + SDNRUser: wantedConfiguration.SDNRUser, + SDNRPassword: wantedConfiguration.SDNPassword, + } + assertions.Equal(wantedLinkFailureConfig, linkfailureConfig) +} + +func Test_validateConfiguration(t *testing.T) { + assertions := require.New(t) + + type args struct { + configuration *config.Config + } + tests := []struct { + name string + args args + wantErr error + }{ + { + name: "Valid config, should return nil", + args: args{ + configuration: &config.Config{ + ConsumerHost: "host", + ConsumerPort: 80, + }, + }, + }, + { + name: "Invalid config, should return error", + args: args{ + configuration: &config.Config{}, + }, + wantErr: fmt.Errorf("consumer host and port must be provided"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateConfiguration(tt.args.configuration) + assertions.Equal(tt.wantErr, err) + }) + } +} + +func Test_initializeLookupService(t *testing.T) { + assertions := require.New(t) + type args struct { + csvFile string + oRuId string + mockReturn [][]string + mockReturnError error + } + tests := []struct { + name string + args args + wantODuId string + wantInitErr error + }{ + { + name: "Successful initialization, should return nil and lookup service should be initiated with data", + args: args{ + csvFile: "file", + oRuId: "1", + mockReturn: [][]string{{"1", "2"}}, + }, + wantODuId: "2", + }, + { + name: "Unsuccessful initialization, should return error and lookup service should not be initiated with data", + args: args{ + csvFile: "file", + mockReturnError: errors.New("Error"), + }, + wantInitErr: errors.New("Error"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockCsvFileHelper := &mocks.CsvFileHelper{} + mockCsvFileHelper.On("GetCsvFromFile", mock.Anything).Return(tt.args.mockReturn, tt.args.mockReturnError) + + err := initializeLookupService(mockCsvFileHelper, tt.args.csvFile) + oDuId, _ := lookupService.GetODuID(tt.args.oRuId) + assertions.Equal(tt.wantODuId, oDuId) + assertions.Equal(tt.wantInitErr, err) + mockCsvFileHelper.AssertCalled(t, "GetCsvFromFile", tt.args.csvFile) + }) + } +} + +func Test_getRouter_shouldContainAllPathsWithHandlers(t *testing.T) { + assertions := require.New(t) + + r := getRouter() + messageHandlerRoute := r.Get("messageHandler") + assertions.NotNil(messageHandlerRoute) + supportedMethods, err := messageHandlerRoute.GetMethods() + assertions.Equal([]string{http.MethodPost}, supportedMethods) + assertions.Nil(err) + path, _ := messageHandlerRoute.GetPathTemplate() + assertions.Equal("/", path) + + startHandlerRoute := r.Get("start") + assertions.NotNil(messageHandlerRoute) + supportedMethods, err = startHandlerRoute.GetMethods() + assertions.Equal([]string{http.MethodPost}, supportedMethods) + assertions.Nil(err) + path, _ = startHandlerRoute.GetPathTemplate() + assertions.Equal("/admin/start", path) + + stopHandlerRoute := r.Get("stop") + assertions.NotNil(stopHandlerRoute) + supportedMethods, err = stopHandlerRoute.GetMethods() + assertions.Equal([]string{http.MethodPost}, supportedMethods) + assertions.Nil(err) + path, _ = stopHandlerRoute.GetPathTemplate() + assertions.Equal("/admin/stop", path) +} + +func Test_startServer_shouldDeleteJobWhenServerStopsWithErrorAndLog(t *testing.T) { + assertions := require.New(t) + + var buf bytes.Buffer + log.SetOutput(&buf) + + os.Setenv("CONSUMER_PORT", "wrong") + t.Cleanup(func() { + log.SetOutput(os.Stderr) + }) + + mockServer := &mocks.Server{} + mockServer.On("ListenAndServe").Return(errors.New("Server failure")) + + startServer(mockServer) + + log := buf.String() + assertions.Contains(log, "level=error") + assertions.Contains(log, "Server stopped unintentionally due to: Server failure. Deleteing job.") + assertions.Contains(log, "Please remove job 14e7bb84-a44d-44c1-90b7-6995a92ad43c manually") +} + +func Test_startHandler(t *testing.T) { + assertions := require.New(t) + + jobRegistrationInfo.JobResultUri = "host:80" + + type args struct { + mockReturnBody []byte + mockReturnStatus int + } + tests := []struct { + name string + args args + wantedStatus int + wantedBody string + }{ + { + name: "Start with successful registration, should return ok", + args: args{ + mockReturnBody: []byte(""), + mockReturnStatus: http.StatusOK, + }, + wantedStatus: http.StatusOK, + }, + { + name: "Start with error response at registration, should return error", + args: args{ + mockReturnBody: []byte("error"), + mockReturnStatus: http.StatusBadRequest, + }, + wantedStatus: http.StatusBadRequest, + wantedBody: "Unable to register consumer job due to:", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientMock := setUpClientMock(tt.args.mockReturnBody, tt.args.mockReturnStatus) + + handler := http.HandlerFunc(startHandler) + responseRecorder := httptest.NewRecorder() + r, _ := http.NewRequest(http.MethodPost, "/start", nil) + + handler.ServeHTTP(responseRecorder, r) + + assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name) + assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name) + + var wantedJobRegistrationInfo = struct { + InfoTypeId string `json:"info_type_id"` + JobResultUri string `json:"job_result_uri"` + JobOwner string `json:"job_owner"` + JobDefinition interface{} `json:"job_definition"` + }{ + InfoTypeId: "STD_Fault_Messages", + JobResultUri: "host:80", + JobOwner: "O-RU Closed Loop Usecase", + JobDefinition: "{}", + } + wantedBody, _ := json.Marshal(wantedJobRegistrationInfo) + + var actualRequest *http.Request + clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool { + actualRequest = req + return true + })) + assertions.Equal(http.MethodPut, actualRequest.Method) + assertions.Equal("http", actualRequest.URL.Scheme) + assertions.Equal("enrichmentservice:8083", actualRequest.URL.Host) + assertions.Equal("/data-consumer/v1/info-jobs/14e7bb84-a44d-44c1-90b7-6995a92ad43c", actualRequest.URL.Path) + assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type")) + body, _ := ioutil.ReadAll(actualRequest.Body) + expectedBody := wantedBody + assertions.Equal(expectedBody, body) + clientMock.AssertNumberOfCalls(t, "Do", 1) + }) + } +} + +func Test_stopHandler(t *testing.T) { + assertions := require.New(t) + + jobRegistrationInfo.JobResultUri = "host:80" + + type args struct { + mockReturnBody []byte + mockReturnStatus int + } + tests := []struct { + name string + args args + wantedStatus int + wantedBody string + }{ + { + name: "Stop with successful job deletion, should return ok", + args: args{ + mockReturnBody: []byte(""), + mockReturnStatus: http.StatusOK, + }, + wantedStatus: http.StatusOK, + }, + { + name: "Stop with error response at job deletion, should return error", + args: args{ + mockReturnBody: []byte("error"), + mockReturnStatus: http.StatusBadRequest, + }, + wantedStatus: http.StatusBadRequest, + wantedBody: "Please remove job 14e7bb84-a44d-44c1-90b7-6995a92ad43c manually", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientMock := setUpClientMock(tt.args.mockReturnBody, tt.args.mockReturnStatus) + + handler := http.HandlerFunc(stopHandler) + responseRecorder := httptest.NewRecorder() + r, _ := http.NewRequest(http.MethodPost, "/stop", nil) + + handler.ServeHTTP(responseRecorder, r) + + assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name) + assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name) + + var actualRequest *http.Request + clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool { + actualRequest = req + return true + })) + assertions.Equal(http.MethodDelete, actualRequest.Method) + assertions.Equal("http", actualRequest.URL.Scheme) + assertions.Equal("enrichmentservice:8083", actualRequest.URL.Host) + assertions.Equal("/data-consumer/v1/info-jobs/14e7bb84-a44d-44c1-90b7-6995a92ad43c", actualRequest.URL.Path) + clientMock.AssertNumberOfCalls(t, "Do", 1) + }) + } +} + +func Test_deleteOnShutdown(t *testing.T) { + assertions := require.New(t) + + var buf bytes.Buffer + log.SetOutput(&buf) + + t.Cleanup(func() { + log.SetOutput(os.Stderr) + }) + + type args struct { + mockReturnBody []byte + mockReturnStatus int + } + tests := []struct { + name string + args args + wantedLog string + }{ + { + name: "Delete with successful job deletion, should return ok", + args: args{ + mockReturnBody: []byte(""), + mockReturnStatus: http.StatusOK, + }, + }, + { + name: "Stop with error response at job deletion, should return error", + args: args{ + mockReturnBody: []byte("error"), + mockReturnStatus: http.StatusBadRequest, + }, + wantedLog: "Please remove job 14e7bb84-a44d-44c1-90b7-6995a92ad43c manually", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + setUpClientMock(tt.args.mockReturnBody, tt.args.mockReturnStatus) + + c := make(chan os.Signal, 1) + go deleteOnShutdown(c) + c <- syscall.SIGTERM + + waitForLogToBeWritten(&buf) + + log := buf.String() + if tt.wantedLog != "" { + assertions.Contains(log, "level=error") + assertions.Contains(log, "Unable to delete job on shutdown due to:") + assertions.Contains(log, tt.wantedLog) + } + }) + } +} + +func waitForLogToBeWritten(logBuf *bytes.Buffer) { + wg := sync.WaitGroup{} + wg.Add(1) + for { + if waitTimeout(&wg, 10*time.Millisecond) && logBuf.Len() != 0 { + wg.Done() + break + } + } +} + +// waitTimeout waits for the waitgroup for the specified max timeout. +// Returns true if waiting timed out. +func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + return false // completed normally + case <-time.After(timeout): + return true // timed out + } +} + +func setUpClientMock(body []byte, status int) *mocks.HTTPClient { + clientMock := mocks.HTTPClient{} + clientMock.On("Do", mock.Anything).Return(&http.Response{ + Body: ioutil.NopCloser(bytes.NewReader(body)), + StatusCode: status, + }, nil) + client = &clientMock + return &clientMock +} diff --git a/test/usecases/oruclosedlooprecovery/goversion/mocks/Server.go b/test/usecases/oruclosedlooprecovery/goversion/mocks/Server.go new file mode 100644 index 00000000..ad16503f --- /dev/null +++ b/test/usecases/oruclosedlooprecovery/goversion/mocks/Server.go @@ -0,0 +1,24 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// Server is an autogenerated mock type for the Server type +type Server struct { + mock.Mock +} + +// ListenAndServe provides a mock function with given fields: +func (_m *Server) ListenAndServe() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/test/usecases/oruclosedlooprecovery/goversion/simulator/producer.go b/test/usecases/oruclosedlooprecovery/goversion/simulator/producer.go new file mode 100644 index 00000000..78462ba2 --- /dev/null +++ b/test/usecases/oruclosedlooprecovery/goversion/simulator/producer.go @@ -0,0 +1,66 @@ +// - +// ========================LICENSE_START================================= +// O-RAN-SC +// %% +// Copyright (C) 2021: Nordix Foundation +// %% +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package main + +import ( + "bytes" + "encoding/json" + "net/http" + "time" + + "oransc.org/usecase/oruclosedloop/internal/ves" +) + +func main() { + message := ves.FaultMessage{ + Event: ves.Event{ + CommonEventHeader: ves.CommonEventHeader{ + Domain: "fault", + SourceName: "ERICSSON-O-RU-11220", + }, + FaultFields: ves.FaultFields{ + AlarmCondition: "28", + }, + }, + } + client := &http.Client{ + Timeout: 5 * time.Second, + } + + critical := true + for range time.Tick(2 * time.Second) { + if critical { + message.Event.FaultFields.EventSeverity = "CRITICAL" + critical = false + } else { + critical = true + message.Event.FaultFields.EventSeverity = "NORMAL" + } + m, _ := json.Marshal(message) + msgToSend, _ := json.Marshal([]string{string(m)}) + + req, _ := http.NewRequest(http.MethodPost, "http://localhost:40935", bytes.NewBuffer(msgToSend)) + req.Header.Set("Content-Type", "application/json; charset=utf-8") + + client.Do(req) + } + +}