2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst.
//
// Nothing is copied when either pointer is NULL or when len is not positive:
// memcpy takes a size_t, so a negative int would otherwise be converted into
// a huge unsigned length.
void write_bytes_array(unsigned char *dst, void *data, int len) {
	if (dst == NULL || data == NULL || len <= 0) {
		return;
	}
	memcpy(dst, data, (size_t)len);
}
// init_epoll creates an epoll instance with epoll_create1(0) and registers
// rcv_fd on it through EPOLL_CTL_ADD, using the local event descriptor epe.
// NOTE(review): the epoll_ctl return value is discarded; the initialisation of
// epe and the return statement are not visible in this excerpt.
36 int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
45 void close_epoll(int epoll_fd) {
// wait_epoll waits in epoll_wait for at most one event on epoll_fd, passing a
// timeout of -1, and then checks whether the reported descriptor is rcv_fd.
// The values returned for each outcome are not visible in this excerpt; the Go
// caller compares the result against 0.
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54 if( events[0].data.fd == rcv_fd ) {
62 #cgo LDFLAGS: -lrmr_si
70 "github.com/spf13/viper"
76 var RMRCounterOpts = []CounterOpts{
77 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
78 {Name: "Received", Help: "The total number of received RMR messages"},
79 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
83 var RMRErrors = map[int]string{
84 C.RMR_OK: "state is good",
85 C.RMR_ERR_BADARG: "argument passed to function was unusable",
86 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
87 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
88 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
89 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
90 C.RMR_ERR_CALLFAILED: "unable to send call() message",
91 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
92 C.RMR_ERR_WHID: "wormhole id was invalid",
93 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
94 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
95 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
96 C.RMR_ERR_TIMEOUT: "message processing call timed out",
97 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
98 C.RMR_ERR_TRUNC: "received message likely truncated",
99 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
100 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
103 //-----------------------------------------------------------------------------
105 //-----------------------------------------------------------------------------
// RMRParams carries the attributes of a single RMR message. Code in this file
// reads or writes its fields Src, Mtype, SubId, Xid, Meid, Payload, PayloadLen,
// Mbuf, Whid, Callid, Timeout and the unexported status.
106 type RMRParams struct {
// String formats the message attributes into the buffer b: Src, Mtype, SubId,
// Xid, Meid, the payload lengths and the MD5 digest of the payload.
121 func (params *RMRParams) String() string {
// "Paylens" prints PayloadLen next to len(Payload); the payload itself appears
// only as its MD5 digest.
123 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
127 //-----------------------------------------------------------------------------
129 //-----------------------------------------------------------------------------
// RMRClientParams is the argument of NewRMRClientWithParams; code in this file
// reads its StatDesc and RmrData fields.
130 type RMRClientParams struct {
135 func (params *RMRClientParams) String() string {
136 return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
137 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
138 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
141 //-----------------------------------------------------------------------------
143 //-----------------------------------------------------------------------------
// NewRMRClientWithParams initialises an RMR context through rmr_init using the
// Port, MaxSize and ThreadType values of params.RmrData, checks the LowLatency
// and FastAck flags, and builds a client with an empty consumer list, a
// counter group registered under params.StatDesc and the configured
// maxRetryOnFailure.
144 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
// rmr_init receives the port as a C string, so the number is formatted in
// decimal first.
145 p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
146 m := C.int(params.RmrData.MaxSize)
147 c := C.int(params.RmrData.ThreadType)
// The C copy of the port string is released when this function returns.
148 defer C.free(unsafe.Pointer(p))
149 ctx := C.rmr_init(p, m, c)
// NOTE(review): the guard around this log line and what follows it are not
// visible in this excerpt; confirm whether a failed rmr_init stops here.
151 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
154 Logger.Info("new rmrClient with parameters: %s", params.String())
156 if params.RmrData.LowLatency {
157 C.rmr_set_low_latency(ctx)
159 if params.RmrData.FastAck {
165 consumers: make([]MessageConsumer, 0),
166 stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
167 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
// NewRMRClient builds a client from the "rmr-data" port data and hands the
// result to NewRMRClientWithParams. When that data has a zero Port, or when the
// "rmr.protPort" configuration key is set, the settings are read from the
// "rmr.*" configuration keys instead.
171 func NewRMRClient() *RMRClient {
172 p := GetPortData("rmr-data")
173 if p.Port == 0 || viper.IsSet("rmr.protPort") {
174 // Old xApp descriptor used, fallback to rmr section
// The port is parsed from a value of the form "tcp:<port>".
// NOTE(review): the Sscanf result is ignored, so a value in any other form
// leaves p.Port unchanged without any diagnostic.
175 fmt.Sscanf(viper.GetString("rmr.protPort"), "tcp:%d", &p.Port)
176 p.MaxSize = viper.GetInt("rmr.maxSize")
177 p.ThreadType = viper.GetInt("rmr.threadType")
178 p.LowLatency = viper.GetBool("rmr.lowLatency")
179 p.FastAck = viper.GetBool("rmr.fastAck")
180 p.MaxRetryOnFailure = viper.GetInt("rmr.maxRetryOnFailure")
183 return NewRMRClientWithParams(
// Start registers c as a message consumer and then drives message reception.
// The lines visible here query rmr_ready, log while waiting and sleep one
// second between attempts; launch the ready callback (if set) in its own
// goroutine; wait on the RMR receive descriptor through the epoll helpers; and
// read messages with rmr_rcv_msg, counting them as "Received" or
// "ReceiveError" and passing received buffers to parseMessage.
// The loop structure and the branch conditions are not visible in this excerpt.
190 func (m *RMRClient) Start(c MessageConsumer) {
192 m.consumers = append(m.consumers, c)
198 m.ready = int(C.rmr_ready(m.context))
199 m.contextMux.Unlock()
201 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
205 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
207 time.Sleep(1 * time.Second)
// The callback runs in its own goroutine and receives the value stored by
// SetReadyCB.
212 if m.readyCb != nil {
213 go m.readyCb(m.readyCbParams)
// The receive descriptor is fetched from RMR and registered with epoll.
218 rfd := C.rmr_get_rcvfd(m.context)
219 m.contextMux.Unlock()
220 efd := C.init_epoll(rfd)
224 if int(C.wait_epoll(efd, rfd)) == 0 {
// A nil buffer is passed, so the buffer used is whatever rmr_rcv_msg returns.
228 rxBuffer := C.rmr_rcv_msg(m.context, nil)
229 m.contextMux.Unlock()
232 m.LogMBufError("RecvMsg failed", rxBuffer)
233 m.UpdateStatCounter("ReceiveError")
236 m.UpdateStatCounter("Received")
237 m.parseMessage(rxBuffer)
// parseMessage converts a received RMR buffer into an RMRParams value and
// hands it to the registered consumers. It fills in the buffer pointer, message
// type, subscription id, MEID, transaction id and source, then delivers the
// payload. With exactly one non-nil consumer the payload slice aliases the C
// buffer; otherwise each consumer is given a Go copy made with C.GoBytes.
// Errors returned by a consumer are logged as warnings.
244 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
245 if len(m.consumers) == 0 {
246 Logger.Info("rmrClient: No message handlers defined, message discarded!")
250 params := &RMRParams{}
251 params.Mbuf = rxBuffer
252 params.Mtype = int(rxBuffer.mtype)
253 params.SubId = int(rxBuffer.sub_id)
254 params.Meid = &RMRMeid{}
// Each attribute is read into a zero-filled Go buffer sized by the matching
// RMR_MAX_* constant; trailing NUL bytes are trimmed from the result.
256 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
257 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
258 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
261 xidBuf := make([]byte, int(C.RMR_MAX_XID))
262 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
// NOTE(review): the slice bound 32 is hard-coded while the buffer is sized by
// RMR_MAX_XID — presumably the same value; confirm.
263 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
266 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
267 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
// NOTE(review): the slice bound 64 is hard-coded while the buffer is sized by
// RMR_MAX_SRC — presumably the same value; confirm.
268 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
271 // Default case: a single consumer
272 if len(m.consumers) == 1 && m.consumers[0] != nil {
273 params.PayloadLen = int(rxBuffer.len)
// No copy is made: the slice points straight at the C payload, with length and
// capacity both limited to PayloadLen.
274 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
275 err := m.consumers[0].Consume(params)
277 Logger.Warn("rmrClient: Consumer returned error: %v", err)
282 // Special case for multiple consumers
// The same params pointer is refreshed and passed to every consumer; only the
// payload is copied anew per iteration.
283 for _, c := range m.consumers {
284 cptr := unsafe.Pointer(rxBuffer.payload)
285 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
286 params.PayloadLen = int(rxBuffer.len)
287 params.Mtype = int(rxBuffer.mtype)
288 params.SubId = int(rxBuffer.sub_id)
290 err := c.Consume(params)
292 Logger.Warn("rmrClient: Consumer returned error: %v", err)
// Allocate requests a message buffer of the given size from RMR through
// rmr_alloc_msg on the client's context. m.contextMux is unlocked when the
// function returns, and an error is logged on the failure path (the guard
// itself is not visible in this excerpt).
297 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
299 defer m.contextMux.Unlock()
300 outbuf := C.rmr_alloc_msg(m.context, C.int(size))
302 Logger.Error("rmrClient: Allocating message buffer failed!")
// ReAllocate passes inbuf and the requested size to rmr_realloc_msg.
// m.contextMux is unlocked when the function returns.
// NOTE(review): the failure log text is the same as in Allocate
// ("Allocating ..."), so the two cases cannot be told apart in the log.
307 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
309 defer m.contextMux.Unlock()
310 outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
312 Logger.Error("rmrClient: Allocating message buffer failed!")
// Free takes the message buffer to release; m.contextMux is unlocked when the
// function returns. The release call itself is not visible in this excerpt.
317 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
322 defer m.contextMux.Unlock()
326 func (m *RMRClient) SendMsg(params *RMRParams) bool {
327 return m.Send(params, false)
330 func (m *RMRClient) SendRts(params *RMRParams) bool {
331 return m.Send(params, true)
// SendWithRetry calls Send and, while it keeps failing, repeats the call with
// a 500 ms pause. When the attempts are exhausted it returns an error that
// carries the retry count and the formatted params.
334 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
335 status := m.Send(params, isRts)
// NOTE(review): to is a time.Duration but is used as a plain count here: the
// bound is int(to)*2 iterations. With the 500 ms sleep below this presumably
// treats to as a number of seconds; a caller passing a multiple of time.Second
// would get a far larger bound — confirm the intended unit.
337 for ; i < int(to)*2 && status == false; i++ {
338 status = m.Send(params, isRts)
340 time.Sleep(500 * time.Millisecond)
344 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
345 if params.Mbuf != nil {
// CopyBuffer prepares an RMR transmit buffer from params. It takes the payload
// length from PayloadLen when that is non-zero and from len(Payload)
// otherwise, obtains a buffer through ReAllocate or Allocate, sets the message
// type, subscription id and length, writes the MEID and transaction id when
// present, and copies the payload into the buffer.
353 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
359 payLen := len(params.Payload)
360 if params.PayloadLen != 0 {
361 payLen = params.PayloadLen
364 txBuffer := params.Mbuf
// The condition choosing between reusing params.Mbuf and allocating a fresh
// buffer is not visible in this excerpt.
368 txBuffer = m.ReAllocate(txBuffer, payLen)
370 txBuffer = m.Allocate(payLen)
376 txBuffer.mtype = C.int(params.Mtype)
377 txBuffer.sub_id = C.int(params.SubId)
378 txBuffer.len = C.int(payLen)
// C.CBytes makes a C-allocated copy of exactly len(params.Payload) bytes.
380 datap := C.CBytes(params.Payload)
383 if params.Meid != nil {
// The name is copied into a zero-filled buffer of RMR_MAX_MEID bytes, so a
// longer RanName is truncated by copy.
384 b := make([]byte, int(C.RMR_MAX_MEID))
385 copy(b, []byte(params.Meid.RanName))
386 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
389 xidLen := len(params.Xid)
// An empty Xid, or one longer than RMR_MAX_XID, is not written at all.
390 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
391 b := make([]byte, int(C.RMR_MAX_XID))
392 copy(b, []byte(params.Xid))
393 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// NOTE(review): txBuffer.len bytes are copied from datap, which holds only
// len(params.Payload) bytes; a PayloadLen larger than len(Payload) would read
// past the end of datap — confirm callers never do that.
396 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
// Send builds a transmit buffer from params with CopyBuffer, transmits it with
// SendBuf on wormhole params.Whid, and records the resulting RMR state in
// params.status. The state is then compared with RMR_OK; the values returned
// for each outcome are not visible in this excerpt.
401 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
403 txBuffer := m.CopyBuffer(params)
407 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
408 if params.status == int(C.RMR_OK) {
// SendBuf transmits txBuffer through RMR and returns the resulting RMR state.
// Three RMR calls appear in the visible lines: rmr_wh_send_msg (with whid),
// rmr_rts_msg and rmr_send_msg; the conditions selecting between them are not
// visible in this excerpt. While RMR answers RMR_ERR_RETRY the send is repeated
// up to maxRetryOnFailure times. The outcome is counted as "Transmitted" or
// "TransmitError", and the buffer handed back by RMR is freed on return.
414 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
416 currBuffer *C.rmr_mbuf_t
422 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
425 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
427 currBuffer = C.rmr_send_msg(m.context, txBuffer)
430 m.contextMux.Unlock()
432 if currBuffer == nil {
433 m.UpdateStatCounter("TransmitError")
// NOTE(review): the state is read from txBuffer, which has already been handed
// to RMR at this point — confirm it is still valid to read here.
434 return m.LogMBufError("SendBuf failed", txBuffer)
437 // Just quick retry seems to help for K8s issue
// A zero maxRetryOnFailure is replaced by 5, and the default is written back
// into the client.
438 if m.maxRetryOnFailure == 0 {
439 m.maxRetryOnFailure = 5
442 for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
// NOTE(review): each retry passes txBuffer again rather than the buffer
// returned by the previous attempt — confirm this is intended.
445 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
448 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
450 currBuffer = C.rmr_send_msg(m.context, txBuffer)
453 m.contextMux.Unlock()
456 if currBuffer == nil {
457 m.UpdateStatCounter("TransmitError")
// currBuffer is nil on this path, so LogMBufError is called with a nil mbuf.
458 m.LogMBufError("SendBuf failed", currBuffer)
459 return int(C.RMR_ERR_INITFAILED)
462 if currBuffer.state != C.RMR_OK {
463 m.UpdateStatCounter("TransmitError")
464 m.LogMBufError("SendBuf failed", currBuffer)
466 m.UpdateStatCounter("Transmitted")
// The return value is evaluated before the deferred Free runs.
468 defer m.Free(currBuffer)
469 return int(currBuffer.state)
// SendCallMsg builds a transmit buffer from params with CopyBuffer and issues
// rmr_wh_call on wormhole params.Whid with params.Callid and params.Timeout.
// It returns the state of the buffer RMR hands back together with that
// buffer's payload converted to a string, and counts the call as "Transmitted"
// or "TransmitError".
473 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
475 currBuffer *C.rmr_mbuf_t
476 counterName string = "Transmitted"
478 txBuffer := m.CopyBuffer(params)
// Presumably taken when CopyBuffer yields no buffer; the guard is not visible
// in this excerpt.
480 return C.RMR_ERR_INITFAILED, ""
486 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
487 m.contextMux.Unlock()
489 if currBuffer == nil {
490 m.UpdateStatCounter("TransmitError")
// NOTE(review): the log text names SendBuf although this is SendCallMsg, and
// the state is read from txBuffer after it was handed to RMR.
491 return m.LogMBufError("SendBuf failed", txBuffer), ""
494 if currBuffer.state != C.RMR_OK {
495 counterName = "TransmitError"
496 m.LogMBufError("SendBuf failed", currBuffer)
499 m.UpdateStatCounter(counterName)
500 defer m.Free(currBuffer)
// The payload is copied into Go memory before the deferred Free releases the
// C buffer; it is returned even when the state is not RMR_OK.
502 cptr := unsafe.Pointer(currBuffer.payload)
503 payload := C.GoBytes(cptr, C.int(currBuffer.len))
505 return int(currBuffer.state), string(payload)
508 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
509 return m.Wh_open(target)
512 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
514 defer m.contextMux.Unlock()
515 endpoint := C.CString(target)
516 return C.rmr_wh_open(m.context, endpoint)
519 func (m *RMRClient) Closewh(whid int) {
520 m.Wh_close(C.rmr_whid_t(whid))
// Wh_close closes the wormhole identified by whid through rmr_wh_close on the
// client's context. m.contextMux is unlocked when the function returns.
523 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
525 defer m.contextMux.Unlock()
526 C.rmr_wh_close(m.context, whid)
// IsRetryError checks whether the status recorded in params (set by Send) is
// RMR_ERR_RETRY. The return statements are not visible in this excerpt.
529 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
530 if params.status == int(C.RMR_ERR_RETRY) {
// IsNoEndPointError checks whether the status recorded in params (set by Send)
// is RMR_ERR_NOENDPT. The return statements are not visible in this excerpt.
536 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
537 if params.status == int(C.RMR_ERR_NOENDPT) {
// UpdateStatCounter is called throughout this file with one of the counter
// names declared in RMRCounterOpts ("Transmitted", "Received", "TransmitError",
// "ReceiveError"). Its body is not visible in this excerpt.
543 func (m *RMRClient) UpdateStatCounter(name string) {
549 func (m *RMRClient) RegisterMetrics() {
550 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
553 func (m *RMRClient) Wait() {
557 func (m *RMRClient) IsReady() bool {
// SetReadyCB stores params in m.readyCbParams, the value Start passes to the
// ready callback. The assignment of cb is not visible in this excerpt.
561 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
563 m.readyCbParams = params
// GetRicMessageId looks name up in the RICMessageTypes table; ok reports
// whether the name is present. The return statement is not visible in this
// excerpt.
566 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
567 id, ok := RICMessageTypes[name]
// GetRicMessageName iterates over the RICMessageTypes table (keyed by message
// name, as GetRicMessageId's lookup shows) to find the entry for id. The
// matching logic and the result for an unknown id are not visible in this
// excerpt.
571 func (m *RMRClient) GetRicMessageName(id int) (s string) {
572 for k, v := range RICMessageTypes {
// LogMBufError writes a debug log line about mbuf, prefixed with text. In the
// visible lines it logs the transport state, the RMR state and the matching
// RMRErrors description, and returns the RMR state as an int. A separate line
// logs "mbuf nil"; the guard selecting it and the value returned on that path
// are not visible in this excerpt.
580 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
582 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
583 return int(mbuf.state)
585 Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))