2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33 memcpy((void *)dst, (void *)data, len);
36 int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
45 void close_epoll(int epoll_fd) {
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54 if( events[0].data.fd == rcv_fd ) {
62 #cgo LDFLAGS: -lrmr_si
74 "github.com/spf13/viper"
77 var RMRCounterOpts = []CounterOpts{
78 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
79 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80 {Name: "TransmitRetry", Help: "The total number of transmit retries on failure"},
81 {Name: "Received", Help: "The total number of received RMR messages"},
82 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
83 {Name: "SendWithRetryRetry", Help: "SendWithRetry service retries"},
86 var RMRGaugeOpts = []CounterOpts{
87 {Name: "Enqueued", Help: "The total number of enqueued in RMR library"},
88 {Name: "Dropped", Help: "The total number of dropped in RMR library"},
91 var RMRErrors = map[int]string{
92 C.RMR_OK: "state is good",
93 C.RMR_ERR_BADARG: "argument passed to function was unusable",
94 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
95 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
96 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
97 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
98 C.RMR_ERR_CALLFAILED: "unable to send call() message",
99 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
100 C.RMR_ERR_WHID: "wormhole id was invalid",
101 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
102 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
103 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
104 C.RMR_ERR_TIMEOUT: "message processing call timed out",
105 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
106 C.RMR_ERR_TRUNC: "received message likely truncated",
107 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
108 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
111 //-----------------------------------------------------------------------------
113 //-----------------------------------------------------------------------------
114 type RMRParams struct {
129 func (params *RMRParams) String() string {
131 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
135 //-----------------------------------------------------------------------------
137 //-----------------------------------------------------------------------------
138 type RMRClientParams struct {
143 func (params *RMRClientParams) String() string {
144 return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
145 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
146 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
149 //-----------------------------------------------------------------------------
151 //-----------------------------------------------------------------------------
152 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
153 p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
154 m := C.int(params.RmrData.MaxSize)
155 c := C.int(params.RmrData.ThreadType)
156 defer C.free(unsafe.Pointer(p))
157 ctx := C.rmr_init(p, m, c)
159 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
162 Logger.Info("new rmrClient with parameters: %s", params.String())
164 if params.RmrData.LowLatency {
165 C.rmr_set_low_latency(ctx)
167 if params.RmrData.FastAck {
173 consumers: make([]MessageConsumer, 0),
174 statc: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
175 statg: Metric.RegisterGaugeGroup(RMRGaugeOpts, params.StatDesc),
176 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
180 func NewRMRClient() *RMRClient {
181 p := GetPortData("rmrdata")
182 if p.Port == 0 || viper.IsSet("rmr.protPort") {
183 // Old xApp descriptor used, fallback to rmr section
184 fmt.Sscanf(viper.GetString("rmr.protPort"), "tcp:%d", &p.Port)
185 p.MaxSize = viper.GetInt("rmr.maxSize")
186 p.ThreadType = viper.GetInt("rmr.threadType")
187 p.LowLatency = viper.GetBool("rmr.lowLatency")
188 p.FastAck = viper.GetBool("rmr.fastAck")
189 p.MaxRetryOnFailure = viper.GetInt("rmr.maxRetryOnFailure")
192 return NewRMRClientWithParams(
199 func (m *RMRClient) Start(c MessageConsumer) {
201 m.consumers = append(m.consumers, c)
207 m.ready = int(C.rmr_ready(m.context))
208 m.contextMux.Unlock()
210 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
214 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
216 time.Sleep(1 * time.Second)
220 if m.readyCb != nil {
221 go m.readyCb(m.readyCbParams)
227 rfd := C.rmr_get_rcvfd(m.context)
228 m.contextMux.Unlock()
229 efd := C.init_epoll(rfd)
234 if int(C.wait_epoll(efd, rfd)) == 0 {
238 rxBuffer := C.rmr_rcv_msg(m.context, nil)
239 m.contextMux.Unlock()
242 m.LogMBufError("RecvMsg failed", rxBuffer)
243 m.UpdateStatCounter("ReceiveError")
246 m.UpdateStatCounter("Received")
247 m.parseMessage(rxBuffer)
256 time.Sleep(1 * time.Second)
263 func (m *RMRClient) UpdateRmrStats() {
264 param := (*C.rmr_rx_debug_t)(C.malloc(C.size_t(unsafe.Sizeof(C.rmr_rx_debug_t{}))))
266 C.rmr_get_rx_debug_info(m.context, param)
267 m.contextMux.Unlock()
269 m.statg["Enqueued"].Set(float64(param.enqueue))
270 m.statg["Dropped"].Set(float64(param.drop))
272 C.free(unsafe.Pointer(param))
275 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
276 if len(m.consumers) == 0 {
277 Logger.Info("rmrClient: No message handlers defined, message discarded!")
281 params := &RMRParams{}
282 params.Mbuf = rxBuffer
283 params.Mtype = int(rxBuffer.mtype)
284 params.SubId = int(rxBuffer.sub_id)
285 params.Meid = &RMRMeid{}
287 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
288 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
289 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
292 xidBuf := make([]byte, int(C.RMR_MAX_XID))
293 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
294 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
297 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
298 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
299 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
302 // Default case: a single consumer
303 if len(m.consumers) == 1 && m.consumers[0] != nil {
304 params.PayloadLen = int(rxBuffer.len)
305 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
306 err := m.consumers[0].Consume(params)
308 Logger.Warn("rmrClient: Consumer returned error: %v", err)
314 // Special case for multiple consumers
315 for _, c := range m.consumers {
316 cptr := unsafe.Pointer(rxBuffer.payload)
317 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
318 params.PayloadLen = int(rxBuffer.len)
319 params.Mtype = int(rxBuffer.mtype)
320 params.SubId = int(rxBuffer.sub_id)
322 err := c.Consume(params)
324 Logger.Warn("rmrClient: Consumer returned error: %v", err)
330 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
332 defer m.contextMux.Unlock()
333 outbuf := C.rmr_alloc_msg(m.context, C.int(size))
335 Logger.Error("rmrClient: Allocating message buffer failed!")
340 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
342 defer m.contextMux.Unlock()
343 outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
345 Logger.Error("rmrClient: Allocating message buffer failed!")
350 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
355 defer m.contextMux.Unlock()
359 func (m *RMRClient) SendMsg(params *RMRParams) bool {
360 return m.Send(params, false)
363 func (m *RMRClient) SendRts(params *RMRParams) bool {
364 return m.Send(params, true)
367 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
368 status := m.Send(params, isRts)
370 for ; i < int(to)*2 && status == false; i++ {
371 status = m.Send(params, isRts)
373 m.UpdateStatCounter("SendWithRetryRetry")
374 time.Sleep(500 * time.Millisecond)
378 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
379 if params.Mbuf != nil {
387 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
393 payLen := len(params.Payload)
394 if params.PayloadLen != 0 {
395 payLen = params.PayloadLen
398 txBuffer := params.Mbuf
402 txBuffer = m.ReAllocate(txBuffer, payLen)
404 txBuffer = m.Allocate(payLen)
410 txBuffer.mtype = C.int(params.Mtype)
411 txBuffer.sub_id = C.int(params.SubId)
412 txBuffer.len = C.int(payLen)
414 datap := C.CBytes(params.Payload)
417 if params.Meid != nil {
418 b := make([]byte, int(C.RMR_MAX_MEID))
419 copy(b, []byte(params.Meid.RanName))
420 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
423 xidLen := len(params.Xid)
424 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
425 b := make([]byte, int(C.RMR_MAX_XID))
426 copy(b, []byte(params.Xid))
427 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
430 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
435 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
437 txBuffer := m.CopyBuffer(params)
441 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
442 if params.status == int(C.RMR_OK) {
448 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
450 currBuffer *C.rmr_mbuf_t
456 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
459 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
461 currBuffer = C.rmr_send_msg(m.context, txBuffer)
464 m.contextMux.Unlock()
466 if currBuffer == nil {
467 m.UpdateStatCounter("TransmitError")
468 return m.LogMBufError("SendBuf failed", txBuffer)
471 // Just quick retry seems to help for K8s issue
472 if m.maxRetryOnFailure == 0 {
473 m.maxRetryOnFailure = 5
476 for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
479 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
482 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
484 currBuffer = C.rmr_send_msg(m.context, txBuffer)
487 m.contextMux.Unlock()
488 m.UpdateStatCounter("TransmitRetry")
491 if currBuffer == nil {
492 m.UpdateStatCounter("TransmitError")
493 m.LogMBufError("SendBuf failed", currBuffer)
494 return int(C.RMR_ERR_INITFAILED)
497 if currBuffer.state != C.RMR_OK {
498 m.UpdateStatCounter("TransmitError")
499 m.LogMBufError("SendBuf failed", currBuffer)
501 m.UpdateStatCounter("Transmitted")
503 defer m.Free(currBuffer)
504 return int(currBuffer.state)
508 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
510 currBuffer *C.rmr_mbuf_t
511 counterName string = "Transmitted"
513 txBuffer := m.CopyBuffer(params)
515 return C.RMR_ERR_INITFAILED, ""
521 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
522 m.contextMux.Unlock()
524 if currBuffer == nil {
525 m.UpdateStatCounter("TransmitError")
526 return m.LogMBufError("SendBuf failed", txBuffer), ""
529 if currBuffer.state != C.RMR_OK {
530 counterName = "TransmitError"
531 m.LogMBufError("SendBuf failed", currBuffer)
534 m.UpdateStatCounter(counterName)
535 defer m.Free(currBuffer)
537 cptr := unsafe.Pointer(currBuffer.payload)
538 payload := C.GoBytes(cptr, C.int(currBuffer.len))
540 return int(currBuffer.state), string(payload)
543 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
544 return m.Wh_open(target)
547 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
549 defer m.contextMux.Unlock()
550 endpoint := C.CString(target)
551 return C.rmr_wh_open(m.context, endpoint)
554 func (m *RMRClient) Closewh(whid int) {
555 m.Wh_close(C.rmr_whid_t(whid))
558 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
560 defer m.contextMux.Unlock()
561 C.rmr_wh_close(m.context, whid)
564 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
565 if params.status == int(C.RMR_ERR_RETRY) {
571 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
572 if params.status == int(C.RMR_ERR_NOENDPT) {
578 func (m *RMRClient) UpdateStatCounter(name string) {
584 func (m *RMRClient) RegisterMetrics() {
585 m.statc = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
586 m.statg = Metric.RegisterGaugeGroup(RMRGaugeOpts, "RMR")
589 func (m *RMRClient) Wait() {
593 func (m *RMRClient) IsReady() bool {
597 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
599 m.readyCbParams = params
602 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
603 id, ok := RICMessageTypes[name]
607 func (m *RMRClient) GetRicMessageName(id int) (s string) {
608 for k, v := range RICMessageTypes {
616 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
618 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
619 return int(mbuf.state)
621 Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))