Add some rmr library stats into metrics 67/8567/1 f-release v0.9.14
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Wed, 15 Jun 2022 07:54:43 +0000 (10:54 +0300)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Wed, 15 Jun 2022 08:56:07 +0000 (11:56 +0300)
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
Change-Id: I25e07abe5e1772c085116cef23c1273820cb051c

ci/Dockerfile
go.mod
pkg/xapp/restapi.go
pkg/xapp/rmr.go
pkg/xapp/types.go

index 71aaed3..e354c89 100755 (executable)
@@ -37,7 +37,7 @@ RUN apt-get update -y \
 RUN curl -s https://packagecloud.io/install/repositories/o-ran-sc/master/script.deb.sh | bash
 
 # RMR
-ARG RMRVERSION=4.2.2
+ARG RMRVERSION=4.8.0
 #RUN apt-get install -y rmr=${RMRVERSION} rmr-dev=${RMRVERSION}
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb
diff --git a/go.mod b/go.mod
index 0e018c7..4e8257b 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module gerrit.o-ran-sc.org/r/ric-plt/xapp-frame
 
-go 1.12
+go 1.16
 
 require (
        gerrit.o-ran-sc.org/r/com/golog v0.0.2
@@ -25,6 +25,39 @@ require (
        github.com/spf13/viper v1.4.0
        github.com/stretchr/testify v1.5.1
        golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
+)
+
+require (
+       github.com/PuerkitoBio/purell v1.1.1 // indirect
+       github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
+       github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
+       github.com/beorn7/perks v1.0.0 // indirect
+       github.com/davecgh/go-spew v1.1.1 // indirect
+       github.com/docker/go-units v0.4.0 // indirect
+       github.com/go-openapi/analysis v0.19.5 // indirect
+       github.com/go-openapi/jsonpointer v0.19.3 // indirect
+       github.com/go-openapi/jsonreference v0.19.3 // indirect
+       github.com/go-redis/redis v6.15.9+incompatible // indirect
+       github.com/go-stack/stack v1.8.0 // indirect
+       github.com/hashicorp/hcl v1.0.0 // indirect
+       github.com/magiconair/properties v1.8.0 // indirect
+       github.com/mailru/easyjson v0.7.0 // indirect
+       github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
+       github.com/mitchellh/mapstructure v1.1.2 // indirect
+       github.com/pelletier/go-toml v1.2.0 // indirect
+       github.com/pmezard/go-difflib v1.0.0 // indirect
+       github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect
+       github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084 // indirect
+       github.com/spf13/afero v1.2.2 // indirect
+       github.com/spf13/cast v1.3.0 // indirect
+       github.com/spf13/jwalterweatherman v1.0.0 // indirect
+       github.com/spf13/pflag v1.0.3 // indirect
+       github.com/stretchr/objx v0.2.0 // indirect
+       go.mongodb.org/mongo-driver v1.1.2 // indirect
+       golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 // indirect
+       golang.org/x/text v0.3.2 // indirect
+       google.golang.org/protobuf v1.23.0 // indirect
+       gopkg.in/yaml.v2 v2.3.0 // indirect
        k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
 )
 
index 12101a8..54e18ea 100755 (executable)
@@ -158,7 +158,7 @@ func (r *Router) CollectDefaultSymptomData(fileName string, data interface{}) st
        //
        // Collect metrics
        //
-       if metrics, err := r.GetLocalMetrics(GetPortData("http").Port); err == nil {
+       if metrics, err := r.GetLocalMetrics(); err == nil {
                if err := Util.WriteToFile(baseDir+"metrics.json", metrics); err != nil {
                        Logger.Error("writeToFile failed for metrics.json: %v", err)
                }
@@ -235,7 +235,7 @@ func (r *Router) SendSymptomDataError(w http.ResponseWriter, req *http.Request,
        http.Error(w, message, http.StatusInternalServerError)
 }
 
-func (r *Router) GetLocalMetrics(port int) (string, error) {
+func (r *Router) GetLocalMetrics() (string, error) {
        buf := &bytes.Buffer{}
        enc := expfmt.NewEncoder(buf, expfmt.FmtText)
        vals, err := prometheus.DefaultGatherer.Gather()
@@ -251,6 +251,14 @@ func (r *Router) GetLocalMetrics(port int) (string, error) {
        return string(buf.Bytes()), nil
 }
 
+//Resource.InjectRoute(url, metricsHandler, "GET")
+//func metricsHandler(w http.ResponseWriter, r *http.Request) {
+//     w.Header().Set("Content-Type", "text/plain")
+//     w.WriteHeader(http.StatusOK)
+//     metrics, _ := Resource.GetLocalMetrics()
+//     w.Write([]byte(metrics))
+//}
+
 func IsHealthProbeReady() bool {
        return healthReady
 }
index 9342828..028cc3c 100755 (executable)
@@ -76,9 +76,16 @@ import (
 
 var RMRCounterOpts = []CounterOpts{
        {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
-       {Name: "Received", Help: "The total number of received RMR messages"},
        {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
+       {Name: "TransmitRetry", Help: "The total number of transmit retries on failure"},
+       {Name: "Received", Help: "The total number of received RMR messages"},
        {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
+       {Name: "SendWithRetryRetry", Help: "SendWithRetry service retries"},
+}
+
+var RMRGaugeOpts = []CounterOpts{
+       {Name: "Enqueued", Help: "The total number of enqueued in RMR library"},
+       {Name: "Dropped", Help: "The total number of dropped in RMR library"},
 }
 
 var RMRErrors = map[int]string{
@@ -164,7 +171,8 @@ func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
        return &RMRClient{
                context:           ctx,
                consumers:         make([]MessageConsumer, 0),
-               stat:              Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
+               statc:             Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
+               statg:             Metric.RegisterGaugeGroup(RMRGaugeOpts, params.StatDesc),
                maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
        }
 }
@@ -208,12 +216,12 @@ func (m *RMRClient) Start(c MessageConsumer) {
                time.Sleep(1 * time.Second)
                counter++
        }
-       m.wg.Add(1)
 
        if m.readyCb != nil {
                go m.readyCb(m.readyCbParams)
        }
 
+       m.wg.Add(1)
        go func() {
                m.contextMux.Lock()
                rfd := C.rmr_get_rcvfd(m.context)
@@ -222,6 +230,7 @@ func (m *RMRClient) Start(c MessageConsumer) {
 
                defer m.wg.Done()
                for {
+
                        if int(C.wait_epoll(efd, rfd)) == 0 {
                                continue
                        }
@@ -239,9 +248,30 @@ func (m *RMRClient) Start(c MessageConsumer) {
                }
        }()
 
+       m.wg.Add(1)
+       go func() {
+               defer m.wg.Done()
+               for {
+                       m.UpdateRmrStats()
+                       time.Sleep(1 * time.Second)
+               }
+       }()
+
        m.wg.Wait()
 }
 
+func (m *RMRClient) UpdateRmrStats() {
+       param := (*C.rmr_rx_debug_t)(C.malloc(C.size_t(unsafe.Sizeof(C.rmr_rx_debug_t{}))))
+       m.contextMux.Lock()
+       C.rmr_get_rx_debug_info(m.context, param)
+       m.contextMux.Unlock()
+       m.mux.Lock()
+       m.statg["Enqueued"].Set(float64(param.enqueue))
+       m.statg["Dropped"].Set(float64(param.drop))
+       m.mux.Unlock()
+       C.free(unsafe.Pointer(param))
+}
+
 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
        if len(m.consumers) == 0 {
                Logger.Info("rmrClient: No message handlers defined, message discarded!")
@@ -340,6 +370,7 @@ func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duratio
        for ; i < int(to)*2 && status == false; i++ {
                status = m.Send(params, isRts)
                if status == false {
+                       m.UpdateStatCounter("SendWithRetryRetry")
                        time.Sleep(500 * time.Millisecond)
                }
        }
@@ -454,6 +485,7 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
                        }
                }
                m.contextMux.Unlock()
+               m.UpdateStatCounter("TransmitRetry")
        }
 
        if currBuffer == nil {
@@ -545,12 +577,13 @@ func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
 
 func (m *RMRClient) UpdateStatCounter(name string) {
        m.mux.Lock()
-       m.stat[name].Inc()
+       m.statc[name].Inc()
        m.mux.Unlock()
 }
 
 func (m *RMRClient) RegisterMetrics() {
-       m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
+       m.statc = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
+       m.statg = Metric.RegisterGaugeGroup(RMRGaugeOpts, "RMR")
 }
 
 func (m *RMRClient) Wait() {
index b517ff3..6ae981a 100755 (executable)
@@ -29,7 +29,8 @@ type RMRClient struct {
        ready             int
        wg                sync.WaitGroup
        mux               sync.Mutex
-       stat              map[string]Counter
+       statc             map[string]Counter
+       statg             map[string]Gauge
        consumers         []MessageConsumer
        readyCb           ReadyCB
        readyCbParams     interface{}