2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33 memcpy((void *)dst, (void *)data, len);
36 int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
45 void close_epoll(int epoll_fd) {
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54 if( events[0].data.fd == rcv_fd ) {
62 #cgo LDFLAGS: -lrmr_si
74 "github.com/spf13/viper"
77 var RMRCounterOpts = []CounterOpts{
78 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
79 {Name: "Received", Help: "The total number of received RMR messages"},
80 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
81 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
84 var RMRErrors = map[int]string{
85 C.RMR_OK: "state is good",
86 C.RMR_ERR_BADARG: "argument passed to function was unusable",
87 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
88 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
89 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
90 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
91 C.RMR_ERR_CALLFAILED: "unable to send call() message",
92 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
93 C.RMR_ERR_WHID: "wormhole id was invalid",
94 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
95 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
96 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
97 C.RMR_ERR_TIMEOUT: "message processing call timed out",
98 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
99 C.RMR_ERR_TRUNC: "received message likely truncated",
100 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
101 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
104 //-----------------------------------------------------------------------------
106 //-----------------------------------------------------------------------------
107 type RMRParams struct {
122 func (params *RMRParams) String() string {
124 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
128 //-----------------------------------------------------------------------------
130 //-----------------------------------------------------------------------------
131 type RMRClientParams struct {
136 func (params *RMRClientParams) String() string {
137 return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
138 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
139 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
142 //-----------------------------------------------------------------------------
144 //-----------------------------------------------------------------------------
145 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
146 p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
147 m := C.int(params.RmrData.MaxSize)
148 c := C.int(params.RmrData.ThreadType)
149 defer C.free(unsafe.Pointer(p))
150 ctx := C.rmr_init(p, m, c)
152 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
155 Logger.Info("new rmrClient with parameters: %s", params.String())
157 if params.RmrData.LowLatency {
158 C.rmr_set_low_latency(ctx)
160 if params.RmrData.FastAck {
166 consumers: make([]MessageConsumer, 0),
167 stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
168 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
172 func NewRMRClient() *RMRClient {
173 p := GetPortData("rmrdata")
174 if p.Port == 0 || viper.IsSet("rmr.protPort") {
175 // Old xApp descriptor used, fallback to rmr section
176 fmt.Sscanf(viper.GetString("rmr.protPort"), "tcp:%d", &p.Port)
177 p.MaxSize = viper.GetInt("rmr.maxSize")
178 p.ThreadType = viper.GetInt("rmr.threadType")
179 p.LowLatency = viper.GetBool("rmr.lowLatency")
180 p.FastAck = viper.GetBool("rmr.fastAck")
181 p.MaxRetryOnFailure = viper.GetInt("rmr.maxRetryOnFailure")
184 return NewRMRClientWithParams(
191 func (m *RMRClient) Start(c MessageConsumer) {
193 m.consumers = append(m.consumers, c)
199 m.ready = int(C.rmr_ready(m.context))
200 m.contextMux.Unlock()
202 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
206 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
208 time.Sleep(1 * time.Second)
213 if m.readyCb != nil {
214 go m.readyCb(m.readyCbParams)
219 rfd := C.rmr_get_rcvfd(m.context)
220 m.contextMux.Unlock()
221 efd := C.init_epoll(rfd)
225 if int(C.wait_epoll(efd, rfd)) == 0 {
229 rxBuffer := C.rmr_rcv_msg(m.context, nil)
230 m.contextMux.Unlock()
233 m.LogMBufError("RecvMsg failed", rxBuffer)
234 m.UpdateStatCounter("ReceiveError")
237 m.UpdateStatCounter("Received")
238 m.parseMessage(rxBuffer)
245 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
246 if len(m.consumers) == 0 {
247 Logger.Info("rmrClient: No message handlers defined, message discarded!")
251 params := &RMRParams{}
252 params.Mbuf = rxBuffer
253 params.Mtype = int(rxBuffer.mtype)
254 params.SubId = int(rxBuffer.sub_id)
255 params.Meid = &RMRMeid{}
257 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
258 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
259 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
262 xidBuf := make([]byte, int(C.RMR_MAX_XID))
263 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
264 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
267 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
268 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
269 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
272 // Default case: a single consumer
273 if len(m.consumers) == 1 && m.consumers[0] != nil {
274 params.PayloadLen = int(rxBuffer.len)
275 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
276 err := m.consumers[0].Consume(params)
278 Logger.Warn("rmrClient: Consumer returned error: %v", err)
284 // Special case for multiple consumers
285 for _, c := range m.consumers {
286 cptr := unsafe.Pointer(rxBuffer.payload)
287 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
288 params.PayloadLen = int(rxBuffer.len)
289 params.Mtype = int(rxBuffer.mtype)
290 params.SubId = int(rxBuffer.sub_id)
292 err := c.Consume(params)
294 Logger.Warn("rmrClient: Consumer returned error: %v", err)
300 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
302 defer m.contextMux.Unlock()
303 outbuf := C.rmr_alloc_msg(m.context, C.int(size))
305 Logger.Error("rmrClient: Allocating message buffer failed!")
310 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
312 defer m.contextMux.Unlock()
313 outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
315 Logger.Error("rmrClient: Allocating message buffer failed!")
320 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
325 defer m.contextMux.Unlock()
329 func (m *RMRClient) SendMsg(params *RMRParams) bool {
330 return m.Send(params, false)
333 func (m *RMRClient) SendRts(params *RMRParams) bool {
334 return m.Send(params, true)
337 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
338 status := m.Send(params, isRts)
340 for ; i < int(to)*2 && status == false; i++ {
341 status = m.Send(params, isRts)
343 time.Sleep(500 * time.Millisecond)
347 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
348 if params.Mbuf != nil {
356 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
362 payLen := len(params.Payload)
363 if params.PayloadLen != 0 {
364 payLen = params.PayloadLen
367 txBuffer := params.Mbuf
371 txBuffer = m.ReAllocate(txBuffer, payLen)
373 txBuffer = m.Allocate(payLen)
379 txBuffer.mtype = C.int(params.Mtype)
380 txBuffer.sub_id = C.int(params.SubId)
381 txBuffer.len = C.int(payLen)
383 datap := C.CBytes(params.Payload)
386 if params.Meid != nil {
387 b := make([]byte, int(C.RMR_MAX_MEID))
388 copy(b, []byte(params.Meid.RanName))
389 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
392 xidLen := len(params.Xid)
393 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
394 b := make([]byte, int(C.RMR_MAX_XID))
395 copy(b, []byte(params.Xid))
396 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
399 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
404 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
406 txBuffer := m.CopyBuffer(params)
410 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
411 if params.status == int(C.RMR_OK) {
417 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
419 currBuffer *C.rmr_mbuf_t
425 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
428 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
430 currBuffer = C.rmr_send_msg(m.context, txBuffer)
433 m.contextMux.Unlock()
435 if currBuffer == nil {
436 m.UpdateStatCounter("TransmitError")
437 return m.LogMBufError("SendBuf failed", txBuffer)
440 // Just quick retry seems to help for K8s issue
441 if m.maxRetryOnFailure == 0 {
442 m.maxRetryOnFailure = 5
445 for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
448 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
451 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
453 currBuffer = C.rmr_send_msg(m.context, txBuffer)
456 m.contextMux.Unlock()
459 if currBuffer == nil {
460 m.UpdateStatCounter("TransmitError")
461 m.LogMBufError("SendBuf failed", currBuffer)
462 return int(C.RMR_ERR_INITFAILED)
465 if currBuffer.state != C.RMR_OK {
466 m.UpdateStatCounter("TransmitError")
467 m.LogMBufError("SendBuf failed", currBuffer)
469 m.UpdateStatCounter("Transmitted")
471 defer m.Free(currBuffer)
472 return int(currBuffer.state)
476 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
478 currBuffer *C.rmr_mbuf_t
479 counterName string = "Transmitted"
481 txBuffer := m.CopyBuffer(params)
483 return C.RMR_ERR_INITFAILED, ""
489 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
490 m.contextMux.Unlock()
492 if currBuffer == nil {
493 m.UpdateStatCounter("TransmitError")
494 return m.LogMBufError("SendBuf failed", txBuffer), ""
497 if currBuffer.state != C.RMR_OK {
498 counterName = "TransmitError"
499 m.LogMBufError("SendBuf failed", currBuffer)
502 m.UpdateStatCounter(counterName)
503 defer m.Free(currBuffer)
505 cptr := unsafe.Pointer(currBuffer.payload)
506 payload := C.GoBytes(cptr, C.int(currBuffer.len))
508 return int(currBuffer.state), string(payload)
511 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
512 return m.Wh_open(target)
515 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
517 defer m.contextMux.Unlock()
518 endpoint := C.CString(target)
519 return C.rmr_wh_open(m.context, endpoint)
522 func (m *RMRClient) Closewh(whid int) {
523 m.Wh_close(C.rmr_whid_t(whid))
526 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
528 defer m.contextMux.Unlock()
529 C.rmr_wh_close(m.context, whid)
532 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
533 if params.status == int(C.RMR_ERR_RETRY) {
539 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
540 if params.status == int(C.RMR_ERR_NOENDPT) {
546 func (m *RMRClient) UpdateStatCounter(name string) {
552 func (m *RMRClient) RegisterMetrics() {
553 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
556 func (m *RMRClient) Wait() {
560 func (m *RMRClient) IsReady() bool {
564 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
566 m.readyCbParams = params
569 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
570 id, ok := RICMessageTypes[name]
574 func (m *RMRClient) GetRicMessageName(id int) (s string) {
575 for k, v := range RICMessageTypes {
583 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
585 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
586 return int(mbuf.state)
588 Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))