2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_si
41 "github.com/spf13/viper"
48 var RMRCounterOpts = []CounterOpts{
49 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50 {Name: "Received", Help: "The total number of received RMR messages"},
51 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
55 var RMRErrors = map[int]string{
56 C.RMR_OK: "state is good",
57 C.RMR_ERR_BADARG: "argument passed to function was unusable",
58 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
59 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
60 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
61 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62 C.RMR_ERR_CALLFAILED: "unable to send call() message",
63 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
64 C.RMR_ERR_WHID: "wormhole id was invalid",
65 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
66 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
68 C.RMR_ERR_TIMEOUT: "message processing call timed out",
69 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
70 C.RMR_ERR_TRUNC: "received message likely truncated",
71 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
75 type RMRParams struct {
90 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, threadType int, statDesc string) *RMRClient {
91 p := C.CString(protPort)
93 c := C.int(threadType)
94 defer C.free(unsafe.Pointer(p))
96 //ctx := C.rmr_init(p, m, C.int(0))
97 //ctx := C.rmr_init(p, m, C.RMRFL_NOTHREAD)
98 ctx := C.rmr_init(p, m, c)
100 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
105 numWorkers: numWorkers,
107 consumers: make([]MessageConsumer, 0),
108 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
112 func NewRMRClient() *RMRClient {
113 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), viper.GetInt("rmr.threadType"), "RMR")
116 func (m *RMRClient) Start(c MessageConsumer) {
118 m.consumers = append(m.consumers, c)
123 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
124 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
128 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
130 time.Sleep(1 * time.Second)
133 m.wg.Add(m.numWorkers)
135 if m.readyCb != nil {
136 go m.readyCb(m.readyCbParams)
139 for w := 0; w < m.numWorkers; w++ {
140 go m.Worker("worker-"+strconv.Itoa(w), 0)
145 func (m *RMRClient) Worker(taskName string, msgSize int) {
146 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
150 rxBuffer := C.rmr_rcv_msg(m.context, nil)
152 m.LogMBufError("RecvMsg failed", rxBuffer)
153 m.UpdateStatCounter("ReceiveError")
156 m.UpdateStatCounter("Received")
159 go m.parseMessage(rxBuffer)
164 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
166 if len(m.consumers) == 0 {
167 Logger.Info("rmrClient: No message handlers defined, message discarded!")
171 params := &RMRParams{}
172 params.Mbuf = rxBuffer
173 params.Mtype = int(rxBuffer.mtype)
174 params.SubId = int(rxBuffer.sub_id)
175 params.Meid = &RMRMeid{}
177 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
178 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
179 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
182 xidBuf := make([]byte, int(C.RMR_MAX_XID))
183 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
184 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
187 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
188 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
189 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
192 // Default case: a single consumer
193 if len(m.consumers) == 1 && m.consumers[0] != nil {
194 params.PayloadLen = int(rxBuffer.len)
195 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
196 err := m.consumers[0].Consume(params)
198 Logger.Warn("rmrClient: Consumer returned error: %v", err)
203 // Special case for multiple consumers
204 for _, c := range m.consumers {
205 cptr := unsafe.Pointer(rxBuffer.payload)
206 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
207 params.PayloadLen = int(rxBuffer.len)
208 params.Mtype = int(rxBuffer.mtype)
209 params.SubId = int(rxBuffer.sub_id)
211 err := c.Consume(params)
213 Logger.Warn("rmrClient: Consumer returned error: %v", err)
218 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
219 buf := C.rmr_alloc_msg(m.context, C.int(size))
221 Logger.Error("rmrClient: Allocating message buffer failed!")
226 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
233 func (m *RMRClient) SendMsg(params *RMRParams) bool {
234 return m.Send(params, false)
237 func (m *RMRClient) SendRts(params *RMRParams) bool {
238 return m.Send(params, true)
241 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
242 if params.Mbuf != nil {
247 payLen := len(params.Payload)
248 if params.PayloadLen != 0 {
249 payLen = params.PayloadLen
252 txBuffer := m.Allocate(payLen)
256 txBuffer.mtype = C.int(params.Mtype)
257 txBuffer.sub_id = C.int(params.SubId)
258 txBuffer.len = C.int(payLen)
260 datap := C.CBytes(params.Payload)
264 if params.Meid != nil {
265 b := make([]byte, int(C.RMR_MAX_MEID))
266 copy(b, []byte(params.Meid.RanName))
267 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
269 xidLen := len(params.Xid)
270 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
271 b := make([]byte, int(C.RMR_MAX_XID))
272 copy(b, []byte(params.Xid))
273 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
276 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
280 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
282 txBuffer := m.CopyBuffer(params)
286 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
287 if params.status == int(C.RMR_OK) {
293 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
295 currBuffer *C.rmr_mbuf_t
296 counterName string = "Transmitted"
301 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
304 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
306 currBuffer = C.rmr_send_msg(m.context, txBuffer)
310 if currBuffer == nil {
311 m.UpdateStatCounter("TransmitError")
312 return m.LogMBufError("SendBuf failed", txBuffer)
315 // Just quick retry seems to help for K8s issue
316 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
317 if maxRetryOnFailure == 0 {
318 maxRetryOnFailure = 5
321 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
323 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
326 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
328 currBuffer = C.rmr_send_msg(m.context, txBuffer)
333 if currBuffer.state != C.RMR_OK {
334 counterName = "TransmitError"
335 m.LogMBufError("SendBuf failed", currBuffer)
338 m.UpdateStatCounter(counterName)
339 defer m.Free(currBuffer)
341 return int(currBuffer.state)
344 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
346 currBuffer *C.rmr_mbuf_t
347 counterName string = "Transmitted"
349 txBuffer := m.CopyBuffer(params)
351 return C.RMR_ERR_INITFAILED, ""
356 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
358 if currBuffer == nil {
359 m.UpdateStatCounter("TransmitError")
360 return m.LogMBufError("SendBuf failed", txBuffer), ""
363 if currBuffer.state != C.RMR_OK {
364 counterName = "TransmitError"
365 m.LogMBufError("SendBuf failed", currBuffer)
368 m.UpdateStatCounter(counterName)
369 defer m.Free(currBuffer)
371 cptr := unsafe.Pointer(currBuffer.payload)
372 payload := C.GoBytes(cptr, C.int(currBuffer.len))
374 return int(currBuffer.state), string(payload)
377 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
378 return m.Wh_open(target)
381 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
382 endpoint := C.CString(target)
383 return C.rmr_wh_open(m.context, endpoint)
386 func (m *RMRClient) Closewh(whid int) {
387 m.Wh_close(C.rmr_whid_t(whid))
390 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
391 C.rmr_wh_close(m.context, whid)
394 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
395 if params.status == int(C.RMR_ERR_RETRY) {
401 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
402 if params.status == int(C.RMR_ERR_NOENDPT) {
408 func (m *RMRClient) UpdateStatCounter(name string) {
414 func (m *RMRClient) RegisterMetrics() {
415 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
418 func (m *RMRClient) Wait() {
422 func (m *RMRClient) IsReady() bool {
426 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
428 m.readyCbParams = params
431 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
432 id, ok := RICMessageTypes[name]
436 func (m *RMRClient) GetRicMessageName(id int) (s string) {
437 for k, v := range RICMessageTypes {
445 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
446 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
447 return int(mbuf.state)
451 func (m *RMRClient) GetStat() (r RMRStatistics) {