2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33 memcpy((void *)dst, (void *)data, len);
36 int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
45 void close_epoll(int epoll_fd) {
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54 if( events[0].data.fd == rcv_fd ) {
62 #cgo LDFLAGS: -lrmr_si
70 "github.com/spf13/viper"
76 var RMRCounterOpts = []CounterOpts{
77 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
78 {Name: "Received", Help: "The total number of received RMR messages"},
79 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
83 var RMRErrors = map[int]string{
84 C.RMR_OK: "state is good",
85 C.RMR_ERR_BADARG: "argument passed to function was unusable",
86 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
87 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
88 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
89 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
90 C.RMR_ERR_CALLFAILED: "unable to send call() message",
91 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
92 C.RMR_ERR_WHID: "wormhole id was invalid",
93 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
94 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
95 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
96 C.RMR_ERR_TIMEOUT: "message processing call timed out",
97 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
98 C.RMR_ERR_TRUNC: "received message likely truncated",
99 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
100 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
103 //-----------------------------------------------------------------------------
105 //-----------------------------------------------------------------------------
106 type RMRParams struct {
121 func (params *RMRParams) String() string {
123 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
127 //-----------------------------------------------------------------------------
129 //-----------------------------------------------------------------------------
130 type RMRClientParams struct {
139 func (params *RMRClientParams) String() string {
140 return fmt.Sprintf("ProtPort=%s MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t",
141 params.ProtPort, params.MaxSize, params.ThreadType, params.StatDesc, params.LowLatency, params.FastAck)
144 //-----------------------------------------------------------------------------
146 //-----------------------------------------------------------------------------
147 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
148 p := C.CString(params.ProtPort)
149 m := C.int(params.MaxSize)
150 c := C.int(params.ThreadType)
151 defer C.free(unsafe.Pointer(p))
152 ctx := C.rmr_init(p, m, c)
154 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
157 Logger.Info("new rmrClient with parameters: %s", params.String())
159 if params.LowLatency {
160 C.rmr_set_low_latency(ctx)
167 protPort: params.ProtPort,
169 consumers: make([]MessageConsumer, 0),
170 stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
174 func NewRMRClient() *RMRClient {
175 return NewRMRClientWithParams(
177 ProtPort: viper.GetString("rmr.protPort"),
178 MaxSize: viper.GetInt("rmr.maxSize"),
179 ThreadType: viper.GetInt("rmr.threadType"),
181 LowLatency: viper.GetBool("rmr.lowLatency"),
182 FastAck: viper.GetBool("rmr.fastAck"),
186 func (m *RMRClient) Start(c MessageConsumer) {
188 m.consumers = append(m.consumers, c)
194 m.ready = int(C.rmr_ready(m.context))
195 m.contextMux.Unlock()
197 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
201 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
203 time.Sleep(1 * time.Second)
208 if m.readyCb != nil {
209 go m.readyCb(m.readyCbParams)
214 rfd := C.rmr_get_rcvfd(m.context)
215 m.contextMux.Unlock()
216 efd := C.init_epoll(rfd)
220 if int(C.wait_epoll(efd, rfd)) == 0 {
224 rxBuffer := C.rmr_rcv_msg(m.context, nil)
225 m.contextMux.Unlock()
228 m.LogMBufError("RecvMsg failed", rxBuffer)
229 m.UpdateStatCounter("ReceiveError")
232 m.UpdateStatCounter("Received")
233 m.parseMessage(rxBuffer)
240 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
241 if len(m.consumers) == 0 {
242 Logger.Info("rmrClient: No message handlers defined, message discarded!")
246 params := &RMRParams{}
247 params.Mbuf = rxBuffer
248 params.Mtype = int(rxBuffer.mtype)
249 params.SubId = int(rxBuffer.sub_id)
250 params.Meid = &RMRMeid{}
252 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
253 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
254 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
257 xidBuf := make([]byte, int(C.RMR_MAX_XID))
258 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
259 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
262 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
263 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
264 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
267 // Default case: a single consumer
268 if len(m.consumers) == 1 && m.consumers[0] != nil {
269 params.PayloadLen = int(rxBuffer.len)
270 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
271 err := m.consumers[0].Consume(params)
273 Logger.Warn("rmrClient: Consumer returned error: %v", err)
278 // Special case for multiple consumers
279 for _, c := range m.consumers {
280 cptr := unsafe.Pointer(rxBuffer.payload)
281 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
282 params.PayloadLen = int(rxBuffer.len)
283 params.Mtype = int(rxBuffer.mtype)
284 params.SubId = int(rxBuffer.sub_id)
286 err := c.Consume(params)
288 Logger.Warn("rmrClient: Consumer returned error: %v", err)
293 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
295 defer m.contextMux.Unlock()
296 outbuf := C.rmr_alloc_msg(m.context, C.int(size))
298 Logger.Error("rmrClient: Allocating message buffer failed!")
303 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
305 defer m.contextMux.Unlock()
306 outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
308 Logger.Error("rmrClient: Allocating message buffer failed!")
313 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
318 defer m.contextMux.Unlock()
322 func (m *RMRClient) SendMsg(params *RMRParams) bool {
323 return m.Send(params, false)
326 func (m *RMRClient) SendRts(params *RMRParams) bool {
327 return m.Send(params, true)
330 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
331 status := m.Send(params, isRts)
333 for ; i < int(to)*2 && status == false; i++ {
334 status = m.Send(params, isRts)
336 time.Sleep(500 * time.Millisecond)
340 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
341 if params.Mbuf != nil {
349 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
355 payLen := len(params.Payload)
356 if params.PayloadLen != 0 {
357 payLen = params.PayloadLen
360 txBuffer := params.Mbuf
364 txBuffer = m.ReAllocate(txBuffer, payLen)
366 txBuffer = m.Allocate(payLen)
372 txBuffer.mtype = C.int(params.Mtype)
373 txBuffer.sub_id = C.int(params.SubId)
374 txBuffer.len = C.int(payLen)
376 datap := C.CBytes(params.Payload)
379 if params.Meid != nil {
380 b := make([]byte, int(C.RMR_MAX_MEID))
381 copy(b, []byte(params.Meid.RanName))
382 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
385 xidLen := len(params.Xid)
386 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
387 b := make([]byte, int(C.RMR_MAX_XID))
388 copy(b, []byte(params.Xid))
389 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
392 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
397 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
399 txBuffer := m.CopyBuffer(params)
403 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
404 if params.status == int(C.RMR_OK) {
410 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
412 currBuffer *C.rmr_mbuf_t
418 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
421 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
423 currBuffer = C.rmr_send_msg(m.context, txBuffer)
426 m.contextMux.Unlock()
428 if currBuffer == nil {
429 m.UpdateStatCounter("TransmitError")
430 return m.LogMBufError("SendBuf failed", txBuffer)
433 // Just quick retry seems to help for K8s issue
434 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
435 if maxRetryOnFailure == 0 {
436 maxRetryOnFailure = 5
439 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
442 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
445 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
447 currBuffer = C.rmr_send_msg(m.context, txBuffer)
450 m.contextMux.Unlock()
453 if currBuffer == nil {
454 m.UpdateStatCounter("TransmitError")
455 m.LogMBufError("SendBuf failed", currBuffer)
456 return int(C.RMR_ERR_INITFAILED)
459 if currBuffer.state != C.RMR_OK {
460 m.UpdateStatCounter("TransmitError")
461 m.LogMBufError("SendBuf failed", currBuffer)
463 m.UpdateStatCounter("Transmitted")
465 defer m.Free(currBuffer)
466 return int(currBuffer.state)
470 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
472 currBuffer *C.rmr_mbuf_t
473 counterName string = "Transmitted"
475 txBuffer := m.CopyBuffer(params)
477 return C.RMR_ERR_INITFAILED, ""
483 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
484 m.contextMux.Unlock()
486 if currBuffer == nil {
487 m.UpdateStatCounter("TransmitError")
488 return m.LogMBufError("SendBuf failed", txBuffer), ""
491 if currBuffer.state != C.RMR_OK {
492 counterName = "TransmitError"
493 m.LogMBufError("SendBuf failed", currBuffer)
496 m.UpdateStatCounter(counterName)
497 defer m.Free(currBuffer)
499 cptr := unsafe.Pointer(currBuffer.payload)
500 payload := C.GoBytes(cptr, C.int(currBuffer.len))
502 return int(currBuffer.state), string(payload)
505 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
506 return m.Wh_open(target)
509 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
511 defer m.contextMux.Unlock()
512 endpoint := C.CString(target)
513 return C.rmr_wh_open(m.context, endpoint)
516 func (m *RMRClient) Closewh(whid int) {
517 m.Wh_close(C.rmr_whid_t(whid))
520 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
522 defer m.contextMux.Unlock()
523 C.rmr_wh_close(m.context, whid)
526 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
527 if params.status == int(C.RMR_ERR_RETRY) {
533 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
534 if params.status == int(C.RMR_ERR_NOENDPT) {
540 func (m *RMRClient) UpdateStatCounter(name string) {
546 func (m *RMRClient) RegisterMetrics() {
547 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
550 func (m *RMRClient) Wait() {
554 func (m *RMRClient) IsReady() bool {
558 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
560 m.readyCbParams = params
563 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
564 id, ok := RICMessageTypes[name]
568 func (m *RMRClient) GetRicMessageName(id int) (s string) {
569 for k, v := range RICMessageTypes {
577 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
579 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
580 return int(mbuf.state)
582 Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
587 func (m *RMRClient) GetStat() (r RMRStatistics) {