2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33 memcpy((void *)dst, (void *)data, len);
36 int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
45 void close_epoll(int epoll_fd) {
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54 if( events[0].data.fd == rcv_fd ) {
62 #cgo LDFLAGS: -lrmr_si
70 "github.com/spf13/viper"
76 var RMRCounterOpts = []CounterOpts{
77 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
78 {Name: "Received", Help: "The total number of received RMR messages"},
79 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
83 var RMRErrors = map[int]string{
84 C.RMR_OK: "state is good",
85 C.RMR_ERR_BADARG: "argument passed to function was unusable",
86 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
87 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
88 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
89 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
90 C.RMR_ERR_CALLFAILED: "unable to send call() message",
91 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
92 C.RMR_ERR_WHID: "wormhole id was invalid",
93 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
94 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
95 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
96 C.RMR_ERR_TIMEOUT: "message processing call timed out",
97 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
98 C.RMR_ERR_TRUNC: "received message likely truncated",
99 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
100 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
103 //-----------------------------------------------------------------------------
105 //-----------------------------------------------------------------------------
106 type RMRParams struct {
121 func (params *RMRParams) String() string {
123 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
127 //-----------------------------------------------------------------------------
129 //-----------------------------------------------------------------------------
130 func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
131 p := C.CString(protPort)
133 c := C.int(threadType)
134 defer C.free(unsafe.Pointer(p))
135 ctx := C.rmr_init(p, m, c)
137 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
142 consumers: make([]MessageConsumer, 0),
143 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
147 func NewRMRClient() *RMRClient {
148 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
151 func (m *RMRClient) Start(c MessageConsumer) {
153 m.consumers = append(m.consumers, c)
159 m.ready = int(C.rmr_ready(m.context))
160 m.contextMux.Unlock()
162 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
166 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
168 time.Sleep(1 * time.Second)
173 if m.readyCb != nil {
174 go m.readyCb(m.readyCbParams)
179 rfd := C.rmr_get_rcvfd(m.context)
180 m.contextMux.Unlock()
181 efd := C.init_epoll(rfd)
185 if int(C.wait_epoll(efd, rfd)) == 0 {
189 rxBuffer := C.rmr_rcv_msg(m.context, nil)
190 m.contextMux.Unlock()
193 m.LogMBufError("RecvMsg failed", rxBuffer)
194 m.UpdateStatCounter("ReceiveError")
197 m.UpdateStatCounter("Received")
198 m.parseMessage(rxBuffer)
205 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
206 if len(m.consumers) == 0 {
207 Logger.Info("rmrClient: No message handlers defined, message discarded!")
211 params := &RMRParams{}
212 params.Mbuf = rxBuffer
213 params.Mtype = int(rxBuffer.mtype)
214 params.SubId = int(rxBuffer.sub_id)
215 params.Meid = &RMRMeid{}
217 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
218 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
219 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
222 xidBuf := make([]byte, int(C.RMR_MAX_XID))
223 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
224 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
227 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
228 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
229 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
232 // Default case: a single consumer
233 if len(m.consumers) == 1 && m.consumers[0] != nil {
234 params.PayloadLen = int(rxBuffer.len)
235 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
236 err := m.consumers[0].Consume(params)
238 Logger.Warn("rmrClient: Consumer returned error: %v", err)
243 // Special case for multiple consumers
244 for _, c := range m.consumers {
245 cptr := unsafe.Pointer(rxBuffer.payload)
246 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
247 params.PayloadLen = int(rxBuffer.len)
248 params.Mtype = int(rxBuffer.mtype)
249 params.SubId = int(rxBuffer.sub_id)
251 err := c.Consume(params)
253 Logger.Warn("rmrClient: Consumer returned error: %v", err)
258 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
260 defer m.contextMux.Unlock()
261 buf := C.rmr_alloc_msg(m.context, C.int(size))
263 Logger.Error("rmrClient: Allocating message buffer failed!")
268 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
273 defer m.contextMux.Unlock()
277 func (m *RMRClient) SendMsg(params *RMRParams) bool {
278 return m.Send(params, false)
281 func (m *RMRClient) SendRts(params *RMRParams) bool {
282 return m.Send(params, true)
285 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
286 status := m.Send(params, isRts)
288 for ; i < int(to)*2 && status == false; i++ {
289 status = m.Send(params, isRts)
291 time.Sleep(500 * time.Millisecond)
295 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
296 if params.Mbuf != nil {
304 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
305 if params.Mbuf != nil {
310 payLen := len(params.Payload)
311 if params.PayloadLen != 0 {
312 payLen = params.PayloadLen
315 txBuffer := m.Allocate(payLen)
319 txBuffer.mtype = C.int(params.Mtype)
320 txBuffer.sub_id = C.int(params.SubId)
321 txBuffer.len = C.int(payLen)
323 datap := C.CBytes(params.Payload)
327 if params.Meid != nil {
328 b := make([]byte, int(C.RMR_MAX_MEID))
329 copy(b, []byte(params.Meid.RanName))
330 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
332 xidLen := len(params.Xid)
333 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
334 b := make([]byte, int(C.RMR_MAX_XID))
335 copy(b, []byte(params.Xid))
336 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
339 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
343 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
345 txBuffer := m.CopyBuffer(params)
349 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
350 if params.status == int(C.RMR_OK) {
356 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
358 currBuffer *C.rmr_mbuf_t
359 counterName string = "Transmitted"
365 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
368 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
370 currBuffer = C.rmr_send_msg(m.context, txBuffer)
373 m.contextMux.Unlock()
375 if currBuffer == nil {
376 m.UpdateStatCounter("TransmitError")
377 return m.LogMBufError("SendBuf failed", txBuffer)
380 // Just quick retry seems to help for K8s issue
381 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
382 if maxRetryOnFailure == 0 {
383 maxRetryOnFailure = 5
386 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
389 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
392 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
394 currBuffer = C.rmr_send_msg(m.context, txBuffer)
397 m.contextMux.Unlock()
400 if currBuffer.state != C.RMR_OK {
401 counterName = "TransmitError"
402 m.LogMBufError("SendBuf failed", currBuffer)
405 m.UpdateStatCounter(counterName)
406 defer m.Free(currBuffer)
408 return int(currBuffer.state)
411 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
413 currBuffer *C.rmr_mbuf_t
414 counterName string = "Transmitted"
416 txBuffer := m.CopyBuffer(params)
418 return C.RMR_ERR_INITFAILED, ""
424 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
425 m.contextMux.Unlock()
427 if currBuffer == nil {
428 m.UpdateStatCounter("TransmitError")
429 return m.LogMBufError("SendBuf failed", txBuffer), ""
432 if currBuffer.state != C.RMR_OK {
433 counterName = "TransmitError"
434 m.LogMBufError("SendBuf failed", currBuffer)
437 m.UpdateStatCounter(counterName)
438 defer m.Free(currBuffer)
440 cptr := unsafe.Pointer(currBuffer.payload)
441 payload := C.GoBytes(cptr, C.int(currBuffer.len))
443 return int(currBuffer.state), string(payload)
446 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
447 return m.Wh_open(target)
450 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
452 defer m.contextMux.Unlock()
453 endpoint := C.CString(target)
454 return C.rmr_wh_open(m.context, endpoint)
457 func (m *RMRClient) Closewh(whid int) {
458 m.Wh_close(C.rmr_whid_t(whid))
461 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
463 defer m.contextMux.Unlock()
464 C.rmr_wh_close(m.context, whid)
467 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
468 if params.status == int(C.RMR_ERR_RETRY) {
474 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
475 if params.status == int(C.RMR_ERR_NOENDPT) {
481 func (m *RMRClient) UpdateStatCounter(name string) {
487 func (m *RMRClient) RegisterMetrics() {
488 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
491 func (m *RMRClient) Wait() {
495 func (m *RMRClient) IsReady() bool {
499 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
501 m.readyCbParams = params
504 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
505 id, ok := RICMessageTypes[name]
509 func (m *RMRClient) GetRicMessageName(id int) (s string) {
510 for k, v := range RICMessageTypes {
518 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
519 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
520 return int(mbuf.state)
524 func (m *RMRClient) GetStat() (r RMRStatistics) {