Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer until it is available again.
+The producer provides a REST API to control the log level. The available levels are the same as the ones used in the configuration above.
+
+ PUT https://mrproducer:8085/admin/log?level=<new level>
+
## Development
To make it easy to test during development of the producer, two stubs are provided in the `stub` folder.
-One, under the `dmaap` folder, called `dmaap` that stubs MR and respond with an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+One, under the `dmaap` folder, called `dmaap` that stubs MR and respond with an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port <PORT>` flag when starting the stub. To build and start the stub, do the following:
>1. cd stub/dmaap
>2. go build
->3. ./dmaap
+>3. ./dmaap [-port \<PORT>]
-One, under the `consumer` folder, called `consumer` that at startup will register a job of type `STD_Fault_Messages` in ICS, and then listen for REST calls and print the body of them. By default, it listens to the port `40935`, but his can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+One, under the `consumer` folder, called `consumer` that at startup will register a job of type `STD_Fault_Messages` in ICS, and then listen for REST calls and print the body of them. By default, it listens to the port `40935`, but his can be overridden by passing a `-port <PORT>` flag when starting the stub. To build and start the stub, do the following:
>1. cd stub/consumer
>2. go build
->3. ./consumer
+>3. ./consumer [-port \<PORT>]
Mocks needed for unit tests have been generated using `github.com/stretchr/testify/mock` and are checked in under the `mocks` folder. **Note!** Keep in mind that if any of the mocked interfaces change, a new mock for that interface must be generated and checked in.
go 1.17
require (
+ github.com/gorilla/mux v1.8.0
+ github.com/hashicorp/go-retryablehttp v0.7.0
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
- github.com/gorilla/mux v1.8.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
- github.com/hashicorp/go-retryablehttp v0.7.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.1.0 // indirect
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-retryablehttp v0.7.0 h1:eu1EI/mbirUgP5C8hVsTNaGZreBDlYiwC1FZWkvQPQ4=
github.com/hashicorp/go-retryablehttp v0.7.0/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
"time"
"github.com/hashicorp/go-retryablehttp"
+ log "github.com/sirupsen/logrus"
)
// HTTPClient interface
func CreateRetryClient(cert tls.Certificate) *http.Client {
rawRetryClient := retryablehttp.NewClient()
+ rawRetryClient.Logger = leveledLogger{}
rawRetryClient.RetryWaitMax = time.Minute
rawRetryClient.RetryMax = math.MaxInt
rawRetryClient.HTTPClient.Transport = getSecureTransportWithoutVerify(cert)
u, _ := url.Parse(configUrl)
return u.Scheme == "https"
}
+
+// Used to get leveled logging in the RetryClient
+type leveledLogger struct {
+}
+
+func (ll leveledLogger) Error(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Error(msg)
+}
+func (ll leveledLogger) Info(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Info(msg)
+}
+func (ll leveledLogger) Debug(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Debug(msg)
+}
+func (ll leveledLogger) Warn(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Warn(msg)
+}
+
+func getFields(keysAndValues []interface{}) log.Fields {
+ fields := log.Fields{}
+ for i := 0; i < len(keysAndValues); i = i + 2 {
+ fields[fmt.Sprint(keysAndValues[i])] = keysAndValues[i+1]
+ }
+ return fields
+}
"net/http"
"github.com/gorilla/mux"
+ log "github.com/sirupsen/logrus"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
)
const AddJobPath = "/jobs"
const jobIdToken = "infoJobId"
const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}"
+const logLevelToken = "level"
+const logAdminPath = "/admin/log"
type ProducerCallbackHandler struct {
jobsManager jobs.JobsManager
r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status")
r.HandleFunc(AddJobPath, callbackHandler.addInfoJobHandler).Methods(http.MethodPost).Name("add")
r.HandleFunc(deleteJobPath, callbackHandler.deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete")
+ r.HandleFunc(logAdminPath, callbackHandler.setLogLevel).Methods(http.MethodPut).Name("setLogLevel")
r.NotFoundHandler = ¬FoundHandler{}
r.MethodNotAllowedHandler = &methodNotAllowedHandler{}
return r
h.jobsManager.DeleteJobFromRESTCall(id)
}
+func (h *ProducerCallbackHandler) setLogLevel(w http.ResponseWriter, r *http.Request) {
+ query := r.URL.Query()
+ logLevelStr := query.Get(logLevelToken)
+ if loglevel, err := log.ParseLevel(logLevelStr); err == nil {
+ log.SetLevel(loglevel)
+ } else {
+ http.Error(w, fmt.Sprintf("Invalid log level: %v. Log level will not be changed!", logLevelStr), http.StatusBadRequest)
+ return
+ }
+}
+
type notFoundHandler struct{}
func (h *notFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t))
assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code)
assertions.Contains(responseRecorder.Body.String(), "Method is not supported.")
+
+ setLogLevelRoute := r.Get("setLogLevel")
+ assertions.NotNil(setLogLevelRoute)
+ supportedMethods, err = setLogLevelRoute.GetMethods()
+ assertions.Equal([]string{http.MethodPut}, supportedMethods)
+ assertions.Nil(err)
+ path, _ = setLogLevelRoute.GetPathTemplate()
+ assertions.Equal("/admin/log", path)
}
func TestStatusHandler(t *testing.T) {
},
},
wantedStatus: http.StatusOK,
- wantedBody: "",
},
{
name: "AddInfoJobHandler with incorrect job info, should return BadRequest",
jobHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
}
+func TestSetLogLevel(t *testing.T) {
+ assertions := require.New(t)
+
+ type args struct {
+ logLevel string
+ }
+ tests := []struct {
+ name string
+ args args
+ wantedStatus int
+ wantedBody string
+ }{
+ {
+ name: "Set to valid log level, should return OK",
+ args: args{
+ logLevel: "Debug",
+ },
+ wantedStatus: http.StatusOK,
+ },
+ {
+ name: "Set to invalid log level, should return BadRequest",
+ args: args{
+ logLevel: "bad",
+ },
+ wantedStatus: http.StatusBadRequest,
+ wantedBody: "Invalid log level: bad",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ callbackHandlerUnderTest := NewProducerCallbackHandler(nil)
+
+ handler := http.HandlerFunc(callbackHandlerUnderTest.setLogLevel)
+ responseRecorder := httptest.NewRecorder()
+ r, _ := http.NewRequest(http.MethodPut, "/admin/log?level="+tt.args.logLevel, nil)
+
+ handler.ServeHTTP(responseRecorder, r)
+
+ assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
+ assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
+ })
+ }
+}
+
func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
var body io.Reader
if jobInfo != nil {
}
retryClient := restclient.CreateRetryClient(cert)
- jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 5*time.Second))
+ jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
log.Fatalf("Stopping producer due to: %v", err)
}
registerJob(*port)
- fmt.Print("Starting consumer on port: ", *port)
+ fmt.Println("Starting consumer on port: ", *port)
fmt.Println(http.ListenAndServe(fmt.Sprintf(":%v", *port), nil))
}
InfoTypeId: "STD_Fault_Messages",
JobDefinition: "{}",
}
- fmt.Print("Registering consumer: ", jobInfo)
+ fmt.Println("Registering consumer: ", jobInfo)
body, _ := json.Marshal(jobInfo)
putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient)
if putErr != nil {
- fmt.Printf("Unable to register consumer: %v", putErr)
+ fmt.Println("Unable to register consumer: ", putErr)
}
}
var responseBody []byte
if critical {
responseBody = getFaultMessage("CRITICAL")
+ fmt.Println("Sending CRITICAL")
critical = false
} else {
responseBody = getFaultMessage("NORMAL")
+ fmt.Println("Sending NORMAL")
critical = true
}
- // w.Write(responseBody)
fmt.Fprint(w, string(responseBody))
}