2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst.
//
// dst must have room for at least len bytes and must not overlap data.
// The call is a no-op when either pointer is NULL or len is not positive:
// memcpy takes a size_t, so a negative int would otherwise be converted
// into an enormous byte count.
void write_bytes_array(unsigned char *dst, void *data, int len) {
	if (dst == NULL || data == NULL || len <= 0) {
		return;
	}
	memcpy(dst, data, (size_t)len);
}
36 int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
45 void close_epoll(int epoll_fd) {
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54 if( events[0].data.fd == rcv_fd ) {
62 #cgo LDFLAGS: -lrmr_si
75 var RMRCounterOpts = []CounterOpts{
76 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
77 {Name: "Received", Help: "The total number of received RMR messages"},
78 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
79 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
82 var RMRErrors = map[int]string{
83 C.RMR_OK: "state is good",
84 C.RMR_ERR_BADARG: "argument passed to function was unusable",
85 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
86 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
87 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
88 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
89 C.RMR_ERR_CALLFAILED: "unable to send call() message",
90 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
91 C.RMR_ERR_WHID: "wormhole id was invalid",
92 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
93 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
94 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
95 C.RMR_ERR_TIMEOUT: "message processing call timed out",
96 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
97 C.RMR_ERR_TRUNC: "received message likely truncated",
98 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
99 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
102 //-----------------------------------------------------------------------------
104 //-----------------------------------------------------------------------------
105 type RMRParams struct {
120 func (params *RMRParams) String() string {
122 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
126 //-----------------------------------------------------------------------------
128 //-----------------------------------------------------------------------------
129 type RMRClientParams struct {
134 func (params *RMRClientParams) String() string {
135 return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
136 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
137 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
140 //-----------------------------------------------------------------------------
142 //-----------------------------------------------------------------------------
143 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
144 p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
145 m := C.int(params.RmrData.MaxSize)
146 c := C.int(params.RmrData.ThreadType)
147 defer C.free(unsafe.Pointer(p))
148 ctx := C.rmr_init(p, m, c)
150 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
153 Logger.Info("new rmrClient with parameters: %s", params.String())
155 if params.RmrData.LowLatency {
156 C.rmr_set_low_latency(ctx)
158 if params.RmrData.FastAck {
164 consumers: make([]MessageConsumer, 0),
165 stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
166 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
170 func NewRMRClient() *RMRClient {
171 p := GetPortData("rmr-data")
172 return NewRMRClientWithParams(
179 func (m *RMRClient) Start(c MessageConsumer) {
181 m.consumers = append(m.consumers, c)
187 m.ready = int(C.rmr_ready(m.context))
188 m.contextMux.Unlock()
190 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
194 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
196 time.Sleep(1 * time.Second)
201 if m.readyCb != nil {
202 go m.readyCb(m.readyCbParams)
207 rfd := C.rmr_get_rcvfd(m.context)
208 m.contextMux.Unlock()
209 efd := C.init_epoll(rfd)
213 if int(C.wait_epoll(efd, rfd)) == 0 {
217 rxBuffer := C.rmr_rcv_msg(m.context, nil)
218 m.contextMux.Unlock()
221 m.LogMBufError("RecvMsg failed", rxBuffer)
222 m.UpdateStatCounter("ReceiveError")
225 m.UpdateStatCounter("Received")
226 m.parseMessage(rxBuffer)
233 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
234 if len(m.consumers) == 0 {
235 Logger.Info("rmrClient: No message handlers defined, message discarded!")
239 params := &RMRParams{}
240 params.Mbuf = rxBuffer
241 params.Mtype = int(rxBuffer.mtype)
242 params.SubId = int(rxBuffer.sub_id)
243 params.Meid = &RMRMeid{}
245 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
246 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
247 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
250 xidBuf := make([]byte, int(C.RMR_MAX_XID))
251 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
252 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
255 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
256 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
257 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
260 // Default case: a single consumer
261 if len(m.consumers) == 1 && m.consumers[0] != nil {
262 params.PayloadLen = int(rxBuffer.len)
263 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
264 err := m.consumers[0].Consume(params)
266 Logger.Warn("rmrClient: Consumer returned error: %v", err)
271 // Special case for multiple consumers
272 for _, c := range m.consumers {
273 cptr := unsafe.Pointer(rxBuffer.payload)
274 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
275 params.PayloadLen = int(rxBuffer.len)
276 params.Mtype = int(rxBuffer.mtype)
277 params.SubId = int(rxBuffer.sub_id)
279 err := c.Consume(params)
281 Logger.Warn("rmrClient: Consumer returned error: %v", err)
286 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
288 defer m.contextMux.Unlock()
289 outbuf := C.rmr_alloc_msg(m.context, C.int(size))
291 Logger.Error("rmrClient: Allocating message buffer failed!")
296 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
298 defer m.contextMux.Unlock()
299 outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
301 Logger.Error("rmrClient: Allocating message buffer failed!")
306 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
311 defer m.contextMux.Unlock()
315 func (m *RMRClient) SendMsg(params *RMRParams) bool {
316 return m.Send(params, false)
319 func (m *RMRClient) SendRts(params *RMRParams) bool {
320 return m.Send(params, true)
323 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
324 status := m.Send(params, isRts)
326 for ; i < int(to)*2 && status == false; i++ {
327 status = m.Send(params, isRts)
329 time.Sleep(500 * time.Millisecond)
333 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
334 if params.Mbuf != nil {
342 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
348 payLen := len(params.Payload)
349 if params.PayloadLen != 0 {
350 payLen = params.PayloadLen
353 txBuffer := params.Mbuf
357 txBuffer = m.ReAllocate(txBuffer, payLen)
359 txBuffer = m.Allocate(payLen)
365 txBuffer.mtype = C.int(params.Mtype)
366 txBuffer.sub_id = C.int(params.SubId)
367 txBuffer.len = C.int(payLen)
369 datap := C.CBytes(params.Payload)
372 if params.Meid != nil {
373 b := make([]byte, int(C.RMR_MAX_MEID))
374 copy(b, []byte(params.Meid.RanName))
375 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
378 xidLen := len(params.Xid)
379 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
380 b := make([]byte, int(C.RMR_MAX_XID))
381 copy(b, []byte(params.Xid))
382 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
385 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
390 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
392 txBuffer := m.CopyBuffer(params)
396 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
397 if params.status == int(C.RMR_OK) {
403 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
405 currBuffer *C.rmr_mbuf_t
411 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
414 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
416 currBuffer = C.rmr_send_msg(m.context, txBuffer)
419 m.contextMux.Unlock()
421 if currBuffer == nil {
422 m.UpdateStatCounter("TransmitError")
423 return m.LogMBufError("SendBuf failed", txBuffer)
426 // Just quick retry seems to help for K8s issue
427 if m.maxRetryOnFailure == 0 {
428 m.maxRetryOnFailure = 5
431 for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
434 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
437 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
439 currBuffer = C.rmr_send_msg(m.context, txBuffer)
442 m.contextMux.Unlock()
445 if currBuffer == nil {
446 m.UpdateStatCounter("TransmitError")
447 m.LogMBufError("SendBuf failed", currBuffer)
448 return int(C.RMR_ERR_INITFAILED)
451 if currBuffer.state != C.RMR_OK {
452 m.UpdateStatCounter("TransmitError")
453 m.LogMBufError("SendBuf failed", currBuffer)
455 m.UpdateStatCounter("Transmitted")
457 defer m.Free(currBuffer)
458 return int(currBuffer.state)
462 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
464 currBuffer *C.rmr_mbuf_t
465 counterName string = "Transmitted"
467 txBuffer := m.CopyBuffer(params)
469 return C.RMR_ERR_INITFAILED, ""
475 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
476 m.contextMux.Unlock()
478 if currBuffer == nil {
479 m.UpdateStatCounter("TransmitError")
480 return m.LogMBufError("SendBuf failed", txBuffer), ""
483 if currBuffer.state != C.RMR_OK {
484 counterName = "TransmitError"
485 m.LogMBufError("SendBuf failed", currBuffer)
488 m.UpdateStatCounter(counterName)
489 defer m.Free(currBuffer)
491 cptr := unsafe.Pointer(currBuffer.payload)
492 payload := C.GoBytes(cptr, C.int(currBuffer.len))
494 return int(currBuffer.state), string(payload)
497 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
498 return m.Wh_open(target)
501 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
503 defer m.contextMux.Unlock()
504 endpoint := C.CString(target)
505 return C.rmr_wh_open(m.context, endpoint)
508 func (m *RMRClient) Closewh(whid int) {
509 m.Wh_close(C.rmr_whid_t(whid))
512 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
514 defer m.contextMux.Unlock()
515 C.rmr_wh_close(m.context, whid)
518 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
519 if params.status == int(C.RMR_ERR_RETRY) {
525 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
526 if params.status == int(C.RMR_ERR_NOENDPT) {
532 func (m *RMRClient) UpdateStatCounter(name string) {
538 func (m *RMRClient) RegisterMetrics() {
539 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
542 func (m *RMRClient) Wait() {
546 func (m *RMRClient) IsReady() bool {
550 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
552 m.readyCbParams = params
555 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
556 id, ok := RICMessageTypes[name]
560 func (m *RMRClient) GetRicMessageName(id int) (s string) {
561 for k, v := range RICMessageTypes {
569 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
571 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
572 return int(mbuf.state)
574 Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))