2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst with memcpy.
// The visible body performs no NULL or bounds checking, so the caller must
// guarantee that both regions hold at least len bytes.
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_si
41 "github.com/spf13/viper"
// RMRCounterOpts declares the counters registered for an RMR client:
// Transmitted, Received, TransmitError and ReceiveError. The same names are
// passed to UpdateStatCounter by the send and receive paths in this file.
// NOTE(review): "transmited" in the first Help text is misspelled; it is a
// runtime string, so it is flagged here rather than altered.
48 var RMRCounterOpts = []CounterOpts{
49 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50 {Name: "Received", Help: "The total number of received RMR messages"},
51 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
// RMRErrors maps RMR state codes (the C.RMR_* constants) to human-readable
// descriptions. LogMBufError looks up the state of a message buffer in this
// table when composing its log line.
55 var RMRErrors = map[int]string{
56 C.RMR_OK: "state is good",
57 C.RMR_ERR_BADARG: "argument passed to function was unusable",
58 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
59 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
60 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
61 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62 C.RMR_ERR_CALLFAILED: "unable to send call() message",
63 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
64 C.RMR_ERR_WHID: "wormhole id was invalid",
65 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
66 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
68 C.RMR_ERR_TIMEOUT: "message processing call timed out",
69 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
70 C.RMR_ERR_TRUNC: "received message likely truncated",
71 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
// RMRParams carries the parameters of a single RMR message between this
// client and its consumers/senders.
// The field list is not visible in this excerpt. Code in this file accesses
// Mbuf, Mtype, SubId, Meid, Xid, Src, Payload, PayloadLen, Whid, Callid,
// Timeout and the unexported status field.
75 type RMRParams struct {
// NewRMRClientWithParams initializes an RMR context with rmr_init and returns
// an RMRClient built around it.
//
// protPort is converted to a C string that is freed when the function returns.
// threadType is passed to rmr_init as its flags argument. numWorkers is stored
// on the client. statDesc is the description under which the RMRCounterOpts
// counter group is registered.
// NOTE(review): the definition of `m` (presumably derived from maxSize) and
// the condition guarding the error log are not visible in this excerpt — confirm.
90 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, threadType int, statDesc string) *RMRClient {
91 p := C.CString(protPort)
93 c := C.int(threadType)
94 defer C.free(unsafe.Pointer(p))
96 //ctx := C.rmr_init(p, m, C.int(0))
97 //ctx := C.rmr_init(p, m, C.RMRFL_NOTHREAD)
// The flags value comes from the caller rather than being hard-coded as in the
// two commented-out variants above.
98 ctx := C.rmr_init(p, m, c)
100 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
105 numWorkers: numWorkers,
// Start with an empty consumer list; Start appends to it.
107 consumers: make([]MessageConsumer, 0),
108 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
112 func NewRMRClient() *RMRClient {
113 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), viper.GetInt("rmr.threadType"), "RMR")
// Start appends c to the client's consumers, waits until rmr_ready reports 1
// and then launches numWorkers Worker goroutines.
//
// While waiting it sleeps one second between readiness checks and logs
// progress. If a ready callback has been set it is invoked in its own
// goroutine before the workers are started.
// NOTE(review): the loop header, the counter handling, any guard around the
// append and whatever follows the worker loop are not visible in this excerpt.
116 func (m *RMRClient) Start(c MessageConsumer) {
118 m.consumers = append(m.consumers, c)
// Poll RMR readiness; the latest result is kept in m.ready.
123 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
124 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
128 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
130 time.Sleep(1 * time.Second)
// Reserve one WaitGroup slot per worker goroutine started below.
133 m.wg.Add(m.numWorkers)
135 if m.readyCb != nil {
136 go m.readyCb(m.readyCbParams)
139 for w := 0; w < m.numWorkers; w++ {
// Workers are named "worker-<index>"; the second argument is always 0 here.
140 go m.Worker("worker-"+strconv.Itoa(w), 0)
// Worker receives messages from the RMR context and dispatches each one to
// parseMessage in a new goroutine.
//
// taskName is used only in the start-up log line; msgSize is not referenced
// in the visible lines. A failed receive is logged through LogMBufError and
// counted as ReceiveError; a successful one is counted as Received.
// NOTE(review): the receive loop and the condition that classifies a receive
// as failed are not visible in this excerpt. If that condition includes
// rxBuffer == nil, LogMBufError is called with a nil buffer — confirm it
// tolerates that.
145 func (m *RMRClient) Worker(taskName string, msgSize int) {
146 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
// Passing nil asks RMR to supply the receive buffer.
150 rxBuffer := C.rmr_rcv_msg(m.context, nil)
152 m.LogMBufError("RecvMsg failed", rxBuffer)
153 m.UpdateStatCounter("ReceiveError")
156 m.UpdateStatCounter("Received")
// Each received message is handled in its own goroutine.
158 go m.parseMessage(rxBuffer)
// parseMessage converts a received RMR buffer into an RMRParams value and
// hands it to the registered consumer(s).
//
// It fills in the message type, subscription id, MEID, transaction id and
// source from rxBuffer. With exactly one non-nil consumer the payload is
// exposed as a slice over the C buffer; otherwise each consumer is given a Go
// copy of the payload. Errors returned by Consume are logged as warnings.
// NOTE(review): return statements and the error checks around the Warn calls
// are not visible in this excerpt.
162 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
163 if len(m.consumers) == 0 {
164 Logger.Info("rmrClient: No message handlers defined, message discarded!")
168 params := &RMRParams{}
169 params.Mbuf = rxBuffer
170 params.Mtype = int(rxBuffer.mtype)
171 params.SubId = int(rxBuffer.sub_id)
172 params.Meid = &RMRMeid{}
// The getters below write into fixed-size Go buffers; trailing NUL bytes are
// trimmed when converting to strings.
174 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
175 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
176 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
179 xidBuf := make([]byte, int(C.RMR_MAX_XID))
180 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
// NOTE(review): the literal 32 assumes RMR_MAX_XID >= 32; the slice would panic otherwise — confirm.
181 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
184 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
185 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
// NOTE(review): the literal 64 assumes RMR_MAX_SRC >= 64 — confirm.
186 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
189 // Default case: a single consumer
190 if len(m.consumers) == 1 && m.consumers[0] != nil {
191 params.PayloadLen = int(rxBuffer.len)
// The payload slice points directly at the C buffer (no copy), with length
// and capacity capped at PayloadLen.
192 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
193 err := m.consumers[0].Consume(params)
195 Logger.Warn("rmrClient: Consumer returned error: %v", err)
200 // Special case for multiple consumers
201 for _, c := range m.consumers {
202 cptr := unsafe.Pointer(rxBuffer.payload)
// C.GoBytes copies the payload into Go memory on every iteration; all
// consumers nevertheless receive the same params pointer.
203 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
204 params.PayloadLen = int(rxBuffer.len)
205 params.Mtype = int(rxBuffer.mtype)
206 params.SubId = int(rxBuffer.sub_id)
208 err := c.Consume(params)
210 Logger.Warn("rmrClient: Consumer returned error: %v", err)
// Allocate requests a message buffer of the given payload size from the RMR
// context via rmr_alloc_msg.
// NOTE(review): the condition guarding the error log and the return statement
// are not visible in this excerpt — presumably logs when buf is nil and
// returns buf either way; confirm.
215 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
216 buf := C.rmr_alloc_msg(m.context, C.int(size))
218 Logger.Error("rmrClient: Allocating message buffer failed!")
// Free takes a message buffer previously obtained from RMR. SendBuf and
// SendCallMsg defer it on the buffer returned by their send calls.
// NOTE(review): the body is not visible in this excerpt — presumably hands
// the buffer back to the RMR library; confirm.
223 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
230 func (m *RMRClient) SendMsg(params *RMRParams) bool {
231 return m.Send(params, false)
234 func (m *RMRClient) SendRts(params *RMRParams) bool {
235 return m.Send(params, true)
// CopyBuffer allocates a transmit buffer and fills it from params: message
// type, subscription id, payload length, optional MEID, optional transaction
// id and finally the payload bytes.
//
// The payload length is len(params.Payload) unless params.PayloadLen is
// non-zero, in which case PayloadLen wins.
// NOTE(review): the handling of a non-nil params.Mbuf, the nil check after
// Allocate, the release of datap and the return statement are not visible in
// this excerpt.
238 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
239 if params.Mbuf != nil {
244 payLen := len(params.Payload)
245 if params.PayloadLen != 0 {
246 payLen = params.PayloadLen
249 txBuffer := m.Allocate(payLen)
253 txBuffer.mtype = C.int(params.Mtype)
254 txBuffer.sub_id = C.int(params.SubId)
255 txBuffer.len = C.int(payLen)
// C.CBytes copies the payload into C-allocated memory, which must be freed
// with C.free. NOTE(review): confirm the free exists in the lines not shown.
257 datap := C.CBytes(params.Payload)
261 if params.Meid != nil {
// RanName is copied into a zero-filled buffer of RMR_MAX_MEID bytes; longer
// names are truncated by copy.
262 b := make([]byte, int(C.RMR_MAX_MEID))
263 copy(b, []byte(params.Meid.RanName))
264 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
266 xidLen := len(params.Xid)
// The transaction id is set only when it is non-empty and fits in RMR_MAX_XID bytes.
267 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
268 b := make([]byte, int(C.RMR_MAX_XID))
269 copy(b, []byte(params.Xid))
270 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// NOTE(review): datap holds len(params.Payload) bytes but txBuffer.len bytes
// are copied; when PayloadLen exceeds len(Payload) this reads past the end of
// datap — confirm callers never set PayloadLen larger than the slice.
273 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
// Send copies params into a transmit buffer with CopyBuffer, sends it through
// SendBuf (passing isRts and params.Whid) and records the resulting state in
// params.status.
// NOTE(review): the handling of a nil txBuffer and the return statements are
// not visible in this excerpt — presumably returns true only when the status
// is RMR_OK; confirm.
277 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
279 txBuffer := m.CopyBuffer(params)
283 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
284 if params.status == int(C.RMR_OK) {
// SendBuf transmits txBuffer and returns the resulting RMR state as an int.
//
// Three send calls appear below: rmr_wh_send_msg (using whid), rmr_rts_msg
// and rmr_send_msg. A nil result from the first attempt is counted as
// TransmitError and the state of txBuffer is returned. While the state is
// RMR_ERR_RETRY the send is repeated, up to rmr.maxRetryOnFailure times
// (5 when that setting is 0). The final state selects the Transmitted or
// TransmitError counter, and the returned buffer is passed to m.Free via defer.
// NOTE(review): the conditions choosing between the three send calls are not
// visible in this excerpt — presumably whid and isRts select the path; confirm.
290 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
292 currBuffer *C.rmr_mbuf_t
293 counterName string = "Transmitted"
298 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
301 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
303 currBuffer = C.rmr_send_msg(m.context, txBuffer)
307 if currBuffer == nil {
308 m.UpdateStatCounter("TransmitError")
309 return m.LogMBufError("SendBuf failed", txBuffer)
312 // Just quick retry seems to help for K8s issue
// The retry limit is read from configuration on every call; 0 (including an
// unset key) falls back to 5.
313 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
314 if maxRetryOnFailure == 0 {
315 maxRetryOnFailure = 5
318 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
320 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
323 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
325 currBuffer = C.rmr_send_msg(m.context, txBuffer)
// NOTE(review): the retry loop exits when currBuffer becomes nil, yet
// currBuffer.state is dereferenced here — confirm a nil check exists in the
// lines not shown.
330 if currBuffer.state != C.RMR_OK {
331 counterName = "TransmitError"
332 m.LogMBufError("SendBuf failed", currBuffer)
335 m.UpdateStatCounter(counterName)
// The return value below is evaluated before the deferred Free runs.
336 defer m.Free(currBuffer)
338 return int(currBuffer.state)
// SendCallMsg sends params through rmr_wh_call using params.Whid,
// params.Callid and params.Timeout, and returns the resulting RMR state
// together with the reply payload converted to a string.
//
// RMR_ERR_INITFAILED with an empty string is returned on the early-exit path
// after CopyBuffer. A nil reply buffer is counted as TransmitError and yields
// the state of txBuffer with an empty string. Otherwise the final state
// selects the Transmitted or TransmitError counter.
// NOTE(review): the condition guarding the RMR_ERR_INITFAILED return is not
// visible in this excerpt — presumably txBuffer == nil; confirm. The log text
// "SendBuf failed" is shared with SendBuf, so logs do not distinguish the two.
341 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
343 currBuffer *C.rmr_mbuf_t
344 counterName string = "Transmitted"
346 txBuffer := m.CopyBuffer(params)
348 return C.RMR_ERR_INITFAILED, ""
// NOTE(review): Whid is converted with C.int here but with C.rmr_whid_t in
// SendBuf — confirm the two types are interchangeable.
353 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
355 if currBuffer == nil {
356 m.UpdateStatCounter("TransmitError")
357 return m.LogMBufError("SendBuf failed", txBuffer), ""
360 if currBuffer.state != C.RMR_OK {
361 counterName = "TransmitError"
362 m.LogMBufError("SendBuf failed", currBuffer)
365 m.UpdateStatCounter(counterName)
366 defer m.Free(currBuffer)
// Copy the reply payload into Go memory before the deferred Free runs.
368 cptr := unsafe.Pointer(currBuffer.payload)
369 payload := C.GoBytes(cptr, C.int(currBuffer.len))
371 return int(currBuffer.state), string(payload)
374 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
375 return m.Wh_open(target)
378 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
379 endpoint := C.CString(target)
380 return C.rmr_wh_open(m.context, endpoint)
383 func (m *RMRClient) Closewh(whid int) {
384 m.Wh_close(C.rmr_whid_t(whid))
387 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
388 C.rmr_wh_close(m.context, whid)
// IsRetryError tests whether the status recorded in params (set by Send)
// equals RMR_ERR_RETRY.
// NOTE(review): the return statements are not visible in this excerpt —
// presumably true on a match and false otherwise; confirm.
391 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
392 if params.status == int(C.RMR_ERR_RETRY) {
// IsNoEndPointError tests whether the status recorded in params (set by Send)
// equals RMR_ERR_NOENDPT.
// NOTE(review): the return statements are not visible in this excerpt —
// presumably true on a match and false otherwise; confirm.
398 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
399 if params.status == int(C.RMR_ERR_NOENDPT) {
// UpdateStatCounter is called by the send and receive paths with one of the
// counter names declared in RMRCounterOpts ("Transmitted", "Received",
// "TransmitError", "ReceiveError").
// NOTE(review): the body is not visible in this excerpt — presumably
// increments the named counter in m.stat; confirm.
405 func (m *RMRClient) UpdateStatCounter(name string) {
411 func (m *RMRClient) RegisterMetrics() {
412 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
// Wait takes no arguments and returns nothing.
// NOTE(review): the body is not visible in this excerpt — presumably blocks
// on the WaitGroup that Start sizes with numWorkers; confirm.
415 func (m *RMRClient) Wait() {
// IsReady reports a boolean readiness state for the client.
// NOTE(review): the body is not visible in this excerpt — presumably derived
// from m.ready, which Start sets from rmr_ready; confirm.
419 func (m *RMRClient) IsReady() bool {
// SetReadyCB records params as the argument Start later passes to the ready
// callback.
// NOTE(review): the statement that stores cb is not visible in this excerpt —
// presumably assigns it to m.readyCb; confirm.
423 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
425 m.readyCbParams = params
// GetRicMessageId looks name up in the RICMessageTypes table.
// NOTE(review): the return statement is not visible in this excerpt —
// presumably returns the id and the lookup's ok flag; confirm.
428 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
429 id, ok := RICMessageTypes[name]
// GetRicMessageName iterates over the RICMessageTypes table to produce the
// named result s for the given id.
// NOTE(review): the loop body is not visible in this excerpt — presumably a
// reverse lookup returning the key whose value equals id; confirm.
433 func (m *RMRClient) GetRicMessageName(id int) (s string) {
434 for k, v := range RICMessageTypes {
442 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
443 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
444 return int(mbuf.state)
448 func (m *RMRClient) GetStat() (r RMRStatistics) {