2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data to dst using memcpy. No bounds
// checking is done here; the caller must size both buffers for at least len bytes.
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33 memcpy((void *)dst, (void *)data, len);
// init_epoll creates an epoll instance and adds rcv_fd to its interest list.
// NOTE(review): the results of epoll_create1 and epoll_ctl are not checked in
// the lines visible here - confirm how failures are meant to be reported.
36 int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
// Register the receive descriptor, described by epe, with the new epoll instance.
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
45 void close_epoll(int epoll_fd) {
// wait_epoll blocks without timeout (-1) until epoll_fd reports an event,
// asking for at most one event, and then checks whether that event belongs to
// rcv_fd. The Go receive loop compares the result with 0 before reading.
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
// Only an event on the RMR receive descriptor is of interest.
54 if( events[0].data.fd == rcv_fd ) {
62 #cgo LDFLAGS: -lrmr_si
70 "github.com/spf13/viper"
76 var RMRCounterOpts = []CounterOpts{
77 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
78 {Name: "Received", Help: "The total number of received RMR messages"},
79 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
// RMRErrors maps RMR message-buffer state codes (the C.RMR_* constants) to
// human-readable descriptions. LogMBufError indexes it with int(mbuf.state).
83 var RMRErrors = map[int]string{
84 C.RMR_OK: "state is good",
85 C.RMR_ERR_BADARG: "argument passed to function was unusable",
86 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
87 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
88 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
89 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
90 C.RMR_ERR_CALLFAILED: "unable to send call() message",
91 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
92 C.RMR_ERR_WHID: "wormhole id was invalid",
93 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
94 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
95 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
96 C.RMR_ERR_TIMEOUT: "message processing call timed out",
97 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
98 C.RMR_ERR_TRUNC: "received message likely truncated",
99 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
100 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
103 //-----------------------------------------------------------------------------
105 //-----------------------------------------------------------------------------
106 type RMRParams struct {
// String renders the parameters on a single line for logging: source, message
// type, subscription id, transaction id, MEID, the declared and the actual
// payload length (PayloadLen / len(Payload)) and the MD5 digest of the payload
// printed in hex.
121 func (params *RMRParams) String() string {
123 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
127 //-----------------------------------------------------------------------------
129 //-----------------------------------------------------------------------------
// NewRMRClientWithParams initialises an RMR context through rmr_init and wraps
// it in an RMRClient.
//
//	protPort   - handed to rmr_init as a C string, which is freed on return
//	maxSize    - NOTE(review): presumably the size argument of rmr_init; confirm
//	threadType - converted to C.int and passed as the last rmr_init argument
//	statDesc   - passed to Metric.RegisterCounterGroup together with RMRCounterOpts
//
// A failed rmr_init is reported through Logger.Error.
130 func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
131 p := C.CString(protPort)
133 c := C.int(threadType)
// The C copy of protPort is only needed for the duration of rmr_init.
134 defer C.free(unsafe.Pointer(p))
135 ctx := C.rmr_init(p, m, c)
137 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
142 consumers: make([]MessageConsumer, 0),
143 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
147 func NewRMRClient() *RMRClient {
148 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
// Start registers consumer c, waits until RMR reports ready, triggers the
// optional ready callback and then runs the receive loop that feeds incoming
// messages to parseMessage.
151 func (m *RMRClient) Start(c MessageConsumer) {
153 m.consumers = append(m.consumers, c)
// Poll rmr_ready once per second until RMR is usable.
159 m.ready = int(C.rmr_ready(m.context))
160 m.contextMux.Unlock()
162 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
166 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
168 time.Sleep(1 * time.Second)
// Notify the application asynchronously so the callback cannot delay reception.
173 if m.readyCb != nil {
174 go m.readyCb(m.readyCbParams)
// The receive descriptor is watched through epoll (see init_epoll/wait_epoll).
179 rfd := C.rmr_get_rcvfd(m.context)
180 m.contextMux.Unlock()
181 efd := C.init_epoll(rfd)
// A zero result from wait_epoll means there is nothing to receive yet.
185 if int(C.wait_epoll(efd, rfd)) == 0 {
189 rxBuffer := C.rmr_rcv_msg(m.context, nil)
190 m.contextMux.Unlock()
193 m.LogMBufError("RecvMsg failed", rxBuffer)
194 m.UpdateStatCounter("ReceiveError")
197 m.UpdateStatCounter("Received")
198 m.parseMessage(rxBuffer)
205 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
206 if len(m.consumers) == 0 {
207 Logger.Info("rmrClient: No message handlers defined, message discarded!")
211 params := &RMRParams{}
212 params.Mbuf = rxBuffer
213 params.Mtype = int(rxBuffer.mtype)
214 params.SubId = int(rxBuffer.sub_id)
215 params.Meid = &RMRMeid{}
217 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
218 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
219 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
222 xidBuf := make([]byte, int(C.RMR_MAX_XID))
223 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
224 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
227 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
228 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
229 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
232 // Default case: a single consumer
233 if len(m.consumers) == 1 && m.consumers[0] != nil {
234 params.PayloadLen = int(rxBuffer.len)
235 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
236 err := m.consumers[0].Consume(params)
238 Logger.Warn("rmrClient: Consumer returned error: %v", err)
243 // Special case for multiple consumers
244 for _, c := range m.consumers {
245 cptr := unsafe.Pointer(rxBuffer.payload)
246 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
247 params.PayloadLen = int(rxBuffer.len)
248 params.Mtype = int(rxBuffer.mtype)
249 params.SubId = int(rxBuffer.sub_id)
251 err := c.Consume(params)
253 Logger.Warn("rmrClient: Consumer returned error: %v", err)
258 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
260 defer m.contextMux.Unlock()
261 outbuf := C.rmr_alloc_msg(m.context, C.int(size))
263 Logger.Error("rmrClient: Allocating message buffer failed!")
268 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
270 defer m.contextMux.Unlock()
271 outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
273 Logger.Error("rmrClient: Allocating message buffer failed!")
// Free is the client's release hook for message buffers; SendBuf and
// SendCallMsg defer it for the buffer returned by RMR.
// NOTE(review): the release call itself is outside the lines visible here.
278 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
// contextMux stays held until the function returns.
283 defer m.contextMux.Unlock()
287 func (m *RMRClient) SendMsg(params *RMRParams) bool {
288 return m.Send(params, false)
291 func (m *RMRClient) SendRts(params *RMRParams) bool {
292 return m.Send(params, true)
// SendWithRetry sends params (as a return-to-sender message when isRts is
// true) and keeps retrying while Send reports failure.
//
// The retry bound is int(to)*2 and attempts are spaced by a 500 ms sleep, so
// `to` is effectively read as a plain number of seconds rather than as a
// time.Duration.
// NOTE(review): a caller passing e.g. 5*time.Second would get a bound of
// 1e10 iterations - confirm all callers pass a bare count.
//
// On failure the returned error carries the retry count and the summary from
// params.String().
295 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
// First attempt; the loop below only runs while this keeps failing.
296 status := m.Send(params, isRts)
298 for ; i < int(to)*2 && status == false; i++ {
299 status = m.Send(params, isRts)
301 time.Sleep(500 * time.Millisecond)
305 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
// A buffer still attached to params after the final failure gets special handling.
306 if params.Mbuf != nil {
// CopyBuffer prepares the C message buffer for a transmission: it obtains a
// buffer (re-using params.Mbuf through ReAllocate, or allocating a new one),
// fills in message type, subscription id and length, sets MEID and
// transaction id when present, and copies the payload into the buffer.
//
// NOTE(review): the number of bytes copied is txBuffer.len, which equals
// params.PayloadLen whenever that is non-zero, while the source (datap) is a C
// copy holding exactly len(params.Payload) bytes. A PayloadLen larger than
// len(Payload) therefore makes memcpy read past the source - confirm callers
// never pass such a combination.
314 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
// An explicit PayloadLen takes precedence over the slice length.
316 payLen := len(params.Payload)
317 if params.PayloadLen != 0 {
318 payLen = params.PayloadLen
321 txBuffer := params.Mbuf
325 txBuffer = m.ReAllocate(txBuffer, payLen)
327 txBuffer = m.Allocate(payLen)
333 txBuffer.mtype = C.int(params.Mtype)
334 txBuffer.sub_id = C.int(params.SubId)
335 txBuffer.len = C.int(payLen)
// C.CBytes makes a malloc'ed copy of the Go payload for the memcpy below.
337 datap := C.CBytes(params.Payload)
341 if params.Meid != nil {
// The name is copied into a zeroed RMR_MAX_MEID-byte buffer, so the field is NUL padded.
342 b := make([]byte, int(C.RMR_MAX_MEID))
343 copy(b, []byte(params.Meid.RanName))
344 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// Transaction ids that are empty or longer than RMR_MAX_XID are not written.
346 xidLen := len(params.Xid)
347 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
348 b := make([]byte, int(C.RMR_MAX_XID))
349 copy(b, []byte(params.Xid))
350 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
353 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
357 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
359 txBuffer := m.CopyBuffer(params)
363 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
364 if params.status == int(C.RMR_OK) {
370 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
372 currBuffer *C.rmr_mbuf_t
373 counterName string = "Transmitted"
379 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
382 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
384 currBuffer = C.rmr_send_msg(m.context, txBuffer)
387 m.contextMux.Unlock()
389 if currBuffer == nil {
390 m.UpdateStatCounter("TransmitError")
391 return m.LogMBufError("SendBuf failed", txBuffer)
394 // Just quick retry seems to help for K8s issue
395 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
396 if maxRetryOnFailure == 0 {
397 maxRetryOnFailure = 5
400 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
403 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
406 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
408 currBuffer = C.rmr_send_msg(m.context, txBuffer)
411 m.contextMux.Unlock()
414 if currBuffer.state != C.RMR_OK {
415 counterName = "TransmitError"
416 m.LogMBufError("SendBuf failed", currBuffer)
419 m.UpdateStatCounter(counterName)
420 defer m.Free(currBuffer)
422 return int(currBuffer.state)
// SendCallMsg performs a wormhole call (rmr_wh_call) with the message
// described by params, using params.Whid, params.Callid and params.Timeout,
// and returns the resulting RMR state together with the payload of the
// returned buffer as a string.
//
// RMR_ERR_INITFAILED and an empty string are returned when no transmit buffer
// could be prepared.
425 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
427 currBuffer *C.rmr_mbuf_t
428 counterName string = "Transmitted"
430 txBuffer := m.CopyBuffer(params)
432 return C.RMR_ERR_INITFAILED, ""
438 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
439 m.contextMux.Unlock()
// No buffer came back: count the failure and report the state of the request buffer.
441 if currBuffer == nil {
442 m.UpdateStatCounter("TransmitError")
443 return m.LogMBufError("SendBuf failed", txBuffer), ""
446 if currBuffer.state != C.RMR_OK {
447 counterName = "TransmitError"
448 m.LogMBufError("SendBuf failed", currBuffer)
451 m.UpdateStatCounter(counterName)
// The buffer is released only after its payload has been copied into Go memory below.
452 defer m.Free(currBuffer)
454 cptr := unsafe.Pointer(currBuffer.payload)
455 payload := C.GoBytes(cptr, C.int(currBuffer.len))
457 return int(currBuffer.state), string(payload)
460 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
461 return m.Wh_open(target)
464 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
466 defer m.contextMux.Unlock()
467 endpoint := C.CString(target)
468 return C.rmr_wh_open(m.context, endpoint)
471 func (m *RMRClient) Closewh(whid int) {
472 m.Wh_close(C.rmr_whid_t(whid))
475 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
477 defer m.contextMux.Unlock()
478 C.rmr_wh_close(m.context, whid)
481 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
482 if params.status == int(C.RMR_ERR_RETRY) {
488 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
489 if params.status == int(C.RMR_ERR_NOENDPT) {
495 func (m *RMRClient) UpdateStatCounter(name string) {
501 func (m *RMRClient) RegisterMetrics() {
502 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
505 func (m *RMRClient) Wait() {
509 func (m *RMRClient) IsReady() bool {
513 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
515 m.readyCbParams = params
518 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
519 id, ok := RICMessageTypes[name]
// GetRicMessageName is the reverse lookup of GetRicMessageId: it scans
// RICMessageTypes for an entry matching id.
// NOTE(review): the result for an unknown id is outside the lines visible here.
523 func (m *RMRClient) GetRicMessageName(id int) (s string) {
// Linear scan; the map is keyed by name, so there is no direct lookup by id.
524 for k, v := range RICMessageTypes {
532 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
533 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
534 return int(mbuf.state)
538 func (m *RMRClient) GetStat() (r RMRStatistics) {