From d28b8dd34b07b5af77802ba408d859b00215cd35 Mon Sep 17 00:00:00 2001
From: Juha Hyttinen
Date: Wed, 27 May 2020 09:21:08 +0300
Subject: [PATCH] C rmr context is now protected. Stabilizes rmr usage with
 multiple threads

Logger "wrapper" checks the log level before continuing, saving a few cycles

Multiple worker support removed. The application can do this itself

Change-Id: I8771e5a93a9ea8175e1e6ab7559c12b3569e04dd
Signed-off-by: Juha Hyttinen
---
 config/config-file.yaml           |   1 -
 examples/config/config-file.json  |   1 -
 pkg/xapp/logger.go                |  12 +++++
 pkg/xapp/rmr.go                   | 110 ++++++++++++++++++++++++++------------
 pkg/xapp/types.go                 |   3 +-
 pkg/xapp/xapp_test.go             |  13 +++++
 test/manifest/config-file-rx.yaml |   1 -
 test/manifest/config-file-tx.yaml |   1 -
 8 files changed, 101 insertions(+), 41 deletions(-)

diff --git a/config/config-file.yaml b/config/config-file.yaml
index 59656c8..a75c078 100755
--- a/config/config-file.yaml
+++ b/config/config-file.yaml
@@ -20,7 +20,6 @@
 "rmr":
   "protPort": "tcp:4560"
   "maxSize": 2072
-  "numWorkers": 1
   "maxRetryOnFailure": 5
   "threadType": 0
 "subscription":
diff --git a/examples/config/config-file.json b/examples/config/config-file.json
index e9f412a..23c4c5f 100755
--- a/examples/config/config-file.json
+++ b/examples/config/config-file.json
@@ -15,7 +15,6 @@
     "rmr": {
         "protPort": "tcp:4560",
         "maxSize": 65536,
-        "numWorkers": 1,
         "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_INDICATION"],
         "txMessages": ["RIC_SUB_REQ", "RIC_SUB_DEL_REQ", "RIC_SGNB_ADDITION_REQ", "RIC_SGNB_ADDITION_ACK"]
     },
diff --git a/pkg/xapp/logger.go b/pkg/xapp/logger.go
index cd1b03b..5009354 100755
--- a/pkg/xapp/logger.go
+++ b/pkg/xapp/logger.go
@@ -45,21 +45,33 @@ func (l *Log) SetMdc(key string, value string) {
 }
 
 func (l *Log) Error(pattern string, args ...interface{}) {
+    if l.logger.LevelGet() < mdclog.ERR {
+        return
+    }
     l.SetMdc("time", timeFormat())
     l.logger.Error(pattern, args...)
 }
 
 func (l *Log) Warn(pattern string, args ...interface{}) {
+    if l.logger.LevelGet() < mdclog.WARN {
+        return
+    }
     l.SetMdc("time", timeFormat())
     l.logger.Warning(pattern, args...)
 }
 
 func (l *Log) Info(pattern string, args ...interface{}) {
+    if l.logger.LevelGet() < mdclog.INFO {
+        return
+    }
     l.SetMdc("time", timeFormat())
     l.logger.Info(pattern, args...)
 }
 
 func (l *Log) Debug(pattern string, args ...interface{}) {
+    if l.logger.LevelGet() < mdclog.DEBUG {
+        return
+    }
     l.SetMdc("time", timeFormat())
     l.logger.Debug(pattern, args...)
 }
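
Sketch (illustration only, not part of the diff): the guard added to each wrapper above returns before the MDC timestamp update and the mdclog call when the active level is below the message's level. Below is a minimal self-contained example of the same early-return pattern; the miniLog type and level constants are hypothetical stand-ins, not the xapp or mdclog API. Note that Go still evaluates the variadic arguments at the call site, so very expensive argument construction should still be gated by the caller.

package main

import "fmt"

// Hypothetical stand-ins for the mdclog severity levels used by the real wrapper.
type logLevel int

const (
    levelErr logLevel = iota + 1
    levelWarn
    levelInfo
    levelDebug
)

// miniLog mimics the wrapper in pkg/xapp/logger.go: messages below the
// configured level return before any timestamp formatting or I/O happens.
type miniLog struct{ current logLevel }

func (l *miniLog) Debug(pattern string, args ...interface{}) {
    if l.current < levelDebug {
        return // filtered out: skip the formatting and the write
    }
    fmt.Printf("DEBUG "+pattern+"\n", args...)
}

func main() {
    l := &miniLog{current: levelInfo}
    l.Debug("rx message type=%d", 12010) // suppressed, configured level is INFO
    l.current = levelDebug
    l.Debug("rx message type=%d", 12010) // printed
}
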
diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go
index d310b3a..90bc64b 100755
--- a/pkg/xapp/rmr.go
+++ b/pkg/xapp/rmr.go
@@ -24,6 +24,8 @@ package xapp
 #include <time.h>
 #include <strings.h>
 #include <stdlib.h>
+#include <sys/epoll.h>
+#include <unistd.h>
 #include <rmr/rmr.h>
 #include <rmr/RIC_message_types.h>
 
@@ -31,6 +33,31 @@ void write_bytes_array(unsigned char *dst, void *data, int len) {
     memcpy((void *)dst, (void *)data, len);
 }
 
+int init_epoll(int rcv_fd) {
+    struct epoll_event epe;
+    int epoll_fd = epoll_create1( 0 );
+    epe.events = EPOLLIN;
+    epe.data.fd = rcv_fd;
+    epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
+    return epoll_fd;
+}
+
+void close_epoll(int epoll_fd) {
+    if(epoll_fd >= 0) {
+        close(epoll_fd);
+    }
+}
+
+int wait_epoll(int epoll_fd,int rcv_fd) {
+    struct epoll_event events[1];
+    if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
+        if( events[0].data.fd == rcv_fd ) {
+            return 1;
+        }
+    }
+    return 0;
+}
+
 #cgo CFLAGS: -I../
 #cgo LDFLAGS: -lrmr_si
 */
@@ -39,7 +66,6 @@ import "C"
 import (
     "fmt"
     "github.com/spf13/viper"
-    "strconv"
     "strings"
     "time"
     "unsafe"
@@ -87,30 +113,25 @@ type RMRParams struct {
     status     int
 }
 
-func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, threadType int, statDesc string) *RMRClient {
+func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
     p := C.CString(protPort)
     m := C.int(maxSize)
     c := C.int(threadType)
     defer C.free(unsafe.Pointer(p))
-
-    //ctx := C.rmr_init(p, m, C.int(0))
-    //ctx := C.rmr_init(p, m, C.RMRFL_NOTHREAD)
     ctx := C.rmr_init(p, m, c)
     if ctx == nil {
         Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
     }
-
     return &RMRClient{
-        protPort:   protPort,
-        numWorkers: numWorkers,
-        context:    ctx,
-        consumers:  make([]MessageConsumer, 0),
-        stat:       Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
+        protPort:  protPort,
+        context:   ctx,
+        consumers: make([]MessageConsumer, 0),
+        stat:      Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
     }
 }
 
 func NewRMRClient() *RMRClient {
-    return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), viper.GetInt("rmr.threadType"), "RMR")
+    return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
 }
 
 func (m *RMRClient) Start(c MessageConsumer) {
@@ -120,7 +141,10 @@
 
     var counter int = 0
     for {
-        if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
+        m.contextMux.Lock()
+        m.ready = int(C.rmr_ready(m.context))
+        m.contextMux.Unlock()
+        if m.ready == 1 {
             Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
             break
         }
@@ -130,39 +154,41 @@ func (m *RMRClient) Start(c MessageConsumer) {
         time.Sleep(1 * time.Second)
         counter++
     }
-    m.wg.Add(m.numWorkers)
+    m.wg.Add(1)
 
     if m.readyCb != nil {
         go m.readyCb(m.readyCbParams)
     }
 
-    for w := 0; w < m.numWorkers; w++ {
-        go m.Worker("worker-"+strconv.Itoa(w), 0)
-    }
-    m.Wait()
-}
+    go func() {
+        m.contextMux.Lock()
+        rfd := C.rmr_get_rcvfd(m.context)
+        m.contextMux.Unlock()
+        efd := C.init_epoll(rfd)
 
-func (m *RMRClient) Worker(taskName string, msgSize int) {
-    Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
-
-    defer m.wg.Done()
-    for {
-        rxBuffer := C.rmr_rcv_msg(m.context, nil)
-        if rxBuffer == nil {
-            m.LogMBufError("RecvMsg failed", rxBuffer)
-            m.UpdateStatCounter("ReceiveError")
-            continue
+        defer m.wg.Done()
+        for {
+            if int(C.wait_epoll(efd, rfd)) == 0 {
+                continue
+            }
+            m.contextMux.Lock()
+            rxBuffer := C.rmr_rcv_msg(m.context, nil)
+            m.contextMux.Unlock()
+
+            if rxBuffer == nil {
+                m.LogMBufError("RecvMsg failed", rxBuffer)
+                m.UpdateStatCounter("ReceiveError")
+                continue
+            }
+            m.UpdateStatCounter("Received")
+            m.parseMessage(rxBuffer)
         }
-        m.UpdateStatCounter("Received")
+    }()
 
-        m.msgWg.Add(1)
-        go m.parseMessage(rxBuffer)
-        m.msgWg.Wait()
-    }
+    m.wg.Wait()
 }
 
 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
-    defer m.msgWg.Done()
     if len(m.consumers) == 0 {
         Logger.Info("rmrClient: No message handlers defined, message discarded!")
         return
@@ -216,6 +242,8 @@
 }
 
 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
+    m.contextMux.Lock()
+    defer m.contextMux.Unlock()
     buf := C.rmr_alloc_msg(m.context, C.int(size))
     if buf == nil {
         Logger.Error("rmrClient: Allocating message buffer failed!")
     }
@@ -227,6 +255,8 @@
     if mbuf == nil {
         return
     }
+    m.contextMux.Lock()
+    defer m.contextMux.Unlock()
     C.rmr_free_msg(mbuf)
 }
 
@@ -296,6 +326,7 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
         counterName string = "Transmitted"
     )
 
+    m.contextMux.Lock()
     txBuffer.state = 0
     if whid != 0 {
         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
@@ -306,6 +337,7 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
             currBuffer = C.rmr_send_msg(m.context, txBuffer)
         }
     }
+    m.contextMux.Unlock()
 
     if currBuffer == nil {
         m.UpdateStatCounter("TransmitError")
@@ -319,6 +351,7 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
     }
 
     for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
+        m.contextMux.Lock()
         if whid != 0 {
             currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
         } else {
@@ -328,6 +361,7 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
             }
         }
+        m.contextMux.Unlock()
     }
 
     if currBuffer.state != C.RMR_OK {
@@ -353,7 +387,9 @@ func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
 
     txBuffer.state = 0
 
+    m.contextMux.Lock()
     currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
+    m.contextMux.Unlock()
 
     if currBuffer == nil {
         m.UpdateStatCounter("TransmitError")
@@ -379,6 +415,8 @@
 }
 
 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
+    m.contextMux.Lock()
+    defer m.contextMux.Unlock()
     endpoint := C.CString(target)
     return C.rmr_wh_open(m.context, endpoint)
 }
@@ -388,6 +426,8 @@
 }
 
 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
+    m.contextMux.Lock()
+    defer m.contextMux.Unlock()
     C.rmr_wh_close(m.context, whid)
 }
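
Sketch (illustration only, not part of the diff): the Start() hunks above replace the numWorkers pool of Worker goroutines with a single epoll-driven receive loop that calls parseMessage inline. As the commit message says, an xApp that still wants parallel message handling can fan the work out itself from its consumer. The following rough sketch assumes the consumer is handed one message (an RMRParams-like value) per call; the poolConsumer type, the local rmrParams stand-in, the worker count, and the queue length are all made up for illustration and are not part of xapp-frame.

package main

import (
    "fmt"
    "sync"
)

// rmrParams stands in for xapp.RMRParams; only the fields used here are shown.
type rmrParams struct {
    Mtype   int
    Payload []byte
}

// poolConsumer fans incoming messages out to nWorkers goroutines, restoring
// the parallelism that the removed rmr.numWorkers option used to provide.
type poolConsumer struct {
    jobs chan *rmrParams
    wg   sync.WaitGroup
}

func newPoolConsumer(nWorkers, queueLen int) *poolConsumer {
    c := &poolConsumer{jobs: make(chan *rmrParams, queueLen)}
    c.wg.Add(nWorkers)
    for i := 0; i < nWorkers; i++ {
        go func(id int) {
            defer c.wg.Done()
            for msg := range c.jobs {
                // Real handling (decode, subscribe, respond, ...) would go here.
                fmt.Printf("worker-%d: mtype=%d len=%d\n", id, msg.Mtype, len(msg.Payload))
            }
        }(i)
    }
    return c
}

// Consume is what the single receive goroutine would call; it only enqueues,
// so the RMR reader is never blocked by slow message handling.
func (c *poolConsumer) Consume(msg *rmrParams) error {
    c.jobs <- msg
    return nil
}

func main() {
    c := newPoolConsumer(4, 128)
    for i := 0; i < 10; i++ {
        c.Consume(&rmrParams{Mtype: 12010, Payload: []byte("payload")})
    }
    close(c.jobs)
    c.wg.Wait()
}
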
diff --git a/pkg/xapp/types.go b/pkg/xapp/types.go
index 251dae5..f3c47eb 100755
--- a/pkg/xapp/types.go
+++ b/pkg/xapp/types.go
@@ -28,11 +28,10 @@ type RMRStatistics struct{}
 
 type RMRClient struct {
     protPort   string
-    numWorkers int
+    contextMux sync.Mutex
     context    unsafe.Pointer
     ready      int
     wg         sync.WaitGroup
-    msgWg      sync.WaitGroup
     mux        sync.Mutex
     stat       map[string]Counter
     consumers  []MessageConsumer
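
Sketch (illustration only, not part of the diff): types.go drops the per-message msgWg WaitGroup and adds contextMux, the mutex that every C.rmr_* call in rmr.go above now takes. That is what makes one receive goroutine plus any number of sending goroutines safe against the single shared RMR context. A self-contained example of that locking pattern follows; the client type and its fake context are placeholders, not the xapp-frame API.

package main

import (
    "fmt"
    "sync"
)

// client mirrors the shape of RMRClient: one shared context that is not safe
// for concurrent use, guarded by a single mutex.
type client struct {
    contextMux sync.Mutex
    sent       int // stands in for state held by the C-side RMR context
}

// Send serializes access to the shared context, so any number of goroutines
// may call it at once, just like SendBuf/Allocate/Free after this patch.
func (c *client) Send(msg string) {
    c.contextMux.Lock()
    defer c.contextMux.Unlock()
    c.sent++ // the real code would call C.rmr_send_msg here
    _ = msg
}

func main() {
    c := &client{}
    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            c.Send(fmt.Sprintf("msg-%d", i))
        }(i)
    }
    wg.Wait()
    fmt.Println("sent:", c.sent) // always 8; no data race on the shared context
}
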
diff --git a/pkg/xapp/xapp_test.go b/pkg/xapp/xapp_test.go
index 37cca2a..abf9345 100755
--- a/pkg/xapp/xapp_test.go
+++ b/pkg/xapp/xapp_test.go
@@ -51,6 +51,7 @@ func TestMain(m *testing.M) {
 }
 
 func TestGetHealthCheckRetursServiceUnavailableError(t *testing.T) {
+    Logger.Info("CASE: TestGetHealthCheckRetursServiceUnavailableError")
     req, _ := http.NewRequest("GET", "/ric/v1/health/ready", nil)
     /*response :=*/ executeRequest(req)
 
@@ -58,6 +59,7 @@ func TestGetHealthCheckRetursServiceUnavailableError(t *testing.T) {
 }
 
 func TestGetHealthCheckReturnsSuccess(t *testing.T) {
+    Logger.Info("CASE: TestGetHealthCheckReturnsSuccess")
     for Rmr.IsReady() == false {
         time.Sleep(time.Duration(2) * time.Second)
     }
@@ -69,6 +71,7 @@ func TestGetHealthCheckReturnsSuccess(t *testing.T) {
 }
 
 func TestInjectQuerySinglePath(t *testing.T) {
+    Logger.Info("CASE: TestInjectQuerySinglePath")
     var handler = func(w http.ResponseWriter, r *http.Request) {
     }
 
@@ -80,6 +83,7 @@ func TestInjectQuerySinglePath(t *testing.T) {
 }
 
 func TestInjectQueryMultiplePaths(t *testing.T) {
+    Logger.Info("CASE: TestInjectQueryMultiplePaths")
     var handler = func(w http.ResponseWriter, r *http.Request) {
     }
 
@@ -91,6 +95,7 @@ func TestInjectQueryMultiplePaths(t *testing.T) {
 }
 
 func TestInjectQueryFailures(t *testing.T) {
+    Logger.Info("CASE: TestInjectQueryFailures")
     var handler = func(w http.ResponseWriter, r *http.Request) {
     }
 
@@ -102,6 +107,7 @@ func TestInjectQueryFailures(t *testing.T) {
 }
 
 func TestMessagesReceivedSuccessfully(t *testing.T) {
+    Logger.Info("CASE: TestMessagesReceivedSuccessfully")
     time.Sleep(time.Duration(5) * time.Second)
     for i := 0; i < 100; i++ {
         params := &RMRParams{}
@@ -144,6 +150,7 @@ func TestMessagesReceivedSuccessfully(t *testing.T) {
 }
 
 func TestMessagesReceivedSuccessfullyUsingWh(t *testing.T) {
+    Logger.Info("CASE: TestMessagesReceivedSuccessfullyUsingWh")
     time.Sleep(time.Duration(5) * time.Second)
     whid := Rmr.Openwh("localhost:4560")
     time.Sleep(time.Duration(1) * time.Second)
@@ -190,6 +197,7 @@ func TestMessagesReceivedSuccessfullyUsingWh(t *testing.T) {
 }
 
 func TestMessagesReceivedSuccessfullyUsingWhCall(t *testing.T) {
+    Logger.Info("CASE: TestMessagesReceivedSuccessfullyUsingWhCall")
     time.Sleep(time.Duration(5) * time.Second)
     whid := Rmr.Openwh("localhost:4560")
     params := &RMRParams{}
@@ -231,6 +239,7 @@ func TestMessagesReceivedSuccessfullyUsingWhCall(t *testing.T) {
 }
 
 func TestSubscribeChannels(t *testing.T) {
+    Logger.Info("CASE: TestSubscribeChannels")
     if !viper.GetBool("db.waitForSdl") {
         return
     }
@@ -252,6 +261,7 @@ func TestSubscribeChannels(t *testing.T) {
 }
 
 func TestGetRicMessageSuccess(t *testing.T) {
+    Logger.Info("CASE: TestGetRicMessageSuccess")
     id, ok := Rmr.GetRicMessageId("RIC_SUB_REQ")
     if !ok || id != 12010 {
         t.Errorf("Error: GetRicMessageId failed: id=%d", id)
@@ -264,6 +274,7 @@ func TestGetRicMessageSuccess(t *testing.T) {
 }
 
 func TestGetRicMessageFails(t *testing.T) {
+    Logger.Info("CASE: TestGetRicMessageFails")
     ok := Rmr.IsRetryError(&RMRParams{status: 0})
     if ok {
         t.Errorf("Error: IsRetryError returned wrong value")
@@ -286,6 +297,7 @@ func TestGetRicMessageFails(t *testing.T) {
 }
 
 func TestIsErrorFunctions(t *testing.T) {
+    Logger.Info("CASE: TestIsErrorFunctions")
     id, ok := Rmr.GetRicMessageId("RIC_SUB_REQ")
     if !ok || id != 12010 {
         t.Errorf("Error: GetRicMessageId failed: id=%d", id)
@@ -298,6 +310,7 @@ func TestIsErrorFunctions(t *testing.T) {
 }
 
 func TestTeardown(t *testing.T) {
+    Logger.Info("CASE: TestTeardown")
     Sdl.Clear()
 }
diff --git a/test/manifest/config-file-rx.yaml b/test/manifest/config-file-rx.yaml
index 4b1c3d0..94427ba 100755
--- a/test/manifest/config-file-rx.yaml
+++ b/test/manifest/config-file-rx.yaml
@@ -20,7 +20,6 @@
 "rmr":
   "protPort": "tcp:4560"
   "maxSize": 2072
-  "numWorkers": 1
 "db":
   "host": "localhost"
   "port": 6379
diff --git a/test/manifest/config-file-tx.yaml b/test/manifest/config-file-tx.yaml
index c3ba458..952902e 100755
--- a/test/manifest/config-file-tx.yaml
+++ b/test/manifest/config-file-tx.yaml
@@ -20,7 +20,6 @@
 "rmr":
   "protPort": "tcp:4591"
   "maxSize": 2072
-  "numWorkers": 1
 "db":
   "host": "localhost"
   "port": 6379
-- 
2.16.6
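
Sketch (illustration only, not part of the patch): for code and deployments built on top of this package, the externally visible changes are the removed rmr.numWorkers configuration key, dropped from the yaml/json files above, and the shorter NewRMRClientWithParams signature. A hypothetical before/after for an xApp that constructs the client explicitly follows; the import path and the argument values are assumptions, not taken from this patch.

package example

// Hypothetical caller; the import path and values below are assumptions.
import "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"

func newClient() *xapp.RMRClient {
    // Before this patch (no longer compiles):
    //   xapp.NewRMRClientWithParams("tcp:4560", 65536, 1, 0, "RMR")
    // After this patch the numWorkers argument is gone:
    return xapp.NewRMRClientWithParams("tcp:4560", 65536, 0, "RMR")
}

xApps that use NewRMRClient() with a config file only need to delete the "numWorkers" entry from the "rmr" section; the remaining keys (protPort, maxSize, threadType) are read as before.
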