Make logging in Mediator Producer managed by REST 10/7110/1
authorelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 23 Nov 2021 16:01:07 +0000 (17:01 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 23 Nov 2021 16:08:16 +0000 (17:08 +0100)
Issue-ID: NONRTRIC-645
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I9c091af5742e9e6ed37b1695439b7f9117172379

dmaap-mediator-producer/README.md
dmaap-mediator-producer/go.mod
dmaap-mediator-producer/go.sum
dmaap-mediator-producer/internal/restclient/HTTPClient.go
dmaap-mediator-producer/internal/server/server.go
dmaap-mediator-producer/internal/server/server_test.go
dmaap-mediator-producer/main.go
dmaap-mediator-producer/stub/consumer/consumerstub.go
dmaap-mediator-producer/stub/dmaap/mrstub.go

index 2fd7194..9a0903b 100644 (file)
@@ -38,19 +38,23 @@ At start up the producer will register the configured job types in ICS and also
 
 Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer until it is available again.
 
+The producer provides a REST API to control the log level. The available levels are the same as the ones used in the configuration above.
+
+    PUT https://mrproducer:8085/admin/log?level=<new level>
+
 ## Development
 
 To make it easy to test during development of the producer, two stubs are provided in the `stub` folder.
 
-One, under the `dmaap` folder, called `dmaap` that stubs MR and respond with an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+One, under the `dmaap` folder, called `dmaap` that stubs MR and respond with an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port <PORT>` flag when starting the stub. To build and start the stub, do the following:
 >1. cd stub/dmaap
 >2. go build
->3. ./dmaap
+>3. ./dmaap [-port \<PORT>]
 
-One, under the `consumer` folder, called `consumer` that at startup will register a job of type `STD_Fault_Messages` in ICS, and then listen for REST calls and print the body of them. By default, it listens to the port `40935`, but his can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+One, under the `consumer` folder, called `consumer` that at startup will register a job of type `STD_Fault_Messages` in ICS, and then listen for REST calls and print the body of them. By default, it listens to the port `40935`, but his can be overridden by passing a `-port <PORT>` flag when starting the stub. To build and start the stub, do the following:
 >1. cd stub/consumer
 >2. go build
->3. ./consumer
+>3. ./consumer [-port \<PORT>]
 
 Mocks needed for unit tests have been generated using `github.com/stretchr/testify/mock` and are checked in under the `mocks` folder. **Note!** Keep in mind that if any of the mocked interfaces change, a new mock for that interface must be generated and checked in.
 
index ffea6a2..eaaecf7 100644 (file)
@@ -3,15 +3,15 @@ module oransc.org/nonrtric/dmaapmediatorproducer
 go 1.17
 
 require (
+       github.com/gorilla/mux v1.8.0
+       github.com/hashicorp/go-retryablehttp v0.7.0
        github.com/sirupsen/logrus v1.8.1
        github.com/stretchr/testify v1.7.0
 )
 
 require (
        github.com/davecgh/go-spew v1.1.1 // indirect
-       github.com/gorilla/mux v1.8.0 // indirect
        github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
-       github.com/hashicorp/go-retryablehttp v0.7.0 // indirect
        github.com/pmezard/go-difflib v1.0.0 // indirect
        github.com/stretchr/objx v0.1.0 // indirect
        golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect
index 8447fa0..4b3557b 100644 (file)
@@ -5,6 +5,7 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
 github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
 github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
 github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
 github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
 github.com/hashicorp/go-retryablehttp v0.7.0 h1:eu1EI/mbirUgP5C8hVsTNaGZreBDlYiwC1FZWkvQPQ4=
 github.com/hashicorp/go-retryablehttp v0.7.0/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
index 8ccd4b2..9a827e7 100644 (file)
@@ -31,6 +31,7 @@ import (
        "time"
 
        "github.com/hashicorp/go-retryablehttp"
+       log "github.com/sirupsen/logrus"
 )
 
 // HTTPClient interface
@@ -115,6 +116,7 @@ func CreateClientCertificate(certPath string, keyPath string) (tls.Certificate,
 
 func CreateRetryClient(cert tls.Certificate) *http.Client {
        rawRetryClient := retryablehttp.NewClient()
+       rawRetryClient.Logger = leveledLogger{}
        rawRetryClient.RetryWaitMax = time.Minute
        rawRetryClient.RetryMax = math.MaxInt
        rawRetryClient.HTTPClient.Transport = getSecureTransportWithoutVerify(cert)
@@ -145,3 +147,28 @@ func IsUrlSecure(configUrl string) bool {
        u, _ := url.Parse(configUrl)
        return u.Scheme == "https"
 }
+
+// Used to get leveled logging in the RetryClient
+type leveledLogger struct {
+}
+
+func (ll leveledLogger) Error(msg string, keysAndValues ...interface{}) {
+       log.WithFields(getFields(keysAndValues)).Error(msg)
+}
+func (ll leveledLogger) Info(msg string, keysAndValues ...interface{}) {
+       log.WithFields(getFields(keysAndValues)).Info(msg)
+}
+func (ll leveledLogger) Debug(msg string, keysAndValues ...interface{}) {
+       log.WithFields(getFields(keysAndValues)).Debug(msg)
+}
+func (ll leveledLogger) Warn(msg string, keysAndValues ...interface{}) {
+       log.WithFields(getFields(keysAndValues)).Warn(msg)
+}
+
+func getFields(keysAndValues []interface{}) log.Fields {
+       fields := log.Fields{}
+       for i := 0; i < len(keysAndValues); i = i + 2 {
+               fields[fmt.Sprint(keysAndValues[i])] = keysAndValues[i+1]
+       }
+       return fields
+}
index 79646c2..8c5577d 100644 (file)
@@ -27,6 +27,7 @@ import (
        "net/http"
 
        "github.com/gorilla/mux"
+       log "github.com/sirupsen/logrus"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
 )
 
@@ -34,6 +35,8 @@ const StatusPath = "/status"
 const AddJobPath = "/jobs"
 const jobIdToken = "infoJobId"
 const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}"
+const logLevelToken = "level"
+const logAdminPath = "/admin/log"
 
 type ProducerCallbackHandler struct {
        jobsManager jobs.JobsManager
@@ -51,6 +54,7 @@ func NewRouter(jm jobs.JobsManager) *mux.Router {
        r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status")
        r.HandleFunc(AddJobPath, callbackHandler.addInfoJobHandler).Methods(http.MethodPost).Name("add")
        r.HandleFunc(deleteJobPath, callbackHandler.deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete")
+       r.HandleFunc(logAdminPath, callbackHandler.setLogLevel).Methods(http.MethodPut).Name("setLogLevel")
        r.NotFoundHandler = &notFoundHandler{}
        r.MethodNotAllowedHandler = &methodNotAllowedHandler{}
        return r
@@ -87,6 +91,17 @@ func (h *ProducerCallbackHandler) deleteInfoJobHandler(w http.ResponseWriter, r
        h.jobsManager.DeleteJobFromRESTCall(id)
 }
 
+func (h *ProducerCallbackHandler) setLogLevel(w http.ResponseWriter, r *http.Request) {
+       query := r.URL.Query()
+       logLevelStr := query.Get(logLevelToken)
+       if loglevel, err := log.ParseLevel(logLevelStr); err == nil {
+               log.SetLevel(loglevel)
+       } else {
+               http.Error(w, fmt.Sprintf("Invalid log level: %v. Log level will not be changed!", logLevelStr), http.StatusBadRequest)
+               return
+       }
+}
+
 type notFoundHandler struct{}
 
 func (h *notFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
index 1d458c9..1db3644 100644 (file)
@@ -78,6 +78,14 @@ func TestNewRouter(t *testing.T) {
        handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t))
        assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code)
        assertions.Contains(responseRecorder.Body.String(), "Method is not supported.")
+
+       setLogLevelRoute := r.Get("setLogLevel")
+       assertions.NotNil(setLogLevelRoute)
+       supportedMethods, err = setLogLevelRoute.GetMethods()
+       assertions.Equal([]string{http.MethodPut}, supportedMethods)
+       assertions.Nil(err)
+       path, _ = setLogLevelRoute.GetPathTemplate()
+       assertions.Equal("/admin/log", path)
 }
 
 func TestStatusHandler(t *testing.T) {
@@ -119,7 +127,6 @@ func TestAddInfoJobHandler(t *testing.T) {
                                },
                        },
                        wantedStatus: http.StatusOK,
-                       wantedBody:   "",
                },
                {
                        name: "AddInfoJobHandler with incorrect job info, should return BadRequest",
@@ -171,6 +178,50 @@ func TestDeleteJob(t *testing.T) {
        jobHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
 }
 
+func TestSetLogLevel(t *testing.T) {
+       assertions := require.New(t)
+
+       type args struct {
+               logLevel string
+       }
+       tests := []struct {
+               name         string
+               args         args
+               wantedStatus int
+               wantedBody   string
+       }{
+               {
+                       name: "Set to valid log level, should return OK",
+                       args: args{
+                               logLevel: "Debug",
+                       },
+                       wantedStatus: http.StatusOK,
+               },
+               {
+                       name: "Set to invalid log level, should return BadRequest",
+                       args: args{
+                               logLevel: "bad",
+                       },
+                       wantedStatus: http.StatusBadRequest,
+                       wantedBody:   "Invalid log level: bad",
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       callbackHandlerUnderTest := NewProducerCallbackHandler(nil)
+
+                       handler := http.HandlerFunc(callbackHandlerUnderTest.setLogLevel)
+                       responseRecorder := httptest.NewRecorder()
+                       r, _ := http.NewRequest(http.MethodPut, "/admin/log?level="+tt.args.logLevel, nil)
+
+                       handler.ServeHTTP(responseRecorder, r)
+
+                       assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
+                       assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
+               })
+       }
+}
+
 func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
        var body io.Reader
        if jobInfo != nil {
index 194ed75..3a4935a 100644 (file)
@@ -56,7 +56,7 @@ func main() {
        }
        retryClient := restclient.CreateRetryClient(cert)
 
-       jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 5*time.Second))
+       jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
        if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
                log.Fatalf("Stopping producer due to: %v", err)
        }
index 5cbcaea..4260cae 100644 (file)
@@ -43,7 +43,7 @@ func main() {
 
        registerJob(*port)
 
-       fmt.Print("Starting consumer on port: ", *port)
+       fmt.Println("Starting consumer on port: ", *port)
        fmt.Println(http.ListenAndServe(fmt.Sprintf(":%v", *port), nil))
 }
 
@@ -59,11 +59,11 @@ func registerJob(port int) {
                InfoTypeId:    "STD_Fault_Messages",
                JobDefinition: "{}",
        }
-       fmt.Print("Registering consumer: ", jobInfo)
+       fmt.Println("Registering consumer: ", jobInfo)
        body, _ := json.Marshal(jobInfo)
        putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient)
        if putErr != nil {
-               fmt.Printf("Unable to register consumer: %v", putErr)
+               fmt.Println("Unable to register consumer: ", putErr)
        }
 }
 
index 36ffa39..451bc9a 100644 (file)
@@ -71,12 +71,13 @@ func handleData(w http.ResponseWriter, req *http.Request) {
        var responseBody []byte
        if critical {
                responseBody = getFaultMessage("CRITICAL")
+               fmt.Println("Sending CRITICAL")
                critical = false
        } else {
                responseBody = getFaultMessage("NORMAL")
+               fmt.Println("Sending NORMAL")
                critical = true
        }
-       // w.Write(responseBody)
        fmt.Fprint(w, string(responseBody))
 }