2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst.
//
// The call is a no-op when either pointer is NULL or len is not positive.
// A negative int would otherwise be converted to a huge size_t by memcpy.
// The caller must guarantee that both buffers hold at least len bytes.
void write_bytes_array(unsigned char *dst, void *data, int len) {
    if (dst == NULL || data == NULL || len <= 0) {
        return;
    }
    memcpy((void *)dst, (void *)data, (size_t)len);
}
37 int init_epoll(int rcv_fd) {
38 struct epoll_event epe;
39 int epoll_fd = epoll_create1( 0 );
42 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
46 void close_epoll(int epoll_fd) {
52 int wait_epoll(int epoll_fd,int rcv_fd) {
53 struct epoll_event events[1];
54 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
55 if( events[0].data.fd == rcv_fd ) {
63 #cgo LDFLAGS: -lrmr_si
75 "github.com/spf13/viper"
78 var RMRCounterOpts = []CounterOpts{
79 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
80 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
81 {Name: "TransmitRetry", Help: "The total number of transmit retries on failure"},
82 {Name: "Received", Help: "The total number of received RMR messages"},
83 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
84 {Name: "SendWithRetryRetry", Help: "SendWithRetry service retries"},
87 var RMRGaugeOpts = []CounterOpts{
88 {Name: "Enqueued", Help: "The total number of enqueued in RMR library"},
89 {Name: "Dropped", Help: "The total number of dropped in RMR library"},
92 var RMRErrors = map[int]string{
93 C.RMR_OK: "state is good",
94 C.RMR_ERR_BADARG: "argument passed to function was unusable",
95 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
96 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
97 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
98 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
99 C.RMR_ERR_CALLFAILED: "unable to send call() message",
100 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
101 C.RMR_ERR_WHID: "wormhole id was invalid",
102 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
103 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
104 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
105 C.RMR_ERR_TIMEOUT: "message processing call timed out",
106 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
107 C.RMR_ERR_TRUNC: "received message likely truncated",
108 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
109 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
112 // -----------------------------------------------------------------------------
114 // -----------------------------------------------------------------------------
115 type RMRParams struct {
130 func (params *RMRParams) String() string {
132 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
136 // -----------------------------------------------------------------------------
138 // -----------------------------------------------------------------------------
139 type RMRClientParams struct {
144 func (params *RMRClientParams) String() string {
145 return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
146 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
147 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
150 // -----------------------------------------------------------------------------
152 // -----------------------------------------------------------------------------
153 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
154 p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
155 m := C.int(params.RmrData.MaxSize)
156 c := C.int(params.RmrData.ThreadType)
157 defer C.free(unsafe.Pointer(p))
158 ctx := C.rmr_init(p, m, c)
160 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
163 Logger.Info("new rmrClient with parameters: %s", params.String())
165 if params.RmrData.LowLatency {
166 C.rmr_set_low_latency(ctx)
168 if params.RmrData.FastAck {
174 consumers: make([]MessageConsumer, 0),
175 statc: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
176 statg: Metric.RegisterGaugeGroup(RMRGaugeOpts, params.StatDesc),
177 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
181 func NewRMRClient() *RMRClient {
182 p := GetPortData("rmrdata")
183 if p.Port == 0 || viper.IsSet("rmr.protPort") {
184 // Old xApp descriptor used, fallback to rmr section
185 fmt.Sscanf(viper.GetString("rmr.protPort"), "tcp:%d", &p.Port)
186 p.MaxSize = viper.GetInt("rmr.maxSize")
187 p.ThreadType = viper.GetInt("rmr.threadType")
188 p.LowLatency = viper.GetBool("rmr.lowLatency")
189 p.FastAck = viper.GetBool("rmr.fastAck")
190 p.MaxRetryOnFailure = viper.GetInt("rmr.maxRetryOnFailure")
193 return NewRMRClientWithParams(
200 func (m *RMRClient) Start(c MessageConsumer) {
202 m.consumers = append(m.consumers, c)
208 m.ready = int(C.rmr_ready(m.context))
209 m.contextMux.Unlock()
211 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
215 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
217 time.Sleep(1 * time.Second)
221 if m.readyCb != nil {
222 go m.readyCb(m.readyCbParams)
228 rfd := C.rmr_get_rcvfd(m.context)
229 m.contextMux.Unlock()
230 efd := C.init_epoll(rfd)
235 if int(C.wait_epoll(efd, rfd)) == 0 {
239 rxBuffer := C.rmr_rcv_msg(m.context, nil)
240 m.contextMux.Unlock()
243 m.LogMBufError("RecvMsg failed", rxBuffer)
244 m.UpdateStatCounter("ReceiveError")
247 m.UpdateStatCounter("Received")
248 m.parseMessage(rxBuffer)
257 time.Sleep(1 * time.Second)
264 func (m *RMRClient) UpdateRmrStats() {
265 param := (*C.rmr_rx_debug_t)(C.malloc(C.size_t(unsafe.Sizeof(C.rmr_rx_debug_t{}))))
267 C.rmr_get_rx_debug_info(m.context, param)
268 m.contextMux.Unlock()
270 m.statg["Enqueued"].Set(float64(param.enqueue))
271 m.statg["Dropped"].Set(float64(param.drop))
273 C.free(unsafe.Pointer(param))
276 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
277 if len(m.consumers) == 0 {
278 Logger.Info("rmrClient: No message handlers defined, message discarded!")
282 params := &RMRParams{}
283 params.Mbuf = rxBuffer
284 params.Mtype = int(rxBuffer.mtype)
285 params.SubId = int(rxBuffer.sub_id)
286 params.Meid = &RMRMeid{}
288 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
289 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
290 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
293 xidBuf := make([]byte, int(C.RMR_MAX_XID))
294 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
295 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
298 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
299 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
300 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
303 // Default case: a single consumer
304 if len(m.consumers) == 1 && m.consumers[0] != nil {
305 params.PayloadLen = int(rxBuffer.len)
306 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
307 err := m.consumers[0].Consume(params)
309 Logger.Warn("rmrClient: Consumer returned error: %v", err)
315 // Special case for multiple consumers
316 for _, c := range m.consumers {
317 cptr := unsafe.Pointer(rxBuffer.payload)
318 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
319 params.PayloadLen = int(rxBuffer.len)
320 params.Mtype = int(rxBuffer.mtype)
321 params.SubId = int(rxBuffer.sub_id)
323 err := c.Consume(params)
325 Logger.Warn("rmrClient: Consumer returned error: %v", err)
331 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
333 defer m.contextMux.Unlock()
334 outbuf := C.rmr_alloc_msg(m.context, C.int(size))
336 Logger.Error("rmrClient: Allocating message buffer failed!")
341 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
343 defer m.contextMux.Unlock()
344 outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
346 Logger.Error("rmrClient: Allocating message buffer failed!")
351 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
356 defer m.contextMux.Unlock()
360 func (m *RMRClient) SendMsg(params *RMRParams) bool {
361 return m.Send(params, false)
364 func (m *RMRClient) SendRts(params *RMRParams) bool {
365 return m.Send(params, true)
368 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
369 status := m.Send(params, isRts)
371 for ; i < int(to)*2 && status == false; i++ {
372 status = m.Send(params, isRts)
374 m.UpdateStatCounter("SendWithRetryRetry")
375 time.Sleep(500 * time.Millisecond)
379 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
380 if params.Mbuf != nil {
388 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
394 payLen := len(params.Payload)
395 if params.PayloadLen != 0 {
396 payLen = params.PayloadLen
399 txBuffer := params.Mbuf
403 txBuffer = m.ReAllocate(txBuffer, payLen)
405 txBuffer = m.Allocate(payLen)
411 txBuffer.mtype = C.int(params.Mtype)
412 txBuffer.sub_id = C.int(params.SubId)
413 txBuffer.len = C.int(payLen)
415 datap := C.CBytes(params.Payload)
418 if params.Meid != nil {
419 b := make([]byte, int(C.RMR_MAX_MEID))
420 copy(b, []byte(params.Meid.RanName))
421 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
424 xidLen := len(params.Xid)
425 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
426 b := make([]byte, int(C.RMR_MAX_XID))
427 copy(b, []byte(params.Xid))
428 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
431 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
436 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
438 txBuffer := m.CopyBuffer(params)
442 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
443 if params.status == int(C.RMR_OK) {
449 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
452 // Just quick retry seems to help for K8s issue
453 if m.maxRetryOnFailure == 0 {
454 m.maxRetryOnFailure = 5
457 for j := 0; j <= m.maxRetryOnFailure; j++ {
460 txBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
463 txBuffer = C.rmr_rts_msg(m.context, txBuffer)
465 txBuffer = C.rmr_send_msg(m.context, txBuffer)
468 m.contextMux.Unlock()
469 if j+1 <= m.maxRetryOnFailure && txBuffer != nil && txBuffer.state == C.RMR_ERR_RETRY {
470 m.UpdateStatCounter("TransmitRetry")
477 m.UpdateStatCounter("TransmitError")
478 m.LogMBufError("SendBuf failed", txBuffer)
479 return int(C.RMR_ERR_INITFAILED)
482 if txBuffer.state != C.RMR_OK {
483 m.UpdateStatCounter("TransmitError")
484 m.LogMBufError("SendBuf failed", txBuffer)
486 m.UpdateStatCounter("Transmitted")
488 defer m.Free(txBuffer)
489 return int(txBuffer.state)
493 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
495 currBuffer *C.rmr_mbuf_t
496 counterName string = "Transmitted"
498 txBuffer := m.CopyBuffer(params)
500 return C.RMR_ERR_INITFAILED, ""
506 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
507 m.contextMux.Unlock()
509 if currBuffer == nil {
510 m.UpdateStatCounter("TransmitError")
511 return m.LogMBufError("SendBuf failed", txBuffer), ""
514 if currBuffer.state != C.RMR_OK {
515 counterName = "TransmitError"
516 m.LogMBufError("SendBuf failed", currBuffer)
519 m.UpdateStatCounter(counterName)
520 defer m.Free(currBuffer)
522 cptr := unsafe.Pointer(currBuffer.payload)
523 payload := C.GoBytes(cptr, C.int(currBuffer.len))
525 return int(currBuffer.state), string(payload)
528 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
529 return m.Wh_open(target)
532 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
534 defer m.contextMux.Unlock()
535 endpoint := C.CString(target)
536 return C.rmr_wh_open(m.context, endpoint)
539 func (m *RMRClient) Closewh(whid int) {
540 m.Wh_close(C.rmr_whid_t(whid))
543 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
545 defer m.contextMux.Unlock()
546 C.rmr_wh_close(m.context, whid)
549 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
550 if params.status == int(C.RMR_ERR_RETRY) {
556 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
557 if params.status == int(C.RMR_ERR_NOENDPT) {
563 func (m *RMRClient) UpdateStatCounter(name string) {
569 func (m *RMRClient) RegisterMetrics() {
570 m.statc = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
571 m.statg = Metric.RegisterGaugeGroup(RMRGaugeOpts, "RMR")
574 func (m *RMRClient) Wait() {
578 func (m *RMRClient) IsReady() bool {
582 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
584 m.readyCbParams = params
587 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
588 id, ok := RICMessageTypes[name]
592 func (m *RMRClient) GetRicMessageName(id int) (s string) {
593 for k, v := range RICMessageTypes {
601 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
603 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
604 return int(mbuf.state)
606 Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))