2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst with memcpy.
// No length validation is performed here; both buffers must already be
// at least len bytes long.
33 void write_bytes_array(unsigned char *dst, void *data, int len) {
34 memcpy((void *)dst, (void *)data, len);
// init_epoll creates an epoll instance with epoll_create1(0) and adds
// rcv_fd to it via EPOLL_CTL_ADD, using epe as the event description.
// NOTE(review): the result of epoll_ctl is discarded, and no check of
// epoll_fd is made before it is used — confirm failure handling.
37 int init_epoll(int rcv_fd) {
38 struct epoll_event epe;
39 int epoll_fd = epoll_create1( 0 );
// NOTE(review): epe must have its events/data members set before this
// call — confirm.
42 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
46 void close_epoll(int epoll_fd) {
// wait_epoll waits on epoll_fd for a single event (timeout -1, so the
// call does not time out) and then tests whether the reported event
// belongs to rcv_fd. The Go receive loop compares the result against 0.
52 int wait_epoll(int epoll_fd,int rcv_fd) {
53 struct epoll_event events[1];
54 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
55 if( events[0].data.fd == rcv_fd ) {
63 #cgo LDFLAGS: -lrmr_si
75 "github.com/spf13/viper"
// RMRCounterOpts declares the counters registered for an RMR client.
// The Name values are the strings passed to UpdateStatCounter by the
// send and receive paths in this file.
// NOTE(review): the Help text of "Transmitted" misspells "transmitted".
78 var RMRCounterOpts = []CounterOpts{
79 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
80 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
81 {Name: "TransmitRetry", Help: "The total number of transmit retries on failure"},
82 {Name: "Received", Help: "The total number of received RMR messages"},
83 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
84 {Name: "SendWithRetryRetry", Help: "SendWithRetry service retries"},
// RMRGaugeOpts declares the gauges registered for an RMR client. They
// are described with the CounterOpts type and are updated by
// UpdateRmrStats under the keys "Enqueued" and "Dropped".
87 var RMRGaugeOpts = []CounterOpts{
88 {Name: "Enqueued", Help: "The total number of enqueued in RMR library"},
89 {Name: "Dropped", Help: "The total number of dropped in RMR library"},
// RMRErrors maps RMR state codes (the C.RMR_* constants) to
// human-readable descriptions. LogMBufError indexes it with the state
// of a message buffer when logging a failure.
92 var RMRErrors = map[int]string{
93 C.RMR_OK: "state is good",
94 C.RMR_ERR_BADARG: "argument passed to function was unusable",
95 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
96 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
97 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
98 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
99 C.RMR_ERR_CALLFAILED: "unable to send call() message",
100 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
101 C.RMR_ERR_WHID: "wormhole id was invalid",
102 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
103 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
104 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
105 C.RMR_ERR_TIMEOUT: "message processing call timed out",
106 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
107 C.RMR_ERR_TRUNC: "received message likely truncated",
108 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
109 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
112 // -----------------------------------------------------------------------------
114 // -----------------------------------------------------------------------------
115 type RMRParams struct {
// String formats the message parameters for logging: source, message
// type, subscription id, transaction id, MEID, payload lengths and the
// MD5 digest of the payload.
// NOTE(review): md5.Sum hashes the whole payload on every call.
130 func (params *RMRParams) String() string {
// "Paylens" prints PayloadLen followed by len(Payload).
132 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
136 // -----------------------------------------------------------------------------
138 // -----------------------------------------------------------------------------
139 type RMRClientParams struct {
// String formats the client configuration for logging: port, maximum
// message size, thread type, statistics descriptor, the low-latency and
// fast-ack flags, and the policies list.
144 func (params *RMRClientParams) String() string {
145 return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
146 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
147 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
150 // -----------------------------------------------------------------------------
152 // -----------------------------------------------------------------------------
// NewRMRClientWithParams initialises an RMR context with C.rmr_init
// using the port, maximum message size and thread type from
// params.RmrData, applies the optional low-latency / fast-ack settings,
// and populates an RMRClient with metric groups registered under
// params.StatDesc.
153 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
// The port is passed to rmr_init as a decimal C string; the deferred
// C.free below releases it when this function returns.
154 p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
155 m := C.int(params.RmrData.MaxSize)
156 c := C.int(params.RmrData.ThreadType)
157 defer C.free(unsafe.Pointer(p))
158 ctx := C.rmr_init(p, m, c)
// Logged on the initialisation-failure path.
160 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
163 Logger.Info("new rmrClient with parameters: %s", params.String())
// Optional transport tuning requested by the configuration.
165 if params.RmrData.LowLatency {
166 C.rmr_set_low_latency(ctx)
168 if params.RmrData.FastAck {
// Client fields: empty consumer list, counter and gauge groups named
// after params.StatDesc, and the configured retry limit.
174 consumers: make([]MessageConsumer, 0),
175 statc: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
176 statg: Metric.RegisterGaugeGroup(RMRGaugeOpts, params.StatDesc),
177 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
// NewRMRClient builds the client configuration from the "rmrdata" port
// entry and delegates to NewRMRClientWithParams. When that entry has no
// port, or when the legacy "rmr.protPort" key is set, the values are
// taken from the "rmr.*" configuration keys instead.
181 func NewRMRClient() *RMRClient {
182 p := GetPortData("rmrdata")
183 if p.Port == 0 || viper.IsSet("rmr.protPort") {
184 // Old xApp descriptor used, fallback to rmr section
// The legacy port value is expected in the form "tcp:<port>".
// NOTE(review): the Sscanf result is ignored, so a malformed value
// leaves p.Port unchanged without any diagnostic.
185 fmt.Sscanf(viper.GetString("rmr.protPort"), "tcp:%d", &p.Port)
186 p.MaxSize = viper.GetInt("rmr.maxSize")
187 p.ThreadType = viper.GetInt("rmr.threadType")
188 p.LowLatency = viper.GetBool("rmr.lowLatency")
189 p.FastAck = viper.GetBool("rmr.fastAck")
190 p.MaxRetryOnFailure = viper.GetInt("rmr.maxRetryOnFailure")
193 return NewRMRClientWithParams(
// Start registers consumer c, polls C.rmr_ready until RMR reports
// readiness, fires the optional ready callback, and then receives
// messages: it waits on an epoll descriptor built around the RMR
// receive fd, reads each message with rmr_rcv_msg and hands it to
// parseMessage.
200 func (m *RMRClient) Start(c MessageConsumer) {
202 m.consumers = append(m.consumers, c)
// Readiness polling: the rmr_ready result is stored in m.ready, with a
// one-second sleep between attempts.
208 m.ready = int(C.rmr_ready(m.context))
209 m.contextMux.Unlock()
211 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
215 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
217 time.Sleep(1 * time.Second)
// The ready callback runs in its own goroutine so it cannot block the
// receive path.
221 if m.readyCb != nil {
222 go m.readyCb(m.readyCbParams)
// Fetch the RMR receive descriptor and wrap it in an epoll instance.
228 rfd := C.rmr_get_rcvfd(m.context)
229 m.contextMux.Unlock()
230 efd := C.init_epoll(rfd)
// wait_epoll result 0 is handled separately from the receive path.
235 if int(C.wait_epoll(efd, rfd)) == 0 {
// A nil old-buffer argument is passed, so no previous buffer is handed
// back to rmr_rcv_msg for reuse.
239 rxBuffer := C.rmr_rcv_msg(m.context, nil)
240 m.contextMux.Unlock()
// Failed receives are logged and counted; successful ones are counted
// and dispatched.
243 m.LogMBufError("RecvMsg failed", rxBuffer)
244 m.UpdateStatCounter("ReceiveError")
247 m.UpdateStatCounter("Received")
248 m.parseMessage(rxBuffer)
257 time.Sleep(1 * time.Second)
// UpdateRmrStats queries the RMR library's receive debug counters and
// publishes them as the "Enqueued" and "Dropped" gauges.
264 func (m *RMRClient) UpdateRmrStats() {
// The debug struct is allocated on the C heap so it can be filled by
// the C call; it is released with C.free at the end.
265 param := (*C.rmr_rx_debug_t)(C.malloc(C.size_t(unsafe.Sizeof(C.rmr_rx_debug_t{}))))
267 C.rmr_get_rx_debug_info(m.context, param)
268 m.contextMux.Unlock()
270 m.statg["Enqueued"].Set(float64(param.enqueue))
271 m.statg["Dropped"].Set(float64(param.drop))
273 C.free(unsafe.Pointer(param))
// parseMessage converts a received RMR buffer into an RMRParams value
// (type, subscription id, MEID, transaction id, source, payload) and
// passes it to the registered consumers. Consumer errors are logged as
// warnings.
276 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
277 if len(m.consumers) == 0 {
278 Logger.Info("rmrClient: No message handlers defined, message discarded!")
282 params := &RMRParams{}
283 params.Mbuf = rxBuffer
284 params.Mtype = int(rxBuffer.mtype)
285 params.SubId = int(rxBuffer.sub_id)
286 params.Meid = &RMRMeid{}
// Each header field is read into a fixed-size Go buffer by the RMR
// getter, then trailing NUL bytes are trimmed.
288 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
289 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
290 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
293 xidBuf := make([]byte, int(C.RMR_MAX_XID))
294 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
// NOTE(review): the literal 32 assumes RMR_MAX_XID >= 32 — confirm.
295 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
298 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
299 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
// NOTE(review): the literal 64 assumes RMR_MAX_SRC >= 64 — confirm.
300 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
303 // Default case: a single consumer
304 if len(m.consumers) == 1 && m.consumers[0] != nil {
305 params.PayloadLen = int(rxBuffer.len)
// The payload slice is a view over the C buffer (no copy); its length
// and capacity are both capped at PayloadLen. It is only valid while
// rxBuffer itself is.
306 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
307 err := m.consumers[0].Consume(params)
309 Logger.Warn("rmrClient: Consumer returned error: %v", err)
315 // Special case for multiple consumers
316 for _, c := range m.consumers {
// With several consumers each one gets a fresh Go copy of the payload
// (C.GoBytes).
// NOTE(review): all consumers still receive the same *RMRParams.
317 cptr := unsafe.Pointer(rxBuffer.payload)
318 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
319 params.PayloadLen = int(rxBuffer.len)
320 params.Mtype = int(rxBuffer.mtype)
321 params.SubId = int(rxBuffer.sub_id)
323 err := c.Consume(params)
325 Logger.Warn("rmrClient: Consumer returned error: %v", err)
// Allocate requests a message buffer of the given payload size from RMR
// via rmr_alloc_msg. The context mutex is released by the deferred
// Unlock; an error is logged on the failure path.
331 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
333 defer m.contextMux.Unlock()
334 outbuf := C.rmr_alloc_msg(m.context, C.int(size))
336 Logger.Error("rmrClient: Allocating message buffer failed!")
// ReAllocate resizes inbuf to the given payload size with
// rmr_realloc_msg, which takes only the buffer and the size (no
// context argument).
// NOTE(review): the failure log text is identical to Allocate's and
// does not mention reallocation.
341 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
343 defer m.contextMux.Unlock()
344 outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
346 Logger.Error("rmrClient: Allocating message buffer failed!")
// Free takes an RMR message buffer; the context mutex is released by
// the deferred Unlock.
// NOTE(review): presumably hands mbuf back to the RMR library — confirm.
351 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
356 defer m.contextMux.Unlock()
// SendMsg sends params as a regular message: it calls Send with
// isRts=false and returns Send's result.
360 func (m *RMRClient) SendMsg(params *RMRParams) bool {
361 return m.Send(params, false)
// SendRts sends params as a return-to-sender message: it calls Send
// with isRts=true and returns Send's result.
364 func (m *RMRClient) SendRts(params *RMRParams) bool {
365 return m.Send(params, true)
// SendWithRetry calls Send and, while it keeps returning false, retries
// with a 500 ms pause between attempts, counting each retry in
// "SendWithRetryRetry". An error describing params is produced when the
// retries are exhausted.
368 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
369 status := m.Send(params, isRts)
// NOTE(review): int(to) is the raw integer value of the Duration, so
// the bound is 2*to attempts. With 500 ms per attempt this matches "to
// seconds" only if callers pass a bare number (e.g. 5), not
// 5*time.Second — confirm against callers.
371 for ; i < int(to)*2 && status == false; i++ {
372 status = m.Send(params, isRts)
374 m.UpdateStatCounter("SendWithRetryRetry")
375 time.Sleep(500 * time.Millisecond)
379 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
380 if params.Mbuf != nil {
// CopyBuffer prepares an RMR transmit buffer from params: it sizes the
// buffer (reusing params.Mbuf through ReAllocate, or obtaining a new one
// through Allocate), fills in message type, subscription id and length,
// sets the MEID and transaction id, and copies the payload bytes in.
388 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
// The payload length defaults to len(Payload); a non-zero PayloadLen
// overrides it.
394 payLen := len(params.Payload)
395 if params.PayloadLen != 0 {
396 payLen = params.PayloadLen
399 txBuffer := params.Mbuf
403 txBuffer = m.ReAllocate(txBuffer, payLen)
405 txBuffer = m.Allocate(payLen)
411 txBuffer.mtype = C.int(params.Mtype)
412 txBuffer.sub_id = C.int(params.SubId)
413 txBuffer.len = C.int(payLen)
// C.CBytes makes a C-heap copy of exactly len(params.Payload) bytes.
// NOTE(review): that copy must be released with C.free — confirm.
415 datap := C.CBytes(params.Payload)
// The MEID is passed as a zero-padded buffer of RMR_MAX_MEID bytes.
418 if params.Meid != nil {
419 b := make([]byte, int(C.RMR_MAX_MEID))
420 copy(b, []byte(params.Meid.RanName))
421 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// The transaction id is set only when it is non-empty and fits in
// RMR_MAX_XID bytes; it is zero-padded to that size.
424 xidLen := len(params.Xid)
425 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
426 b := make([]byte, int(C.RMR_MAX_XID))
427 copy(b, []byte(params.Xid))
428 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// NOTE(review): this copies txBuffer.len (= payLen) bytes out of datap,
// which holds only len(params.Payload) bytes; if PayloadLen exceeds
// len(Payload) the memcpy reads past the end of datap — confirm callers
// never do that.
431 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
// Send builds a transmit buffer from params with CopyBuffer, sends it
// through SendBuf (passing isRts and params.Whid), stores the resulting
// RMR state in params.status and compares it with RMR_OK.
436 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
438 txBuffer := m.CopyBuffer(params)
442 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
443 if params.status == int(C.RMR_OK) {
// SendBuf transmits txBuffer with one of three RMR calls — wormhole
// send (rmr_wh_send_msg with whid), return-to-sender (rmr_rts_msg) or
// plain send (rmr_send_msg) — retries while RMR reports RMR_ERR_RETRY,
// updates the transmit counters, frees the buffer returned by RMR and
// returns its final state as an int.
449 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
451 currBuffer *C.rmr_mbuf_t
457 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
460 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
462 currBuffer = C.rmr_send_msg(m.context, txBuffer)
465 m.contextMux.Unlock()
// A nil result from the first attempt is counted as a transmit error;
// the state logged and returned is that of the original txBuffer.
467 if currBuffer == nil {
468 m.UpdateStatCounter("TransmitError")
469 return m.LogMBufError("SendBuf failed", txBuffer)
472 // Just quick retry seems to help for K8s issue
// A retry limit of 0 (unset) is replaced by a default of 5; the value
// is written back to the client.
473 if m.maxRetryOnFailure == 0 {
474 m.maxRetryOnFailure = 5
// Retry only while RMR asks for it (RMR_ERR_RETRY), at most
// maxRetryOnFailure times; every retry bumps "TransmitRetry".
477 for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
480 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
483 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
485 currBuffer = C.rmr_send_msg(m.context, txBuffer)
488 m.contextMux.Unlock()
489 m.UpdateStatCounter("TransmitRetry")
// A nil buffer after the retries is reported as RMR_ERR_INITFAILED.
492 if currBuffer == nil {
493 m.UpdateStatCounter("TransmitError")
494 m.LogMBufError("SendBuf failed", currBuffer)
495 return int(C.RMR_ERR_INITFAILED)
498 if currBuffer.state != C.RMR_OK {
499 m.UpdateStatCounter("TransmitError")
500 m.LogMBufError("SendBuf failed", currBuffer)
502 m.UpdateStatCounter("Transmitted")
// The return value is evaluated before the deferred Free runs, so the
// state is read while currBuffer is still allocated.
504 defer m.Free(currBuffer)
505 return int(currBuffer.state)
// SendCallMsg sends params through rmr_wh_call, using params.Whid,
// params.Callid and params.Timeout, and returns the RMR state of the
// buffer that comes back together with its payload as a string.
509 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
511 currBuffer *C.rmr_mbuf_t
512 counterName string = "Transmitted"
514 txBuffer := m.CopyBuffer(params)
// Early exit reporting RMR_ERR_INITFAILED with an empty payload.
516 return C.RMR_ERR_INITFAILED, ""
522 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
523 m.contextMux.Unlock()
// No buffer came back: count a transmit error and report the state of
// the original txBuffer.
525 if currBuffer == nil {
526 m.UpdateStatCounter("TransmitError")
527 return m.LogMBufError("SendBuf failed", txBuffer), ""
// A non-OK state switches the counter to "TransmitError"; the payload
// is still returned to the caller below.
530 if currBuffer.state != C.RMR_OK {
531 counterName = "TransmitError"
532 m.LogMBufError("SendBuf failed", currBuffer)
535 m.UpdateStatCounter(counterName)
536 defer m.Free(currBuffer)
// C.GoBytes copies the payload into Go memory, so the returned string
// does not reference currBuffer after the deferred Free.
538 cptr := unsafe.Pointer(currBuffer.payload)
539 payload := C.GoBytes(cptr, C.int(currBuffer.len))
541 return int(currBuffer.state), string(payload)
// Openwh forwards to Wh_open and returns its wormhole id.
544 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
545 return m.Wh_open(target)
// Wh_open opens an RMR wormhole to target via rmr_wh_open and returns
// the wormhole id. The context mutex is released by the deferred
// Unlock.
548 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
550 defer m.contextMux.Unlock()
// NOTE(review): C.CString allocates on the C heap and no C.free of
// endpoint is visible, so each call appears to leak the string —
// confirm whether rmr_wh_open keeps the pointer before adding a free.
551 endpoint := C.CString(target)
552 return C.rmr_wh_open(m.context, endpoint)
// Closewh converts whid to C.rmr_whid_t and forwards to Wh_close.
555 func (m *RMRClient) Closewh(whid int) {
556 m.Wh_close(C.rmr_whid_t(whid))
// Wh_close closes the wormhole identified by whid via rmr_wh_close. The
// context mutex is released by the deferred Unlock.
559 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
561 defer m.contextMux.Unlock()
562 C.rmr_wh_close(m.context, whid)
// IsRetryError tests whether the status recorded in params (set by
// Send) equals RMR_ERR_RETRY.
565 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
566 if params.status == int(C.RMR_ERR_RETRY) {
// IsNoEndPointError tests whether the status recorded in params (set by
// Send) equals RMR_ERR_NOENDPT.
572 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
573 if params.status == int(C.RMR_ERR_NOENDPT) {
579 func (m *RMRClient) UpdateStatCounter(name string) {
// RegisterMetrics registers the counter and gauge groups under the
// fixed name "RMR" and stores them on the client, replacing whatever
// statc/statg held before.
585 func (m *RMRClient) RegisterMetrics() {
586 m.statc = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
587 m.statg = Metric.RegisterGaugeGroup(RMRGaugeOpts, "RMR")
590 func (m *RMRClient) Wait() {
594 func (m *RMRClient) IsReady() bool {
// SetReadyCB stores params as the argument later passed to the ready
// callback; Start invokes m.readyCb(m.readyCbParams) in a goroutine
// once RMR is ready.
// NOTE(review): presumably cb is stored in m.readyCb as well — confirm.
598 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
600 m.readyCbParams = params
// GetRicMessageId looks name up in the RICMessageTypes table, yielding
// the numeric id and whether the name was found.
603 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
604 id, ok := RICMessageTypes[name]
// GetRicMessageName performs the reverse lookup of GetRicMessageId by
// scanning RICMessageTypes linearly for an entry matching id; s is the
// named result.
608 func (m *RMRClient) GetRicMessageName(id int) (s string) {
609 for k, v := range RICMessageTypes {
// LogMBufError writes a debug log line for a failed buffer operation.
// For a usable mbuf it logs the transport state, the RMR state and its
// description from RMRErrors, and returns the RMR state as an int; a
// separate message is logged when mbuf is nil.
617 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
// NOTE(review): the Sprintf result is handed to Logger.Debug, which is
// called with a format string elsewhere in this file's Logger usage; a
// '%' in text would then be re-interpreted — confirm.
619 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
620 return int(mbuf.state)
622 Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))