Implement graceful shutdown of consumer 18/6918/2
authorelinuxhenrik <henrik.b.andersson@est.tech>
Fri, 22 Oct 2021 11:40:15 +0000 (13:40 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Fri, 22 Oct 2021 12:08:11 +0000 (14:08 +0200)
Also refactored and added tests for the main package.

Some minor corrections in the producer also sneaked themselves in.

Issue-ID: NONRTRIC-612
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I2e81f0c0b028dc69b691b4968f6f00191ab2dce3

dmaap-mediator-producer/internal/jobs/jobs_test.go
dmaap-mediator-producer/internal/server/server_test.go
test/usecases/oruclosedlooprecovery/goversion/.gitignore
test/usecases/oruclosedlooprecovery/goversion/internal/linkfailure/linkfailurehandler.go
test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client.go
test/usecases/oruclosedlooprecovery/goversion/internal/restclient/client_test.go
test/usecases/oruclosedlooprecovery/goversion/main.go
test/usecases/oruclosedlooprecovery/goversion/main_test.go [new file with mode: 0644]
test/usecases/oruclosedlooprecovery/goversion/mocks/Server.go [new file with mode: 0644]
test/usecases/oruclosedlooprecovery/goversion/simulator/producer.go [new file with mode: 0644]

index 6ca39b2..9fe27c3 100644 (file)
@@ -165,7 +165,6 @@ func TestDeleteJob(t *testing.T) {
 func TestPollAndDistributeMessages(t *testing.T) {
        assertions := require.New(t)
 
-       wg := sync.WaitGroup{}
        messages := `[{"message": {"data": "data"}}]`
        pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://mrAddr/topicUrl" {
@@ -212,6 +211,7 @@ func TestPollAndDistributeMessages(t *testing.T) {
                handlerUnderTest.clearAll()
        })
 
+       wg := sync.WaitGroup{}
        wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
        handlerUnderTest.pollAndDistributeMessages("http://mrAddr")
 
index 0888507..5c2027a 100644 (file)
@@ -39,24 +39,31 @@ import (
 
 func TestNewRouter(t *testing.T) {
        assertions := require.New(t)
+
        r := NewRouter(nil)
        statusRoute := r.Get("status")
        assertions.NotNil(statusRoute)
        supportedMethods, err := statusRoute.GetMethods()
        assertions.Equal([]string{http.MethodGet}, supportedMethods)
        assertions.Nil(err)
+       path, _ := statusRoute.GetPathTemplate()
+       assertions.Equal("/status", path)
 
        addJobRoute := r.Get("add")
        assertions.NotNil(addJobRoute)
        supportedMethods, err = addJobRoute.GetMethods()
        assertions.Equal([]string{http.MethodPost}, supportedMethods)
        assertions.Nil(err)
+       path, _ = addJobRoute.GetPathTemplate()
+       assertions.Equal("/jobs", path)
 
        deleteJobRoute := r.Get("delete")
        assertions.NotNil(deleteJobRoute)
        supportedMethods, err = deleteJobRoute.GetMethods()
        assertions.Equal([]string{http.MethodDelete}, supportedMethods)
        assertions.Nil(err)
+       path, _ = deleteJobRoute.GetPathTemplate()
+       assertions.Equal("/jobs/{infoJobId}", path)
 
        notFoundHandler := r.NotFoundHandler
        handler := http.HandlerFunc(notFoundHandler.ServeHTTP)
@@ -75,12 +82,14 @@ func TestNewRouter(t *testing.T) {
 
 func TestStatusHandler(t *testing.T) {
        assertions := require.New(t)
+
+       handler := http.HandlerFunc(statusHandler)
        responseRecorder := httptest.NewRecorder()
        r := newRequest(http.MethodGet, "/status", nil, t)
-       handler := http.HandlerFunc(statusHandler)
+
        handler.ServeHTTP(responseRecorder, r)
-       assertions.Equal(http.StatusOK, responseRecorder.Code)
 
+       assertions.Equal(http.StatusOK, responseRecorder.Code)
        assertions.Equal("", responseRecorder.Body.String())
 }
 
@@ -98,7 +107,7 @@ func TestAddInfoJobHandler(t *testing.T) {
                wantedBody   string
        }{
                {
-                       name: "AddInfoJobHandler with correct path and method, should return OK",
+                       name: "AddInfoJobHandler with correct job, should return OK",
                        args: args{
                                job: jobs.JobInfo{
                                        Owner:            "owner",
@@ -128,6 +137,7 @@ func TestAddInfoJobHandler(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        jobHandlerMock := jobhandler.JobHandler{}
                        jobHandlerMock.On("AddJob", tt.args.job).Return(tt.args.mockReturn)
+
                        callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
 
                        handler := http.HandlerFunc(callbackHandlerUnderTest.addInfoJobHandler)
@@ -146,8 +156,8 @@ func TestAddInfoJobHandler(t *testing.T) {
 func TestDeleteJob(t *testing.T) {
        assertions := require.New(t)
        jobHandlerMock := jobhandler.JobHandler{}
-
        jobHandlerMock.On("DeleteJob", mock.Anything).Return(nil)
+
        callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
 
        responseRecorder := httptest.NewRecorder()
index 558d0d7..01f121a 100644 (file)
@@ -32,10 +32,9 @@ import (
 )
 
 type Configuration struct {
-       InfoCoordAddress string
-       SDNRAddress      string
-       SDNRUser         string
-       SDNRPassword     string
+       SDNRAddress  string
+       SDNRUser     string
+       SDNRPassword string
 }
 
 const rawSdnrPath = "/rests/data/network-topology:network-topology/topology=topology-netconf/node=[O-DU-ID]/yang-ext:mount/o-ran-sc-du-hello-world:network-function/du-to-ru-connection=[O-RU-ID]"
index 7932bfa..036819a 100644 (file)
@@ -32,6 +32,10 @@ type RequestError struct {
        Body       []byte
 }
 
+func (e RequestError) Error() string {
+       return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", e.StatusCode, string(e.Body))
+}
+
 // HTTPClient interface
 type HTTPClient interface {
        Get(url string) (*http.Response, error)
@@ -39,10 +43,6 @@ type HTTPClient interface {
        Do(*http.Request) (*http.Response, error)
 }
 
-func (pe RequestError) Error() string {
-       return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body))
-}
-
 func PutWithoutAuth(url string, body []byte, client HTTPClient) error {
        return do(http.MethodPut, url, body, client)
 }
index 4b2f7fe..8271fd0 100644 (file)
@@ -139,7 +139,6 @@ func Test_doErrorCases(t *testing.T) {
                                url:              "badRequest",
                                mockReturnStatus: http.StatusBadRequest,
                                mockReturnBody:   []byte("bad request"),
-                               mockReturnError:  nil,
                        },
                        wantErr: RequestError{
                                StatusCode: http.StatusBadRequest,
index 86da4d4..ebd4dce 100644 (file)
@@ -24,6 +24,9 @@ import (
        "encoding/json"
        "fmt"
        "net/http"
+       "os"
+       "os/signal"
+       "syscall"
        "time"
 
        "github.com/gorilla/mux"
@@ -34,43 +37,78 @@ import (
        "oransc.org/usecase/oruclosedloop/internal/restclient"
 )
 
+type Server interface {
+       ListenAndServe() error
+}
+
 const timeoutHTTPClient = time.Second * 5
 const jobId = "14e7bb84-a44d-44c1-90b7-6995a92ad43c"
 
-var infoCoordAddress string
+var jobRegistrationInfo = struct {
+       InfoTypeId    string      `json:"info_type_id"`
+       JobResultUri  string      `json:"job_result_uri"`
+       JobOwner      string      `json:"job_owner"`
+       JobDefinition interface{} `json:"job_definition"`
+}{
+       InfoTypeId:    "STD_Fault_Messages",
+       JobResultUri:  "",
+       JobOwner:      "O-RU Closed Loop Usecase",
+       JobDefinition: "{}",
+}
+
+var client restclient.HTTPClient
+var configuration *config.Config
 var linkfailureConfig linkfailure.Configuration
 var lookupService repository.LookupService
-var host string
-var port string
-var client restclient.HTTPClient
+var consumerPort string
 
 func init() {
-       configuration := config.New()
+       doInit()
+}
+
+func doInit() {
+       configuration = config.New()
+
+       log.SetLevel(configuration.LogLevel)
 
        client = &http.Client{
                Timeout: timeoutHTTPClient,
        }
 
-       log.SetLevel(configuration.LogLevel)
+       consumerPort = fmt.Sprint(configuration.ConsumerPort)
+       jobRegistrationInfo.JobResultUri = configuration.ConsumerHost + ":" + consumerPort
+
+       linkfailureConfig = linkfailure.Configuration{
+               SDNRAddress:  configuration.SDNRHost + ":" + fmt.Sprint(configuration.SDNRPort),
+               SDNRUser:     configuration.SDNRUser,
+               SDNRPassword: configuration.SDNPassword,
+       }
+}
 
+func main() {
        if err := validateConfiguration(configuration); err != nil {
-               log.Fatalf("Unable to start consumer due to: %v", err)
+               log.Fatalf("Unable to start consumer due to configuration error: %v", err)
        }
-       host = configuration.ConsumerHost
-       port = fmt.Sprint(configuration.ConsumerPort)
 
        csvFileHelper := repository.NewCsvFileHelperImpl()
-       if initErr := initializeLookupService(csvFileHelper, configuration); initErr != nil {
+       if initErr := initializeLookupService(csvFileHelper, configuration.ORUToODUMapFile); initErr != nil {
                log.Fatalf("Unable to create LookupService due to inability to get O-RU-ID to O-DU-ID map. Cause: %v", initErr)
        }
 
-       infoCoordAddress = configuration.InfoCoordinatorAddress
+       go func() {
+               startServer(&http.Server{
+                       Addr:    ":" + consumerPort,
+                       Handler: getRouter(),
+               })
+               os.Exit(1) // If the startServer function exits, it is because there has been a failure in the server, so we exit.
+       }()
 
-       linkfailureConfig = linkfailure.Configuration{
-               SDNRAddress:  configuration.SDNRHost + ":" + fmt.Sprint(configuration.SDNRPort),
-               SDNRUser:     configuration.SDNRUser,
-               SDNRPassword: configuration.SDNPassword,
-       }
+       go func() {
+               deleteOnShutdown(make(chan os.Signal, 1))
+               os.Exit(0)
+       }()
+
+       keepConsumerAlive()
 }
 
 func validateConfiguration(configuration *config.Config) error {
@@ -80,40 +118,41 @@ func validateConfiguration(configuration *config.Config) error {
        return nil
 }
 
-func initializeLookupService(csvFileHelper repository.CsvFileHelper, configuration *config.Config) error {
-       lookupService = repository.NewLookupServiceImpl(csvFileHelper, configuration.ORUToODUMapFile)
-       if initErr := lookupService.Init(); initErr != nil {
-               return initErr
-       }
-       return nil
+func initializeLookupService(csvFileHelper repository.CsvFileHelper, csvFile string) error {
+       lookupService = repository.NewLookupServiceImpl(csvFileHelper, csvFile)
+       return lookupService.Init()
 }
 
-func main() {
-       defer deleteJob()
+func getRouter() *mux.Router {
        messageHandler := linkfailure.NewLinkFailureHandler(lookupService, linkfailureConfig, client)
+
        r := mux.NewRouter()
-       r.HandleFunc("/", messageHandler.MessagesHandler).Methods(http.MethodPost)
-       r.HandleFunc("/admin/start", startHandler).Methods(http.MethodPost)
-       r.HandleFunc("/admin/stop", stopHandler).Methods(http.MethodPost)
-       log.Error(http.ListenAndServe(":"+port, r))
+       r.HandleFunc("/", messageHandler.MessagesHandler).Methods(http.MethodPost).Name("messageHandler")
+       r.HandleFunc("/admin/start", startHandler).Methods(http.MethodPost).Name("start")
+       r.HandleFunc("/admin/stop", stopHandler).Methods(http.MethodPost).Name("stop")
+
+       return r
 }
 
-func startHandler(w http.ResponseWriter, r *http.Request) {
-       jobRegistrationInfo := struct {
-               InfoTypeId    string      `json:"info_type_id"`
-               JobResultUri  string      `json:"job_result_uri"`
-               JobOwner      string      `json:"job_owner"`
-               JobDefinition interface{} `json:"job_definition"`
-       }{
-               InfoTypeId:    "STD_Fault_Messages",
-               JobResultUri:  host + ":" + port,
-               JobOwner:      "O-RU Closed Loop Usecase",
-               JobDefinition: "{}",
+func startServer(server Server) {
+       if err := server.ListenAndServe(); err != nil {
+               log.Errorf("Server stopped unintentionally due to: %v. Deleteing job.", err)
+               if deleteErr := deleteJob(); deleteErr != nil {
+                       log.Error(fmt.Sprintf("Unable to delete consumer job due to: %v. Please remove job %v manually.", deleteErr, jobId))
+               }
        }
+}
+
+func keepConsumerAlive() {
+       forever := make(chan int)
+       <-forever
+}
+
+func startHandler(w http.ResponseWriter, r *http.Request) {
        body, _ := json.Marshal(jobRegistrationInfo)
-       putErr := restclient.PutWithoutAuth(infoCoordAddress+"/data-consumer/v1/info-jobs/"+jobId, body, client)
+       putErr := restclient.PutWithoutAuth(configuration.InfoCoordinatorAddress+"/data-consumer/v1/info-jobs/"+jobId, body, client)
        if putErr != nil {
-               http.Error(w, fmt.Sprintf("Unable to register consumer job: %v", putErr), http.StatusBadRequest)
+               http.Error(w, fmt.Sprintf("Unable to register consumer job due to: %v.", putErr), http.StatusBadRequest)
                return
        }
        log.Debug("Registered job.")
@@ -122,12 +161,22 @@ func startHandler(w http.ResponseWriter, r *http.Request) {
 func stopHandler(w http.ResponseWriter, r *http.Request) {
        deleteErr := deleteJob()
        if deleteErr != nil {
-               http.Error(w, fmt.Sprintf("Unable to delete consumer job: %v", deleteErr), http.StatusBadRequest)
+               http.Error(w, fmt.Sprintf("Unable to delete consumer job due to: %v. Please remove job %v manually.", deleteErr, jobId), http.StatusBadRequest)
                return
        }
        log.Debug("Deleted job.")
 }
 
+func deleteOnShutdown(s chan os.Signal) {
+       signal.Notify(s, os.Interrupt)
+       signal.Notify(s, syscall.SIGTERM)
+       <-s
+       log.Info("Shutting down gracefully.")
+       if err := deleteJob(); err != nil {
+               log.Error(fmt.Sprintf("Unable to delete job on shutdown due to: %v. Please remove job %v manually.", err, jobId))
+       }
+}
+
 func deleteJob() error {
-       return restclient.Delete(infoCoordAddress+"/data-consumer/v1/info-jobs/"+jobId, client)
+       return restclient.Delete(configuration.InfoCoordinatorAddress+"/data-consumer/v1/info-jobs/"+jobId, client)
 }
diff --git a/test/usecases/oruclosedlooprecovery/goversion/main_test.go b/test/usecases/oruclosedlooprecovery/goversion/main_test.go
new file mode 100644 (file)
index 0000000..99419bf
--- /dev/null
@@ -0,0 +1,440 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+       "bytes"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "sync"
+       "syscall"
+       "testing"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+       "github.com/stretchr/testify/mock"
+       "github.com/stretchr/testify/require"
+       "oransc.org/usecase/oruclosedloop/internal/config"
+       "oransc.org/usecase/oruclosedloop/internal/linkfailure"
+       "oransc.org/usecase/oruclosedloop/mocks"
+)
+
+func Test_init(t *testing.T) {
+       assertions := require.New(t)
+
+       os.Setenv("CONSUMER_HOST", "consumerHost")
+       os.Setenv("CONSUMER_PORT", "8095")
+       t.Cleanup(func() {
+               os.Clearenv()
+       })
+
+       doInit()
+
+       wantedConfiguration := &config.Config{
+               LogLevel:               log.InfoLevel,
+               ConsumerHost:           "consumerHost",
+               ConsumerPort:           8095,
+               InfoCoordinatorAddress: "http://enrichmentservice:8083",
+               SDNRHost:               "http://localhost",
+               SDNRPort:               3904,
+               SDNRUser:               "admin",
+               SDNPassword:            "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U",
+               ORUToODUMapFile:        "o-ru-to-o-du-map.csv",
+       }
+       assertions.Equal(wantedConfiguration, configuration)
+
+       assertions.Equal(fmt.Sprint(wantedConfiguration.ConsumerPort), consumerPort)
+       assertions.Equal(wantedConfiguration.ConsumerHost+":"+fmt.Sprint(wantedConfiguration.ConsumerPort), jobRegistrationInfo.JobResultUri)
+
+       wantedLinkFailureConfig := linkfailure.Configuration{
+               SDNRAddress:  wantedConfiguration.SDNRHost + ":" + fmt.Sprint(wantedConfiguration.SDNRPort),
+               SDNRUser:     wantedConfiguration.SDNRUser,
+               SDNRPassword: wantedConfiguration.SDNPassword,
+       }
+       assertions.Equal(wantedLinkFailureConfig, linkfailureConfig)
+}
+
+func Test_validateConfiguration(t *testing.T) {
+       assertions := require.New(t)
+
+       type args struct {
+               configuration *config.Config
+       }
+       tests := []struct {
+               name    string
+               args    args
+               wantErr error
+       }{
+               {
+                       name: "Valid config, should return nil",
+                       args: args{
+                               configuration: &config.Config{
+                                       ConsumerHost: "host",
+                                       ConsumerPort: 80,
+                               },
+                       },
+               },
+               {
+                       name: "Invalid config, should return error",
+                       args: args{
+                               configuration: &config.Config{},
+                       },
+                       wantErr: fmt.Errorf("consumer host and port must be provided"),
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       err := validateConfiguration(tt.args.configuration)
+                       assertions.Equal(tt.wantErr, err)
+               })
+       }
+}
+
+func Test_initializeLookupService(t *testing.T) {
+       assertions := require.New(t)
+       type args struct {
+               csvFile         string
+               oRuId           string
+               mockReturn      [][]string
+               mockReturnError error
+       }
+       tests := []struct {
+               name        string
+               args        args
+               wantODuId   string
+               wantInitErr error
+       }{
+               {
+                       name: "Successful initialization, should return nil and lookup service should be initiated with data",
+                       args: args{
+                               csvFile:    "file",
+                               oRuId:      "1",
+                               mockReturn: [][]string{{"1", "2"}},
+                       },
+                       wantODuId: "2",
+               },
+               {
+                       name: "Unsuccessful initialization, should return error and lookup service should not be initiated with data",
+                       args: args{
+                               csvFile:         "file",
+                               mockReturnError: errors.New("Error"),
+                       },
+                       wantInitErr: errors.New("Error"),
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       mockCsvFileHelper := &mocks.CsvFileHelper{}
+                       mockCsvFileHelper.On("GetCsvFromFile", mock.Anything).Return(tt.args.mockReturn, tt.args.mockReturnError)
+
+                       err := initializeLookupService(mockCsvFileHelper, tt.args.csvFile)
+                       oDuId, _ := lookupService.GetODuID(tt.args.oRuId)
+                       assertions.Equal(tt.wantODuId, oDuId)
+                       assertions.Equal(tt.wantInitErr, err)
+                       mockCsvFileHelper.AssertCalled(t, "GetCsvFromFile", tt.args.csvFile)
+               })
+       }
+}
+
+func Test_getRouter_shouldContainAllPathsWithHandlers(t *testing.T) {
+       assertions := require.New(t)
+
+       r := getRouter()
+       messageHandlerRoute := r.Get("messageHandler")
+       assertions.NotNil(messageHandlerRoute)
+       supportedMethods, err := messageHandlerRoute.GetMethods()
+       assertions.Equal([]string{http.MethodPost}, supportedMethods)
+       assertions.Nil(err)
+       path, _ := messageHandlerRoute.GetPathTemplate()
+       assertions.Equal("/", path)
+
+       startHandlerRoute := r.Get("start")
+       assertions.NotNil(messageHandlerRoute)
+       supportedMethods, err = startHandlerRoute.GetMethods()
+       assertions.Equal([]string{http.MethodPost}, supportedMethods)
+       assertions.Nil(err)
+       path, _ = startHandlerRoute.GetPathTemplate()
+       assertions.Equal("/admin/start", path)
+
+       stopHandlerRoute := r.Get("stop")
+       assertions.NotNil(stopHandlerRoute)
+       supportedMethods, err = stopHandlerRoute.GetMethods()
+       assertions.Equal([]string{http.MethodPost}, supportedMethods)
+       assertions.Nil(err)
+       path, _ = stopHandlerRoute.GetPathTemplate()
+       assertions.Equal("/admin/stop", path)
+}
+
+func Test_startServer_shouldDeleteJobWhenServerStopsWithErrorAndLog(t *testing.T) {
+       assertions := require.New(t)
+
+       var buf bytes.Buffer
+       log.SetOutput(&buf)
+
+       os.Setenv("CONSUMER_PORT", "wrong")
+       t.Cleanup(func() {
+               log.SetOutput(os.Stderr)
+       })
+
+       mockServer := &mocks.Server{}
+       mockServer.On("ListenAndServe").Return(errors.New("Server failure"))
+
+       startServer(mockServer)
+
+       log := buf.String()
+       assertions.Contains(log, "level=error")
+       assertions.Contains(log, "Server stopped unintentionally due to: Server failure. Deleteing job.")
+       assertions.Contains(log, "Please remove job 14e7bb84-a44d-44c1-90b7-6995a92ad43c manually")
+}
+
+func Test_startHandler(t *testing.T) {
+       assertions := require.New(t)
+
+       jobRegistrationInfo.JobResultUri = "host:80"
+
+       type args struct {
+               mockReturnBody   []byte
+               mockReturnStatus int
+       }
+       tests := []struct {
+               name         string
+               args         args
+               wantedStatus int
+               wantedBody   string
+       }{
+               {
+                       name: "Start with successful registration, should return ok",
+                       args: args{
+                               mockReturnBody:   []byte(""),
+                               mockReturnStatus: http.StatusOK,
+                       },
+                       wantedStatus: http.StatusOK,
+               },
+               {
+                       name: "Start with error response at registration, should return error",
+                       args: args{
+                               mockReturnBody:   []byte("error"),
+                               mockReturnStatus: http.StatusBadRequest,
+                       },
+                       wantedStatus: http.StatusBadRequest,
+                       wantedBody:   "Unable to register consumer job due to:",
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       clientMock := setUpClientMock(tt.args.mockReturnBody, tt.args.mockReturnStatus)
+
+                       handler := http.HandlerFunc(startHandler)
+                       responseRecorder := httptest.NewRecorder()
+                       r, _ := http.NewRequest(http.MethodPost, "/start", nil)
+
+                       handler.ServeHTTP(responseRecorder, r)
+
+                       assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
+                       assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
+
+                       var wantedJobRegistrationInfo = struct {
+                               InfoTypeId    string      `json:"info_type_id"`
+                               JobResultUri  string      `json:"job_result_uri"`
+                               JobOwner      string      `json:"job_owner"`
+                               JobDefinition interface{} `json:"job_definition"`
+                       }{
+                               InfoTypeId:    "STD_Fault_Messages",
+                               JobResultUri:  "host:80",
+                               JobOwner:      "O-RU Closed Loop Usecase",
+                               JobDefinition: "{}",
+                       }
+                       wantedBody, _ := json.Marshal(wantedJobRegistrationInfo)
+
+                       var actualRequest *http.Request
+                       clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+                               actualRequest = req
+                               return true
+                       }))
+                       assertions.Equal(http.MethodPut, actualRequest.Method)
+                       assertions.Equal("http", actualRequest.URL.Scheme)
+                       assertions.Equal("enrichmentservice:8083", actualRequest.URL.Host)
+                       assertions.Equal("/data-consumer/v1/info-jobs/14e7bb84-a44d-44c1-90b7-6995a92ad43c", actualRequest.URL.Path)
+                       assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+                       body, _ := ioutil.ReadAll(actualRequest.Body)
+                       expectedBody := wantedBody
+                       assertions.Equal(expectedBody, body)
+                       clientMock.AssertNumberOfCalls(t, "Do", 1)
+               })
+       }
+}
+
+func Test_stopHandler(t *testing.T) {
+       assertions := require.New(t)
+
+       jobRegistrationInfo.JobResultUri = "host:80"
+
+       type args struct {
+               mockReturnBody   []byte
+               mockReturnStatus int
+       }
+       tests := []struct {
+               name         string
+               args         args
+               wantedStatus int
+               wantedBody   string
+       }{
+               {
+                       name: "Stop with successful job deletion, should return ok",
+                       args: args{
+                               mockReturnBody:   []byte(""),
+                               mockReturnStatus: http.StatusOK,
+                       },
+                       wantedStatus: http.StatusOK,
+               },
+               {
+                       name: "Stop with error response at job deletion, should return error",
+                       args: args{
+                               mockReturnBody:   []byte("error"),
+                               mockReturnStatus: http.StatusBadRequest,
+                       },
+                       wantedStatus: http.StatusBadRequest,
+                       wantedBody:   "Please remove job 14e7bb84-a44d-44c1-90b7-6995a92ad43c manually",
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       clientMock := setUpClientMock(tt.args.mockReturnBody, tt.args.mockReturnStatus)
+
+                       handler := http.HandlerFunc(stopHandler)
+                       responseRecorder := httptest.NewRecorder()
+                       r, _ := http.NewRequest(http.MethodPost, "/stop", nil)
+
+                       handler.ServeHTTP(responseRecorder, r)
+
+                       assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
+                       assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
+
+                       var actualRequest *http.Request
+                       clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+                               actualRequest = req
+                               return true
+                       }))
+                       assertions.Equal(http.MethodDelete, actualRequest.Method)
+                       assertions.Equal("http", actualRequest.URL.Scheme)
+                       assertions.Equal("enrichmentservice:8083", actualRequest.URL.Host)
+                       assertions.Equal("/data-consumer/v1/info-jobs/14e7bb84-a44d-44c1-90b7-6995a92ad43c", actualRequest.URL.Path)
+                       clientMock.AssertNumberOfCalls(t, "Do", 1)
+               })
+       }
+}
+
+func Test_deleteOnShutdown(t *testing.T) {
+       assertions := require.New(t)
+
+       var buf bytes.Buffer
+       log.SetOutput(&buf)
+
+       t.Cleanup(func() {
+               log.SetOutput(os.Stderr)
+       })
+
+       type args struct {
+               mockReturnBody   []byte
+               mockReturnStatus int
+       }
+       tests := []struct {
+               name      string
+               args      args
+               wantedLog string
+       }{
+               {
+                       name: "Delete with successful job deletion, should return ok",
+                       args: args{
+                               mockReturnBody:   []byte(""),
+                               mockReturnStatus: http.StatusOK,
+                       },
+               },
+               {
+                       name: "Stop with error response at job deletion, should return error",
+                       args: args{
+                               mockReturnBody:   []byte("error"),
+                               mockReturnStatus: http.StatusBadRequest,
+                       },
+                       wantedLog: "Please remove job 14e7bb84-a44d-44c1-90b7-6995a92ad43c manually",
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       setUpClientMock(tt.args.mockReturnBody, tt.args.mockReturnStatus)
+
+                       c := make(chan os.Signal, 1)
+                       go deleteOnShutdown(c)
+                       c <- syscall.SIGTERM
+
+                       waitForLogToBeWritten(&buf)
+
+                       log := buf.String()
+                       if tt.wantedLog != "" {
+                               assertions.Contains(log, "level=error")
+                               assertions.Contains(log, "Unable to delete job on shutdown due to:")
+                               assertions.Contains(log, tt.wantedLog)
+                       }
+               })
+       }
+}
+
+func waitForLogToBeWritten(logBuf *bytes.Buffer) {
+       wg := sync.WaitGroup{}
+       wg.Add(1)
+       for {
+               if waitTimeout(&wg, 10*time.Millisecond) && logBuf.Len() != 0 {
+                       wg.Done()
+                       break
+               }
+       }
+}
+
+// waitTimeout waits for the waitgroup for the specified max timeout.
+// Returns true if waiting timed out.
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+       c := make(chan struct{})
+       go func() {
+               defer close(c)
+               wg.Wait()
+       }()
+       select {
+       case <-c:
+               return false // completed normally
+       case <-time.After(timeout):
+               return true // timed out
+       }
+}
+
+func setUpClientMock(body []byte, status int) *mocks.HTTPClient {
+       clientMock := mocks.HTTPClient{}
+       clientMock.On("Do", mock.Anything).Return(&http.Response{
+               Body:       ioutil.NopCloser(bytes.NewReader(body)),
+               StatusCode: status,
+       }, nil)
+       client = &clientMock
+       return &clientMock
+}
diff --git a/test/usecases/oruclosedlooprecovery/goversion/mocks/Server.go b/test/usecases/oruclosedlooprecovery/goversion/mocks/Server.go
new file mode 100644 (file)
index 0000000..ad16503
--- /dev/null
@@ -0,0 +1,24 @@
+// Code generated by mockery v1.0.0. DO NOT EDIT.
+
+package mocks
+
+import mock "github.com/stretchr/testify/mock"
+
+// Server is an autogenerated mock type for the Server type
+type Server struct {
+       mock.Mock
+}
+
+// ListenAndServe provides a mock function with given fields:
+func (_m *Server) ListenAndServe() error {
+       ret := _m.Called()
+
+       var r0 error
+       if rf, ok := ret.Get(0).(func() error); ok {
+               r0 = rf()
+       } else {
+               r0 = ret.Error(0)
+       }
+
+       return r0
+}
diff --git a/test/usecases/oruclosedlooprecovery/goversion/simulator/producer.go b/test/usecases/oruclosedlooprecovery/goversion/simulator/producer.go
new file mode 100644 (file)
index 0000000..78462ba
--- /dev/null
@@ -0,0 +1,66 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+       "bytes"
+       "encoding/json"
+       "net/http"
+       "time"
+
+       "oransc.org/usecase/oruclosedloop/internal/ves"
+)
+
+func main() {
+       message := ves.FaultMessage{
+               Event: ves.Event{
+                       CommonEventHeader: ves.CommonEventHeader{
+                               Domain:     "fault",
+                               SourceName: "ERICSSON-O-RU-11220",
+                       },
+                       FaultFields: ves.FaultFields{
+                               AlarmCondition: "28",
+                       },
+               },
+       }
+       client := &http.Client{
+               Timeout: 5 * time.Second,
+       }
+
+       critical := true
+       for range time.Tick(2 * time.Second) {
+               if critical {
+                       message.Event.FaultFields.EventSeverity = "CRITICAL"
+                       critical = false
+               } else {
+                       critical = true
+                       message.Event.FaultFields.EventSeverity = "NORMAL"
+               }
+               m, _ := json.Marshal(message)
+               msgToSend, _ := json.Marshal([]string{string(m)})
+
+               req, _ := http.NewRequest(http.MethodPost, "http://localhost:40935", bytes.NewBuffer(msgToSend))
+               req.Header.Set("Content-Type", "application/json; charset=utf-8")
+
+               client.Do(req)
+       }
+
+}