2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33 memcpy((void *)dst, (void *)data, len);
36 int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
45 void close_epoll(int epoll_fd) {
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54 if( events[0].data.fd == rcv_fd ) {
62 #cgo LDFLAGS: -lrmr_si
75 var RMRCounterOpts = []CounterOpts{
76 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
77 {Name: "Received", Help: "The total number of received RMR messages"},
78 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
79 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
82 var RMRErrors = map[int]string{
83 C.RMR_OK: "state is good",
84 C.RMR_ERR_BADARG: "argument passed to function was unusable",
85 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
86 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
87 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
88 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
89 C.RMR_ERR_CALLFAILED: "unable to send call() message",
90 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
91 C.RMR_ERR_WHID: "wormhole id was invalid",
92 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
93 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
94 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
95 C.RMR_ERR_TIMEOUT: "message processing call timed out",
96 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
97 C.RMR_ERR_TRUNC: "received message likely truncated",
98 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
99 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
102 //-----------------------------------------------------------------------------
104 //-----------------------------------------------------------------------------
105 type RMRParams struct {
120 func (params *RMRParams) String() string {
122 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
126 //-----------------------------------------------------------------------------
128 //-----------------------------------------------------------------------------
129 type RMRClientParams struct {
134 func (params *RMRClientParams) String() string {
135 return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t",
136 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc, params.RmrData.LowLatency, params.RmrData.FastAck)
139 //-----------------------------------------------------------------------------
141 //-----------------------------------------------------------------------------
142 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
143 p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
144 m := C.int(params.RmrData.MaxSize)
145 c := C.int(params.RmrData.ThreadType)
146 defer C.free(unsafe.Pointer(p))
147 ctx := C.rmr_init(p, m, c)
149 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
152 Logger.Info("new rmrClient with parameters: %s", params.String())
154 if params.RmrData.LowLatency {
155 C.rmr_set_low_latency(ctx)
157 if params.RmrData.FastAck {
163 consumers: make([]MessageConsumer, 0),
164 stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
165 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
169 func NewRMRClient() *RMRClient {
170 p := GetPortData("rmr-data")
171 return NewRMRClientWithParams(
178 func (m *RMRClient) Start(c MessageConsumer) {
180 m.consumers = append(m.consumers, c)
186 m.ready = int(C.rmr_ready(m.context))
187 m.contextMux.Unlock()
189 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
193 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
195 time.Sleep(1 * time.Second)
200 if m.readyCb != nil {
201 go m.readyCb(m.readyCbParams)
206 rfd := C.rmr_get_rcvfd(m.context)
207 m.contextMux.Unlock()
208 efd := C.init_epoll(rfd)
212 if int(C.wait_epoll(efd, rfd)) == 0 {
216 rxBuffer := C.rmr_rcv_msg(m.context, nil)
217 m.contextMux.Unlock()
220 m.LogMBufError("RecvMsg failed", rxBuffer)
221 m.UpdateStatCounter("ReceiveError")
224 m.UpdateStatCounter("Received")
225 m.parseMessage(rxBuffer)
232 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
233 if len(m.consumers) == 0 {
234 Logger.Info("rmrClient: No message handlers defined, message discarded!")
238 params := &RMRParams{}
239 params.Mbuf = rxBuffer
240 params.Mtype = int(rxBuffer.mtype)
241 params.SubId = int(rxBuffer.sub_id)
242 params.Meid = &RMRMeid{}
244 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
245 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
246 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
249 xidBuf := make([]byte, int(C.RMR_MAX_XID))
250 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
251 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
254 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
255 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
256 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
259 // Default case: a single consumer
260 if len(m.consumers) == 1 && m.consumers[0] != nil {
261 params.PayloadLen = int(rxBuffer.len)
262 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
263 err := m.consumers[0].Consume(params)
265 Logger.Warn("rmrClient: Consumer returned error: %v", err)
270 // Special case for multiple consumers
271 for _, c := range m.consumers {
272 cptr := unsafe.Pointer(rxBuffer.payload)
273 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
274 params.PayloadLen = int(rxBuffer.len)
275 params.Mtype = int(rxBuffer.mtype)
276 params.SubId = int(rxBuffer.sub_id)
278 err := c.Consume(params)
280 Logger.Warn("rmrClient: Consumer returned error: %v", err)
285 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
287 defer m.contextMux.Unlock()
288 outbuf := C.rmr_alloc_msg(m.context, C.int(size))
290 Logger.Error("rmrClient: Allocating message buffer failed!")
295 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
297 defer m.contextMux.Unlock()
298 outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
300 Logger.Error("rmrClient: Allocating message buffer failed!")
305 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
310 defer m.contextMux.Unlock()
314 func (m *RMRClient) SendMsg(params *RMRParams) bool {
315 return m.Send(params, false)
318 func (m *RMRClient) SendRts(params *RMRParams) bool {
319 return m.Send(params, true)
322 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
323 status := m.Send(params, isRts)
325 for ; i < int(to)*2 && status == false; i++ {
326 status = m.Send(params, isRts)
328 time.Sleep(500 * time.Millisecond)
332 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
333 if params.Mbuf != nil {
341 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
347 payLen := len(params.Payload)
348 if params.PayloadLen != 0 {
349 payLen = params.PayloadLen
352 txBuffer := params.Mbuf
356 txBuffer = m.ReAllocate(txBuffer, payLen)
358 txBuffer = m.Allocate(payLen)
364 txBuffer.mtype = C.int(params.Mtype)
365 txBuffer.sub_id = C.int(params.SubId)
366 txBuffer.len = C.int(payLen)
368 datap := C.CBytes(params.Payload)
371 if params.Meid != nil {
372 b := make([]byte, int(C.RMR_MAX_MEID))
373 copy(b, []byte(params.Meid.RanName))
374 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
377 xidLen := len(params.Xid)
378 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
379 b := make([]byte, int(C.RMR_MAX_XID))
380 copy(b, []byte(params.Xid))
381 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
384 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
389 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
391 txBuffer := m.CopyBuffer(params)
395 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
396 if params.status == int(C.RMR_OK) {
402 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
404 currBuffer *C.rmr_mbuf_t
410 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
413 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
415 currBuffer = C.rmr_send_msg(m.context, txBuffer)
418 m.contextMux.Unlock()
420 if currBuffer == nil {
421 m.UpdateStatCounter("TransmitError")
422 return m.LogMBufError("SendBuf failed", txBuffer)
425 // Just quick retry seems to help for K8s issue
426 if m.maxRetryOnFailure == 0 {
427 m.maxRetryOnFailure = 5
430 for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
433 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
436 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
438 currBuffer = C.rmr_send_msg(m.context, txBuffer)
441 m.contextMux.Unlock()
444 if currBuffer == nil {
445 m.UpdateStatCounter("TransmitError")
446 m.LogMBufError("SendBuf failed", currBuffer)
447 return int(C.RMR_ERR_INITFAILED)
450 if currBuffer.state != C.RMR_OK {
451 m.UpdateStatCounter("TransmitError")
452 m.LogMBufError("SendBuf failed", currBuffer)
454 m.UpdateStatCounter("Transmitted")
456 defer m.Free(currBuffer)
457 return int(currBuffer.state)
461 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
463 currBuffer *C.rmr_mbuf_t
464 counterName string = "Transmitted"
466 txBuffer := m.CopyBuffer(params)
468 return C.RMR_ERR_INITFAILED, ""
474 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
475 m.contextMux.Unlock()
477 if currBuffer == nil {
478 m.UpdateStatCounter("TransmitError")
479 return m.LogMBufError("SendBuf failed", txBuffer), ""
482 if currBuffer.state != C.RMR_OK {
483 counterName = "TransmitError"
484 m.LogMBufError("SendBuf failed", currBuffer)
487 m.UpdateStatCounter(counterName)
488 defer m.Free(currBuffer)
490 cptr := unsafe.Pointer(currBuffer.payload)
491 payload := C.GoBytes(cptr, C.int(currBuffer.len))
493 return int(currBuffer.state), string(payload)
496 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
497 return m.Wh_open(target)
500 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
502 defer m.contextMux.Unlock()
503 endpoint := C.CString(target)
504 return C.rmr_wh_open(m.context, endpoint)
507 func (m *RMRClient) Closewh(whid int) {
508 m.Wh_close(C.rmr_whid_t(whid))
511 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
513 defer m.contextMux.Unlock()
514 C.rmr_wh_close(m.context, whid)
517 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
518 if params.status == int(C.RMR_ERR_RETRY) {
524 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
525 if params.status == int(C.RMR_ERR_NOENDPT) {
531 func (m *RMRClient) UpdateStatCounter(name string) {
537 func (m *RMRClient) RegisterMetrics() {
538 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
541 func (m *RMRClient) Wait() {
545 func (m *RMRClient) IsReady() bool {
549 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
551 m.readyCbParams = params
554 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
555 id, ok := RICMessageTypes[name]
559 func (m *RMRClient) GetRicMessageName(id int) (s string) {
560 for k, v := range RICMessageTypes {
568 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
570 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
571 return int(mbuf.state)
573 Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))