2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 #include <sys/epoll.h>
30 #include <rmr/RIC_message_types.h>
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33 memcpy((void *)dst, (void *)data, len);
36 int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
45 void close_epoll(int epoll_fd) {
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54 if( events[0].data.fd == rcv_fd ) {
62 #cgo LDFLAGS: -lrmr_si
68 "github.com/spf13/viper"
74 var RMRCounterOpts = []CounterOpts{
75 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
76 {Name: "Received", Help: "The total number of received RMR messages"},
77 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
78 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
81 var RMRErrors = map[int]string{
82 C.RMR_OK: "state is good",
83 C.RMR_ERR_BADARG: "argument passed to function was unusable",
84 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
85 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
86 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
87 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
88 C.RMR_ERR_CALLFAILED: "unable to send call() message",
89 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
90 C.RMR_ERR_WHID: "wormhole id was invalid",
91 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
92 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
93 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
94 C.RMR_ERR_TIMEOUT: "message processing call timed out",
95 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
96 C.RMR_ERR_TRUNC: "received message likely truncated",
97 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
98 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
101 type RMRParams struct {
116 func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
117 p := C.CString(protPort)
119 c := C.int(threadType)
120 defer C.free(unsafe.Pointer(p))
121 ctx := C.rmr_init(p, m, c)
123 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
128 consumers: make([]MessageConsumer, 0),
129 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
133 func NewRMRClient() *RMRClient {
134 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
137 func (m *RMRClient) Start(c MessageConsumer) {
139 m.consumers = append(m.consumers, c)
145 m.ready = int(C.rmr_ready(m.context))
146 m.contextMux.Unlock()
148 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
152 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
154 time.Sleep(1 * time.Second)
159 if m.readyCb != nil {
160 go m.readyCb(m.readyCbParams)
165 rfd := C.rmr_get_rcvfd(m.context)
166 m.contextMux.Unlock()
167 efd := C.init_epoll(rfd)
171 if int(C.wait_epoll(efd, rfd)) == 0 {
175 rxBuffer := C.rmr_rcv_msg(m.context, nil)
176 m.contextMux.Unlock()
179 m.LogMBufError("RecvMsg failed", rxBuffer)
180 m.UpdateStatCounter("ReceiveError")
183 m.UpdateStatCounter("Received")
184 m.parseMessage(rxBuffer)
191 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
192 if len(m.consumers) == 0 {
193 Logger.Info("rmrClient: No message handlers defined, message discarded!")
197 params := &RMRParams{}
198 params.Mbuf = rxBuffer
199 params.Mtype = int(rxBuffer.mtype)
200 params.SubId = int(rxBuffer.sub_id)
201 params.Meid = &RMRMeid{}
203 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
204 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
205 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
208 xidBuf := make([]byte, int(C.RMR_MAX_XID))
209 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
210 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
213 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
214 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
215 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
218 // Default case: a single consumer
219 if len(m.consumers) == 1 && m.consumers[0] != nil {
220 params.PayloadLen = int(rxBuffer.len)
221 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
222 err := m.consumers[0].Consume(params)
224 Logger.Warn("rmrClient: Consumer returned error: %v", err)
229 // Special case for multiple consumers
230 for _, c := range m.consumers {
231 cptr := unsafe.Pointer(rxBuffer.payload)
232 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
233 params.PayloadLen = int(rxBuffer.len)
234 params.Mtype = int(rxBuffer.mtype)
235 params.SubId = int(rxBuffer.sub_id)
237 err := c.Consume(params)
239 Logger.Warn("rmrClient: Consumer returned error: %v", err)
244 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
246 defer m.contextMux.Unlock()
247 buf := C.rmr_alloc_msg(m.context, C.int(size))
249 Logger.Error("rmrClient: Allocating message buffer failed!")
254 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
259 defer m.contextMux.Unlock()
263 func (m *RMRClient) SendMsg(params *RMRParams) bool {
264 return m.Send(params, false)
267 func (m *RMRClient) SendRts(params *RMRParams) bool {
268 return m.Send(params, true)
271 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
272 if params.Mbuf != nil {
277 payLen := len(params.Payload)
278 if params.PayloadLen != 0 {
279 payLen = params.PayloadLen
282 txBuffer := m.Allocate(payLen)
286 txBuffer.mtype = C.int(params.Mtype)
287 txBuffer.sub_id = C.int(params.SubId)
288 txBuffer.len = C.int(payLen)
290 datap := C.CBytes(params.Payload)
294 if params.Meid != nil {
295 b := make([]byte, int(C.RMR_MAX_MEID))
296 copy(b, []byte(params.Meid.RanName))
297 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
299 xidLen := len(params.Xid)
300 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
301 b := make([]byte, int(C.RMR_MAX_XID))
302 copy(b, []byte(params.Xid))
303 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
306 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
310 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
312 txBuffer := m.CopyBuffer(params)
316 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
317 if params.status == int(C.RMR_OK) {
323 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
325 currBuffer *C.rmr_mbuf_t
326 counterName string = "Transmitted"
332 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
335 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
337 currBuffer = C.rmr_send_msg(m.context, txBuffer)
340 m.contextMux.Unlock()
342 if currBuffer == nil {
343 m.UpdateStatCounter("TransmitError")
344 return m.LogMBufError("SendBuf failed", txBuffer)
347 // Just quick retry seems to help for K8s issue
348 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
349 if maxRetryOnFailure == 0 {
350 maxRetryOnFailure = 5
353 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
356 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
359 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
361 currBuffer = C.rmr_send_msg(m.context, txBuffer)
364 m.contextMux.Unlock()
367 if currBuffer.state != C.RMR_OK {
368 counterName = "TransmitError"
369 m.LogMBufError("SendBuf failed", currBuffer)
372 m.UpdateStatCounter(counterName)
373 defer m.Free(currBuffer)
375 return int(currBuffer.state)
378 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
380 currBuffer *C.rmr_mbuf_t
381 counterName string = "Transmitted"
383 txBuffer := m.CopyBuffer(params)
385 return C.RMR_ERR_INITFAILED, ""
391 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
392 m.contextMux.Unlock()
394 if currBuffer == nil {
395 m.UpdateStatCounter("TransmitError")
396 return m.LogMBufError("SendBuf failed", txBuffer), ""
399 if currBuffer.state != C.RMR_OK {
400 counterName = "TransmitError"
401 m.LogMBufError("SendBuf failed", currBuffer)
404 m.UpdateStatCounter(counterName)
405 defer m.Free(currBuffer)
407 cptr := unsafe.Pointer(currBuffer.payload)
408 payload := C.GoBytes(cptr, C.int(currBuffer.len))
410 return int(currBuffer.state), string(payload)
413 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
414 return m.Wh_open(target)
417 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
419 defer m.contextMux.Unlock()
420 endpoint := C.CString(target)
421 return C.rmr_wh_open(m.context, endpoint)
424 func (m *RMRClient) Closewh(whid int) {
425 m.Wh_close(C.rmr_whid_t(whid))
428 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
430 defer m.contextMux.Unlock()
431 C.rmr_wh_close(m.context, whid)
434 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
435 if params.status == int(C.RMR_ERR_RETRY) {
441 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
442 if params.status == int(C.RMR_ERR_NOENDPT) {
448 func (m *RMRClient) UpdateStatCounter(name string) {
454 func (m *RMRClient) RegisterMetrics() {
455 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
458 func (m *RMRClient) Wait() {
462 func (m *RMRClient) IsReady() bool {
466 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
468 m.readyCbParams = params
471 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
472 id, ok := RICMessageTypes[name]
476 func (m *RMRClient) GetRicMessageName(id int) (s string) {
477 for k, v := range RICMessageTypes {
485 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
486 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
487 return int(mbuf.state)
491 func (m *RMRClient) GetStat() (r RMRStatistics) {