2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst using memcpy.
// No bounds checking is done here: the caller is responsible for making
// sure both buffers are at least len bytes long.
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33 memcpy((void *)dst, (void *)data, len);
// init_epoll creates a new epoll instance and registers rcv_fd with it.
// The initialisation of epe and the return statement are not visible here;
// the Go caller uses the result as the epoll descriptor, so presumably
// epoll_fd is returned -- TODO confirm.
36 int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
// NOTE(review): the result of epoll_ctl is discarded, so a failed
// registration is not reported by this call.
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
45 void close_epoll(int epoll_fd) {
// wait_epoll waits for a single event on epoll_fd and checks whether it
// belongs to rcv_fd. The values returned for each outcome are not visible
// here; the Go caller compares the result against 0.
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
// A timeout of -1 makes epoll_wait block until an event arrives.
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54 if( events[0].data.fd == rcv_fd ) {
62 #cgo LDFLAGS: -lrmr_si
70 "github.com/spf13/viper"
// RMRCounterOpts describes the counters kept by the RMR client. The list is
// handed to Metric.RegisterCounterGroup, and the Name values are the strings
// passed to UpdateStatCounter elsewhere in this file.
// NOTE(review): "transmited" in the first Help text is misspelled; it is a
// runtime string, so it is left untouched here.
76 var RMRCounterOpts = []CounterOpts{
77 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
78 {Name: "Received", Help: "The total number of received RMR messages"},
79 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
// RMRErrors maps RMR state codes (the C.RMR_* constants) to human-readable
// descriptions. LogMBufError indexes it with the state of a message buffer
// to build its log line.
83 var RMRErrors = map[int]string{
84 C.RMR_OK: "state is good",
85 C.RMR_ERR_BADARG: "argument passed to function was unusable",
86 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
87 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
88 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
89 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
90 C.RMR_ERR_CALLFAILED: "unable to send call() message",
91 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
92 C.RMR_ERR_WHID: "wormhole id was invalid",
93 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
94 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
95 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
96 C.RMR_ERR_TIMEOUT: "message processing call timed out",
97 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
98 C.RMR_ERR_TRUNC: "received message likely truncated",
99 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
100 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
103 //-----------------------------------------------------------------------------
105 //-----------------------------------------------------------------------------
106 type RMRParams struct {
// String renders the message parameters as a single line for logging.
// "Paylens" prints the declared PayloadLen followed by the actual length of
// the Payload slice, and "Paymd5" is the MD5 digest of Payload in hex, so
// the payload bytes themselves are never written out.
121 func (params *RMRParams) String() string {
123 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
127 //-----------------------------------------------------------------------------
129 //-----------------------------------------------------------------------------
// NewRMRClientWithParams initialises an RMR context via C.rmr_init and wraps
// it in an RMRClient.
//
// protPort is converted to a C string and passed as the first rmr_init
// argument, threadType is passed as the third. The second argument (m) is
// built on a line not visible here, presumably from maxSize -- TODO confirm.
// statDesc is the name under which the RMR counter group is registered.
130 func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
131 p := C.CString(protPort)
133 c := C.int(threadType)
// The C copy of protPort is released when this function returns.
134 defer C.free(unsafe.Pointer(p))
135 ctx := C.rmr_init(p, m, c)
137 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
// The client starts with no consumers; Start appends to this slice.
142 consumers: make([]MessageConsumer, 0),
143 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
// NewRMRClient builds an RMRClient from configuration: it reads the viper
// keys "rmr.protPort", "rmr.maxSize" and "rmr.threadType" and registers the
// counters under the name "RMR".
147 func NewRMRClient() *RMRClient {
148 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
// Start registers c as a message consumer, waits for RMR to report ready,
// and then receives messages, handing each one to parseMessage.
//
// Several conditions, loop headers and the Lock calls paired with the
// Unlock calls below are on lines not visible here, so the notes that
// follow describe only the statements that are shown.
151 func (m *RMRClient) Start(c MessageConsumer) {
153 m.consumers = append(m.consumers, c)
// Readiness is polled through C.rmr_ready and cached in m.ready.
159 m.ready = int(C.rmr_ready(m.context))
160 m.contextMux.Unlock()
162 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
166 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
// One-second pause used while waiting for readiness.
168 time.Sleep(1 * time.Second)
// The optional ready callback runs in its own goroutine so it cannot
// block the code that follows.
173 if m.readyCb != nil {
174 go m.readyCb(m.readyCbParams)
// The RMR receive descriptor is registered with epoll so that waiting for
// a message happens in wait_epoll rather than inside rmr_rcv_msg.
179 rfd := C.rmr_get_rcvfd(m.context)
180 m.contextMux.Unlock()
181 efd := C.init_epoll(rfd)
185 if int(C.wait_epoll(efd, rfd)) == 0 {
// Passing nil asks RMR to supply the receive buffer.
189 rxBuffer := C.rmr_rcv_msg(m.context, nil)
190 m.contextMux.Unlock()
// Failure path: log the buffer state and count a receive error.
193 m.LogMBufError("RecvMsg failed", rxBuffer)
194 m.UpdateStatCounter("ReceiveError")
// Success path: count the message and dispatch it to the consumers.
197 m.UpdateStatCounter("Received")
198 m.parseMessage(rxBuffer)
// parseMessage converts a received RMR buffer into an RMRParams value and
// passes it to the registered consumers.
//
// Message type, subscription id, MEID, transaction id and source are read
// from rxBuffer. With exactly one non-nil consumer the payload slice aliases
// the C buffer memory (no copy); with several consumers each one receives a
// Go-owned copy made with C.GoBytes.
205 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
206 if len(m.consumers) == 0 {
207 Logger.Info("rmrClient: No message handlers defined, message discarded!")
211 params := &RMRParams{}
// The original C buffer is kept so that it can be reused later, e.g. by
// CopyBuffer, which starts from params.Mbuf.
212 params.Mbuf = rxBuffer
213 params.Mtype = int(rxBuffer.mtype)
214 params.SubId = int(rxBuffer.sub_id)
215 params.Meid = &RMRMeid{}
// Each field is copied into a zero-filled Go buffer by the RMR getter and
// the trailing NUL padding is trimmed off.
217 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
218 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
219 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
222 xidBuf := make([]byte, int(C.RMR_MAX_XID))
223 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
// NOTE(review): the slice bound 32 is hard-coded while the buffer is sized
// by C.RMR_MAX_XID; this assumes RMR_MAX_XID >= 32 -- confirm.
224 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
227 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
228 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
// NOTE(review): same pattern with a hard-coded 64 against C.RMR_MAX_SRC.
229 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
232 // Default case: a single consumer
233 if len(m.consumers) == 1 && m.consumers[0] != nil {
234 params.PayloadLen = int(rxBuffer.len)
// The cast views the C payload as a large byte array and slices it down to
// PayloadLen with an equal capacity, so no bytes are copied.
235 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
236 err := m.consumers[0].Consume(params)
238 Logger.Warn("rmrClient: Consumer returned error: %v", err)
243 // Special case for multiple consumers
244 for _, c := range m.consumers {
245 cptr := unsafe.Pointer(rxBuffer.payload)
// C.GoBytes copies the payload into Go memory for each consumer.
246 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
247 params.PayloadLen = int(rxBuffer.len)
248 params.Mtype = int(rxBuffer.mtype)
249 params.SubId = int(rxBuffer.sub_id)
// NOTE(review): every consumer is given the same params pointer (and the
// same Mbuf), so only Payload is per-consumer -- confirm this is intended.
251 err := c.Consume(params)
253 Logger.Warn("rmrClient: Consumer returned error: %v", err)
// Allocate requests a message buffer of the given size from RMR via
// C.rmr_alloc_msg. contextMux is released when the function returns; the
// error message below is logged on a failure path whose condition is not
// visible here.
258 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
260 defer m.contextMux.Unlock()
261 outbuf := C.rmr_alloc_msg(m.context, C.int(size))
263 Logger.Error("rmrClient: Allocating message buffer failed!")
// ReAllocate passes inbuf and the requested size to C.rmr_realloc_msg.
// contextMux is released when the function returns.
// NOTE(review): the error text is the same as in Allocate, so the two
// failure sites cannot be told apart in the log.
268 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
270 defer m.contextMux.Unlock()
271 outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
273 Logger.Error("rmrClient: Allocating message buffer failed!")
// Free presumably hands mbuf back to RMR; the releasing call itself is on a
// line not visible here -- TODO confirm. contextMux is released when the
// function returns.
278 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
283 defer m.contextMux.Unlock()
// SendMsg sends params as a regular message; it is Send with isRts=false.
287 func (m *RMRClient) SendMsg(params *RMRParams) bool {
288 return m.Send(params, false)
// SendRts sends params as a return-to-sender message; it is Send with
// isRts=true.
291 func (m *RMRClient) SendRts(params *RMRParams) bool {
292 return m.Send(params, true)
// SendWithRetry calls Send once and, while it keeps returning false,
// repeats it with 500 ms pauses. On the failure path it builds an error
// that includes the retry count and the formatted params.
//
// NOTE(review): the retry limit is int(to)*2, i.e. the raw integer value of
// the Duration is read as a number of seconds (two 500 ms attempts each).
// A conventional value such as 2*time.Second would therefore give roughly
// 4e9 attempts -- confirm what callers pass.
295 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
296 status := m.Send(params, isRts)
298 for ; i < int(to)*2 && status == false; i++ {
299 status = m.Send(params, isRts)
301 time.Sleep(500 * time.Millisecond)
305 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
306 if params.Mbuf != nil {
// CopyBuffer prepares an RMR transmit buffer from params: it obtains a
// buffer (re-allocating params.Mbuf or allocating a fresh one; the choice is
// made on lines not visible here), fills in message type, subscription id,
// length, MEID and transaction id, and copies the payload bytes into it.
314 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
// The payload length defaults to len(Payload) but an explicit non-zero
// PayloadLen takes precedence.
320 payLen := len(params.Payload)
321 if params.PayloadLen != 0 {
322 payLen = params.PayloadLen
325 txBuffer := params.Mbuf
329 txBuffer = m.ReAllocate(txBuffer, payLen)
331 txBuffer = m.Allocate(payLen)
337 txBuffer.mtype = C.int(params.Mtype)
338 txBuffer.sub_id = C.int(params.SubId)
339 txBuffer.len = C.int(payLen)
// C.CBytes makes a C-heap copy of Payload that has to be freed by the
// caller; the free is not visible in the lines shown -- TODO confirm.
341 datap := C.CBytes(params.Payload)
344 if params.Meid != nil {
// RanName is copied into a zero-filled buffer of RMR_MAX_MEID bytes, so a
// shorter name is NUL-padded and a longer one is truncated by copy.
345 b := make([]byte, int(C.RMR_MAX_MEID))
346 copy(b, []byte(params.Meid.RanName))
347 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// The transaction id is only set when it is non-empty and fits into
// RMR_MAX_XID bytes; otherwise it is skipped.
350 xidLen := len(params.Xid)
351 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
352 b := make([]byte, int(C.RMR_MAX_XID))
353 copy(b, []byte(params.Xid))
354 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// NOTE(review): txBuffer.len bytes are copied from datap, which holds only
// len(params.Payload) bytes. If PayloadLen is larger than len(Payload) this
// reads past the end of the C copy -- confirm callers keep them consistent.
357 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
// Send builds a transmit buffer from params with CopyBuffer and passes it to
// SendBuf together with isRts and params.Whid. The resulting RMR state is
// stored in params.status (read later by IsRetryError and
// IsNoEndPointError) and compared against RMR_OK; the return statements are
// not visible here.
362 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
364 txBuffer := m.CopyBuffer(params)
368 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
369 if params.status == int(C.RMR_OK) {
// SendBuf transmits txBuffer through one of three RMR calls
// (rmr_wh_send_msg, rmr_rts_msg or rmr_send_msg; the selecting conditions
// are on lines not visible here), retries while RMR reports RMR_ERR_RETRY,
// updates the transmit counters and returns the final RMR state as an int.
375 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
377 currBuffer *C.rmr_mbuf_t
// Counter to bump at the end; switched to "TransmitError" on failure.
378 counterName string = "Transmitted"
384 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
387 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
389 currBuffer = C.rmr_send_msg(m.context, txBuffer)
392 m.contextMux.Unlock()
// A nil result is counted as a transmit error and the state of the
// original txBuffer is logged and returned.
394 if currBuffer == nil {
395 m.UpdateStatCounter("TransmitError")
396 return m.LogMBufError("SendBuf failed", txBuffer)
399 // Just quick retry seems to help for K8s issue
// The retry budget comes from "rmr.maxRetryOnFailure"; an unset or zero
// value falls back to 5.
400 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
401 if maxRetryOnFailure == 0 {
402 maxRetryOnFailure = 5
// NOTE(review): the retries below pass txBuffer again rather than the
// currBuffer returned by the previous attempt -- confirm against the RMR
// buffer-ownership rules for send calls.
405 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
408 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
411 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
413 currBuffer = C.rmr_send_msg(m.context, txBuffer)
416 m.contextMux.Unlock()
// NOTE(review): the loop condition allows currBuffer to become nil during a
// retry, yet it is dereferenced here without a visible nil check -- confirm.
419 if currBuffer.state != C.RMR_OK {
420 counterName = "TransmitError"
421 m.LogMBufError("SendBuf failed", currBuffer)
424 m.UpdateStatCounter(counterName)
// The return expression is evaluated before the deferred Free runs, so the
// state is read while the buffer is still valid.
425 defer m.Free(currBuffer)
427 return int(currBuffer.state)
// SendCallMsg sends params through C.rmr_wh_call using params.Whid,
// params.Callid and params.Timeout, and returns the resulting RMR state
// together with the payload of the returned buffer as a string.
//
// It returns RMR_ERR_INITFAILED with an empty string on an early failure
// path whose condition is not visible here, and the logged state with an
// empty string when rmr_wh_call returns nil.
430 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
432 currBuffer *C.rmr_mbuf_t
433 counterName string = "Transmitted"
435 txBuffer := m.CopyBuffer(params)
437 return C.RMR_ERR_INITFAILED, ""
443 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
444 m.contextMux.Unlock()
446 if currBuffer == nil {
447 m.UpdateStatCounter("TransmitError")
// NOTE(review): the log text says "SendBuf failed" although this is the
// call path; it is a runtime string and is left as is.
448 return m.LogMBufError("SendBuf failed", txBuffer), ""
451 if currBuffer.state != C.RMR_OK {
452 counterName = "TransmitError"
453 m.LogMBufError("SendBuf failed", currBuffer)
456 m.UpdateStatCounter(counterName)
// The buffer is freed after the return values below have been computed.
457 defer m.Free(currBuffer)
// C.GoBytes copies the payload into Go memory, so the returned string does
// not reference the C buffer that is about to be freed.
459 cptr := unsafe.Pointer(currBuffer.payload)
460 payload := C.GoBytes(cptr, C.int(currBuffer.len))
462 return int(currBuffer.state), string(payload)
// Openwh opens a wormhole to target; it simply forwards to Wh_open.
465 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
466 return m.Wh_open(target)
// Wh_open converts target to a C string and returns the wormhole id
// produced by C.rmr_wh_open. contextMux is released when the function
// returns.
469 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
471 defer m.contextMux.Unlock()
// NOTE(review): C.CString allocates on the C heap and nothing frees
// endpoint before the return on the next line, so each call appears to leak
// the string. A `defer C.free(unsafe.Pointer(endpoint))` here would match
// what NewRMRClientWithParams does -- confirm rmr_wh_open does not keep the
// pointer.
472 endpoint := C.CString(target)
473 return C.rmr_wh_open(m.context, endpoint)
// Closewh closes the wormhole identified by whid; it converts the id to
// C.rmr_whid_t and forwards to Wh_close.
476 func (m *RMRClient) Closewh(whid int) {
477 m.Wh_close(C.rmr_whid_t(whid))
// Wh_close closes the wormhole whid through C.rmr_wh_close. contextMux is
// released when the function returns.
480 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
482 defer m.contextMux.Unlock()
483 C.rmr_wh_close(m.context, whid)
// IsRetryError checks whether the status recorded on params by the last
// Send equals RMR_ERR_RETRY. The return statements are not visible here.
486 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
487 if params.status == int(C.RMR_ERR_RETRY) {
// IsNoEndPointError checks whether the status recorded on params by the
// last Send equals RMR_ERR_NOENDPT. The return statements are not visible
// here.
493 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
494 if params.status == int(C.RMR_ERR_NOENDPT) {
500 func (m *RMRClient) UpdateStatCounter(name string) {
// RegisterMetrics registers the RMRCounterOpts counter group under the name
// "RMR" and stores the result in m.stat, replacing whatever group the
// constructor stored there.
506 func (m *RMRClient) RegisterMetrics() {
507 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
510 func (m *RMRClient) Wait() {
514 func (m *RMRClient) IsReady() bool {
// SetReadyCB stores params as the argument later passed to the ready
// callback in Start. The assignment of cb itself is presumably on the line
// not visible here -- TODO confirm.
518 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
520 m.readyCbParams = params
// GetRicMessageId looks name up in the RICMessageTypes map. The return
// statement is not visible here; presumably id and ok are returned as-is.
523 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
524 id, ok := RICMessageTypes[name]
// GetRicMessageName iterates over every entry of RICMessageTypes;
// presumably it returns the key whose value equals id -- the comparison and
// return are not visible here. The lookup is a linear scan of the map.
528 func (m *RMRClient) GetRicMessageName(id int) (s string) {
529 for k, v := range RICMessageTypes {
// LogMBufError writes a debug-level line containing text, the transport
// state and RMR state of mbuf, and the matching RMRErrors description, then
// returns the RMR state as an int.
// NOTE(review): mbuf is dereferenced without a nil check, so a nil buffer
// panics here. The message is logged at Debug level even though callers use
// it on failure paths -- confirm that is intended.
537 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
538 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
539 return int(mbuf.state)
543 func (m *RMRClient) GetStat() (r RMRStatistics) {