2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst using memcpy.
// No bounds checking is done on the visible lines; the caller has to make
// sure both buffers hold at least len bytes.
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33 memcpy((void *)dst, (void *)data, len);
// init_epoll creates an epoll instance with epoll_create1(0) and registers
// rcv_fd on it (EPOLL_CTL_ADD) using the local event descriptor epe.
// NOTE(review): the lines that fill in epe and produce the return value are
// not visible in this excerpt; the result of epoll_ctl is discarded.
36 int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
45 void close_epoll(int epoll_fd) {
// wait_epoll waits on epoll_fd for a single event (one event slot, timeout
// of -1) and, when epoll_wait reports at least one event, checks whether the
// event belongs to rcv_fd.
// NOTE(review): the return statements are not visible in this excerpt; the
// Go caller compares the result against 0.
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54 if( events[0].data.fd == rcv_fd ) {
62 #cgo LDFLAGS: -lrmr_si
70 "github.com/spf13/viper"
// RMRCounterOpts describes the counters kept for RMR traffic: transmitted and
// received messages plus transmit and receive errors. It is handed to
// Metric.RegisterCounterGroup when a client is created or its metrics are
// re-registered.
// NOTE(review): "transmited" in the first Help text is misspelled; it is a
// runtime string, so it is left untouched here.
76 var RMRCounterOpts = []CounterOpts{
77 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
78 {Name: "Received", Help: "The total number of received RMR messages"},
79 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
// RMRErrors maps RMR state codes (the C.RMR_* constants) to human-readable
// descriptions. LogMBufError uses it to describe the state of a message
// buffer in log output.
83 var RMRErrors = map[int]string{
84 C.RMR_OK: "state is good",
85 C.RMR_ERR_BADARG: "argument passed to function was unusable",
86 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
87 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
88 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
89 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
90 C.RMR_ERR_CALLFAILED: "unable to send call() message",
91 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
92 C.RMR_ERR_WHID: "wormhole id was invalid",
93 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
94 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
95 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
96 C.RMR_ERR_TIMEOUT: "message processing call timed out",
97 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
98 C.RMR_ERR_TRUNC: "received message likely truncated",
99 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
100 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
103 //-----------------------------------------------------------------------------
105 //-----------------------------------------------------------------------------
106 type RMRParams struct {
// String renders the parameters for logging: source, message type,
// subscription id, transaction id, MEID, the declared payload length next to
// the actual length of Payload, and the MD5 checksum of Payload in hex.
121 func (params *RMRParams) String() string {
123 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
127 //-----------------------------------------------------------------------------
129 //-----------------------------------------------------------------------------
130 type RMRClientParams struct {
// String returns a one-line summary of the client parameters (port, maximum
// size, thread type, statistics description and the low-latency / fast-ack
// flags) suitable for logging.
139 func (params *RMRClientParams) String() string {
140 return fmt.Sprintf("ProtPort=%s MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t",
141 params.ProtPort, params.MaxSize, params.ThreadType, params.StatDesc, params.LowLatency, params.FastAck)
144 //-----------------------------------------------------------------------------
146 //-----------------------------------------------------------------------------
// NewRMRClientWithParams initializes an RMR context with C.rmr_init using the
// port, maximum size and thread type from params, and returns a client bound
// to that context. The client starts with an empty consumer list and a
// counter group registered under params.StatDesc.
// NOTE(review): the failure check after rmr_init and any handling of
// params.FastAck sit on lines not visible in this excerpt.
147 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
// Convert the Go values to their C counterparts; the C string is released
// when this function returns.
148 p := C.CString(params.ProtPort)
149 m := C.int(params.MaxSize)
150 c := C.int(params.ThreadType)
151 defer C.free(unsafe.Pointer(p))
152 ctx := C.rmr_init(p, m, c)
154 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
157 Logger.Info("new rmrClient with parameters: %s", params.String())
// Low-latency mode is opt-in.
159 if params.LowLatency {
160 C.rmr_set_low_latency(ctx)
167 protPort: params.ProtPort,
169 consumers: make([]MessageConsumer, 0),
170 stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
// NewRMRClient builds a client from configuration: it reads the
// "rmr.protPort", "rmr.maxSize", "rmr.threadType", "rmr.lowLatency" and
// "rmr.fastAck" keys through viper and delegates to NewRMRClientWithParams.
174 func NewRMRClient() *RMRClient {
175 return NewRMRClientWithParams(
177 ProtPort: viper.GetString("rmr.protPort"),
178 MaxSize: viper.GetInt("rmr.maxSize"),
179 ThreadType: viper.GetInt("rmr.threadType"),
181 LowLatency: viper.GetBool("rmr.lowLatency"),
182 FastAck: viper.GetBool("rmr.fastAck"),
// Start registers the consumer c and then runs the receive side of the
// client. Visible steps, in order: wait for C.rmr_ready (sleeping one second
// between checks), launch the optional ready callback in its own goroutine,
// set up epoll on the RMR receive descriptor, then receive messages with
// C.rmr_rcv_msg, update the counters and pass each message to parseMessage.
// NOTE(review): loop headers, lock acquisition and branch conditions are on
// lines not visible in this excerpt.
186 func (m *RMRClient) Start(c MessageConsumer) {
188 m.consumers = append(m.consumers, c)
// Readiness polling: the RMR context is queried under contextMux.
194 m.ready = int(C.rmr_ready(m.context))
195 m.contextMux.Unlock()
197 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
201 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
203 time.Sleep(1 * time.Second)
// The ready callback runs asynchronously so it cannot hold up receiving.
208 if m.readyCb != nil {
209 go m.readyCb(m.readyCbParams)
// Receive path: fetch the receive descriptor and watch it with epoll.
214 rfd := C.rmr_get_rcvfd(m.context)
215 m.contextMux.Unlock()
216 efd := C.init_epoll(rfd)
220 if int(C.wait_epoll(efd, rfd)) == 0 {
// Passing nil lets RMR supply the receive buffer.
224 rxBuffer := C.rmr_rcv_msg(m.context, nil)
225 m.contextMux.Unlock()
228 m.LogMBufError("RecvMsg failed", rxBuffer)
229 m.UpdateStatCounter("ReceiveError")
232 m.UpdateStatCounter("Received")
233 m.parseMessage(rxBuffer)
// parseMessage converts a received RMR message buffer into an RMRParams value
// (message type, subscription id, MEID, transaction id, source, payload) and
// hands it to the registered consumers. With no consumers the message is
// only logged as discarded.
//
// With exactly one consumer the payload slice aliases the C buffer directly
// (no copy); with several consumers the payload is copied into Go memory for
// each consumer with C.GoBytes. In both cases the same params pointer is
// passed to every consumer.
240 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
241 if len(m.consumers) == 0 {
242 Logger.Info("rmrClient: No message handlers defined, message discarded!")
246 params := &RMRParams{}
247 params.Mbuf = rxBuffer
248 params.Mtype = int(rxBuffer.mtype)
249 params.SubId = int(rxBuffer.sub_id)
250 params.Meid = &RMRMeid{}
// Each header field is read into a fixed-size scratch buffer and trimmed of
// trailing NUL bytes before being stored as a Go string.
252 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
253 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
254 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
// NOTE(review): the [0:32] and [0:64] bounds below are hard-coded rather than
// derived from C.RMR_MAX_XID / C.RMR_MAX_SRC; presumably they match those
// constants. Confirm, since a smaller constant would make the slice
// expression panic.
257 xidBuf := make([]byte, int(C.RMR_MAX_XID))
258 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
259 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
262 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
263 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
264 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
267 // Default case: a single consumer
268 if len(m.consumers) == 1 && m.consumers[0] != nil {
269 params.PayloadLen = int(rxBuffer.len)
// Zero-copy view over the C payload; valid only while rxBuffer is alive.
270 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
271 err := m.consumers[0].Consume(params)
273 Logger.Warn("rmrClient: Consumer returned error: %v", err)
278 // Special case for multiple consumers
279 for _, c := range m.consumers {
// The per-message fields are refreshed from rxBuffer before every Consume call.
280 cptr := unsafe.Pointer(rxBuffer.payload)
281 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
282 params.PayloadLen = int(rxBuffer.len)
283 params.Mtype = int(rxBuffer.mtype)
284 params.SubId = int(rxBuffer.sub_id)
286 err := c.Consume(params)
288 Logger.Warn("rmrClient: Consumer returned error: %v", err)
// Allocate requests a message buffer of the given size from RMR with
// C.rmr_alloc_msg. The context mutex is released via defer when the call
// returns; an error is logged on the failure path (its condition is on a
// line not visible in this excerpt).
293 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
295 defer m.contextMux.Unlock()
296 outbuf := C.rmr_alloc_msg(m.context, C.int(size))
298 Logger.Error("rmrClient: Allocating message buffer failed!")
// ReAllocate resizes the existing message buffer inbuf to the given size with
// C.rmr_realloc_msg and yields the resulting buffer. The context mutex is
// released via defer when the call returns.
// NOTE(review): the failure log text is the same as in Allocate
// ("Allocating ..."), which makes the two failures indistinguishable in logs.
303 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
305 defer m.contextMux.Unlock()
306 outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
308 Logger.Error("rmrClient: Allocating message buffer failed!")
// Free presumably hands mbuf back to RMR; the actual release call is on a
// line not visible in this excerpt (TODO confirm). The context mutex is
// released via defer when the method returns.
313 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
318 defer m.contextMux.Unlock()
// SendMsg sends params as a regular message: it calls Send with isRts set to
// false and passes on its boolean result.
322 func (m *RMRClient) SendMsg(params *RMRParams) bool {
323 return m.Send(params, false)
// SendRts sends params in "rts" mode: it calls Send with isRts set to true
// and passes on its boolean result.
326 func (m *RMRClient) SendRts(params *RMRParams) bool {
327 return m.Send(params, true)
// SendWithRetry sends params once and, while the send keeps reporting
// failure, retries with a 500 ms pause between attempts, for at most
// int(to)*2 iterations. On final failure it builds an error that carries the
// retry count and a description of params.
//
// NOTE(review): `to` is a time.Duration but the loop bound uses its raw
// integer value, so the bound only equals "two tries per second" when callers
// pass a plain number of seconds (e.g. time.Duration(5)). Passing
// 5*time.Second would give an enormous retry bound. Verify against callers.
330 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
331 status := m.Send(params, isRts)
333 for ; i < int(to)*2 && status == false; i++ {
334 status = m.Send(params, isRts)
336 time.Sleep(500 * time.Millisecond)
340 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
341 if params.Mbuf != nil {
// CopyBuffer prepares an RMR transmit buffer from params. It picks the
// payload length (params.PayloadLen overrides len(params.Payload) when
// non-zero), obtains a buffer through ReAllocate or Allocate, fills in the
// message type, subscription id and length, copies the MEID and transaction
// id when present, and finally copies the payload bytes into the buffer.
//
// NOTE(review): C.CBytes allocates C memory that the caller must free; the
// matching free is not on the visible lines, so confirm it exists.
// NOTE(review): the payload copy uses txBuffer.len bytes, while datap holds
// only len(params.Payload) bytes. If PayloadLen exceeds len(Payload) this
// reads past the end of datap unless a guard exists on a line not shown.
349 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
355 payLen := len(params.Payload)
356 if params.PayloadLen != 0 {
357 payLen = params.PayloadLen
// Start from the buffer carried in params; the visible calls either resize it
// or allocate a fresh one (the selecting condition is not visible here).
360 txBuffer := params.Mbuf
364 txBuffer = m.ReAllocate(txBuffer, payLen)
366 txBuffer = m.Allocate(payLen)
372 txBuffer.mtype = C.int(params.Mtype)
373 txBuffer.sub_id = C.int(params.SubId)
374 txBuffer.len = C.int(payLen)
376 datap := C.CBytes(params.Payload)
// The MEID is copied through a zero-padded scratch slice of RMR_MAX_MEID bytes.
379 if params.Meid != nil {
380 b := make([]byte, int(C.RMR_MAX_MEID))
381 copy(b, []byte(params.Meid.RanName))
382 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// The transaction id is set only when it is non-empty and fits in RMR_MAX_XID.
385 xidLen := len(params.Xid)
386 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
387 b := make([]byte, int(C.RMR_MAX_XID))
388 copy(b, []byte(params.Xid))
389 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
392 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
// Send copies params into a transmit buffer with CopyBuffer, sends it with
// SendBuf (passing isRts and params.Whid), and records the resulting RMR
// state in params.status. The boolean result depends on whether that state
// equals C.RMR_OK (the return statements are not visible in this excerpt).
397 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
399 txBuffer := m.CopyBuffer(params)
403 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
404 if params.status == int(C.RMR_OK) {
// SendBuf transmits txBuffer through one of three RMR calls (wormhole send,
// return-to-sender, or plain send; the selecting conditions are not visible
// in this excerpt) and returns the resulting RMR state as an int.
//
// A nil result from the first attempt is counted as a transmit error and
// reported through LogMBufError. While the state is RMR_ERR_RETRY the send is
// repeated up to "rmr.maxRetryOnFailure" times (default 5 when the setting is
// 0). The final outcome updates either the "Transmitted" or the
// "TransmitError" counter, and the returned buffer is freed on exit.
//
// NOTE(review): the retry loop stops when currBuffer becomes nil, but the
// state check after the loop dereferences currBuffer without a visible nil
// guard. Confirm one exists on the lines not shown.
// NOTE(review): retries pass txBuffer again rather than the buffer returned
// by the previous attempt; check against the RMR documentation whether the
// original pointer is still valid after a send call.
410 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
412 currBuffer *C.rmr_mbuf_t
413 counterName string = "Transmitted"
419 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
422 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
424 currBuffer = C.rmr_send_msg(m.context, txBuffer)
427 m.contextMux.Unlock()
429 if currBuffer == nil {
430 m.UpdateStatCounter("TransmitError")
431 return m.LogMBufError("SendBuf failed", txBuffer)
434 // Just quick retry seems to help for K8s issue
435 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
436 if maxRetryOnFailure == 0 {
437 maxRetryOnFailure = 5
440 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
443 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
446 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
448 currBuffer = C.rmr_send_msg(m.context, txBuffer)
451 m.contextMux.Unlock()
454 if currBuffer.state != C.RMR_OK {
455 counterName = "TransmitError"
456 m.LogMBufError("SendBuf failed", currBuffer)
459 m.UpdateStatCounter(counterName)
// The return expression below is evaluated before the deferred Free runs.
460 defer m.Free(currBuffer)
462 return int(currBuffer.state)
// SendCallMsg performs a wormhole call: it copies params into a transmit
// buffer, invokes C.rmr_wh_call with params.Whid, params.Callid and
// params.Timeout, and returns the resulting RMR state together with the
// response payload converted to a string.
//
// C.RMR_ERR_INITFAILED with an empty payload is returned on the early-exit
// path after CopyBuffer (its condition is not visible in this excerpt). A nil
// result from rmr_wh_call is counted as a transmit error. The returned buffer
// is freed on exit, after the payload has been copied into Go memory.
// NOTE(review): the log text "SendBuf failed" is reused here although the
// failing operation is the call.
465 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
467 currBuffer *C.rmr_mbuf_t
468 counterName string = "Transmitted"
470 txBuffer := m.CopyBuffer(params)
472 return C.RMR_ERR_INITFAILED, ""
478 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
479 m.contextMux.Unlock()
481 if currBuffer == nil {
482 m.UpdateStatCounter("TransmitError")
483 return m.LogMBufError("SendBuf failed", txBuffer), ""
486 if currBuffer.state != C.RMR_OK {
487 counterName = "TransmitError"
488 m.LogMBufError("SendBuf failed", currBuffer)
491 m.UpdateStatCounter(counterName)
492 defer m.Free(currBuffer)
// Copy the response out of the C buffer before it is released.
494 cptr := unsafe.Pointer(currBuffer.payload)
495 payload := C.GoBytes(cptr, C.int(currBuffer.len))
497 return int(currBuffer.state), string(payload)
// Openwh is an alias for Wh_open: it opens a wormhole to target and returns
// the wormhole id.
500 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
501 return m.Wh_open(target)
// Wh_open opens an RMR wormhole to target with C.rmr_wh_open and returns the
// wormhole id reported by RMR. The context mutex is released via defer when
// the method returns.
// NOTE(review): the C string created with C.CString is not freed on the
// visible lines; cgo requires such strings to be released with C.free.
// Confirm whether RMR retains the pointer before adding a free.
504 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
506 defer m.contextMux.Unlock()
507 endpoint := C.CString(target)
508 return C.rmr_wh_open(m.context, endpoint)
// Closewh closes the wormhole identified by the plain-int id whid by
// converting it to C.rmr_whid_t and delegating to Wh_close.
511 func (m *RMRClient) Closewh(whid int) {
512 m.Wh_close(C.rmr_whid_t(whid))
// Wh_close closes the wormhole whid with C.rmr_wh_close. The context mutex is
// released via defer when the method returns.
515 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
517 defer m.contextMux.Unlock()
518 C.rmr_wh_close(m.context, whid)
// IsRetryError checks whether the status recorded in params by the last send
// equals C.RMR_ERR_RETRY.
521 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
522 if params.status == int(C.RMR_ERR_RETRY) {
// IsNoEndPointError checks whether the status recorded in params by the last
// send equals C.RMR_ERR_NOENDPT.
528 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
529 if params.status == int(C.RMR_ERR_NOENDPT) {
535 func (m *RMRClient) UpdateStatCounter(name string) {
// RegisterMetrics registers the RMR counter group under the fixed name "RMR"
// and stores it in m.stat, replacing the group set up by the constructor.
541 func (m *RMRClient) RegisterMetrics() {
542 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
545 func (m *RMRClient) Wait() {
549 func (m *RMRClient) IsReady() bool {
// SetReadyCB stores params as the argument for the ready callback. Start
// invokes m.readyCb with these params in a separate goroutine once RMR is
// ready. Storing cb itself presumably happens on a line not visible in this
// excerpt (TODO confirm).
553 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
555 m.readyCbParams = params
// GetRicMessageId looks up the numeric message type registered under name in
// RICMessageTypes; the boolean result tells whether the name is known.
558 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
559 id, ok := RICMessageTypes[name]
// GetRicMessageName does the reverse lookup of GetRicMessageId: it scans
// RICMessageTypes for an entry matching id and yields its name in s. The
// comparison and the not-found result are on lines not visible in this
// excerpt.
563 func (m *RMRClient) GetRicMessageName(id int) (s string) {
564 for k, v := range RICMessageTypes {
// LogMBufError writes a debug log line for a failed buffer operation. For a
// usable mbuf the line contains text, the transport state, the RMR state and
// its description from RMRErrors, and the RMR state is returned as an int.
// The second log call covers the nil-mbuf case; its return value is on a
// line not visible in this excerpt.
572 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
574 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
575 return int(mbuf.state)
577 Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
582 func (m *RMRClient) GetStat() (r RMRStatistics) {