2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_si
41 "github.com/spf13/viper"
48 var RMRCounterOpts = []CounterOpts{
49 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50 {Name: "Received", Help: "The total number of received RMR messages"},
51 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
55 var RMRErrors = map[int]string{
56 C.RMR_OK: "state is good",
57 C.RMR_ERR_BADARG: "argument passed to function was unusable",
58 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
59 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
60 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
61 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62 C.RMR_ERR_CALLFAILED: "unable to send call() message",
63 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
64 C.RMR_ERR_WHID: "wormhole id was invalid",
65 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
66 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
68 C.RMR_ERR_TIMEOUT: "message processing call timed out",
69 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
70 C.RMR_ERR_TRUNC: "received message likely truncated",
71 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
75 type RMRParams struct {
90 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, threadType int, statDesc string) *RMRClient {
91 p := C.CString(protPort)
93 c := C.int(threadType)
94 defer C.free(unsafe.Pointer(p))
96 //ctx := C.rmr_init(p, m, C.int(0))
97 //ctx := C.rmr_init(p, m, C.RMRFL_NOTHREAD)
98 ctx := C.rmr_init(p, m, c)
100 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
105 numWorkers: numWorkers,
107 consumers: make([]MessageConsumer, 0),
108 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
112 func NewRMRClient() *RMRClient {
113 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), viper.GetInt("rmr.threadType"), "RMR")
116 func (m *RMRClient) Start(c MessageConsumer) {
118 m.consumers = append(m.consumers, c)
123 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
124 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
128 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
130 time.Sleep(1 * time.Second)
133 m.wg.Add(m.numWorkers)
135 if m.readyCb != nil {
136 go m.readyCb(m.readyCbParams)
139 for w := 0; w < m.numWorkers; w++ {
140 go m.Worker("worker-"+strconv.Itoa(w), 0)
145 func (m *RMRClient) Worker(taskName string, msgSize int) {
146 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
150 rxBuffer := C.rmr_rcv_msg(m.context, nil)
152 m.LogMBufError("RecvMsg failed", rxBuffer)
153 m.UpdateStatCounter("ReceiveError")
156 m.UpdateStatCounter("Received")
158 go m.parseMessage(rxBuffer)
162 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
163 if len(m.consumers) == 0 {
164 Logger.Info("rmrClient: No message handlers defined, message discarded!")
168 params := &RMRParams{}
169 params.Mbuf = rxBuffer
170 params.Mtype = int(rxBuffer.mtype)
171 params.SubId = int(rxBuffer.sub_id)
172 params.Meid = &RMRMeid{}
174 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
175 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
176 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
179 xidBuf := make([]byte, int(C.RMR_MAX_XID))
180 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
181 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
184 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
185 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
186 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
189 // Default case: a single consumer
190 if len(m.consumers) == 1 && m.consumers[0] != nil {
191 params.PayloadLen = int(rxBuffer.len)
192 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
193 err := m.consumers[0].Consume(params)
195 Logger.Warn("rmrClient: Consumer returned error: %v", err)
200 // Special case for multiple consumers
201 for _, c := range m.consumers {
202 cptr := unsafe.Pointer(rxBuffer.payload)
203 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
204 params.PayloadLen = int(rxBuffer.len)
205 params.Mtype = int(rxBuffer.mtype)
206 params.SubId = int(rxBuffer.sub_id)
208 err := c.Consume(params)
210 Logger.Warn("rmrClient: Consumer returned error: %v", err)
215 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
216 buf := C.rmr_alloc_msg(m.context, 0)
218 Logger.Error("rmrClient: Allocating message buffer failed!")
223 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
230 func (m *RMRClient) SendMsg(params *RMRParams) bool {
231 return m.Send(params, false)
234 func (m *RMRClient) SendRts(params *RMRParams) bool {
235 return m.Send(params, true)
238 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
239 txBuffer := params.Mbuf
241 txBuffer = m.Allocate()
247 txBuffer.mtype = C.int(params.Mtype)
248 txBuffer.sub_id = C.int(params.SubId)
249 txBuffer.len = C.int(len(params.Payload))
250 if params.PayloadLen != 0 {
251 txBuffer.len = C.int(params.PayloadLen)
253 datap := C.CBytes(params.Payload)
257 if params.Meid != nil {
258 b := make([]byte, int(C.RMR_MAX_MEID))
259 copy(b, []byte(params.Meid.RanName))
260 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
262 xidLen := len(params.Xid)
263 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
264 b := make([]byte, int(C.RMR_MAX_XID))
265 copy(b, []byte(params.Xid))
266 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
269 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
273 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
275 txBuffer := m.CopyBuffer(params)
279 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
280 if params.status == int(C.RMR_OK) {
286 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
288 currBuffer *C.rmr_mbuf_t
289 counterName string = "Transmitted"
294 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
297 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
299 currBuffer = C.rmr_send_msg(m.context, txBuffer)
303 if currBuffer == nil {
304 m.UpdateStatCounter("TransmitError")
305 return m.LogMBufError("SendBuf failed", txBuffer)
308 // Just quick retry seems to help for K8s issue
309 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
310 if maxRetryOnFailure == 0 {
311 maxRetryOnFailure = 5
314 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
316 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
319 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
321 currBuffer = C.rmr_send_msg(m.context, txBuffer)
326 if currBuffer.state != C.RMR_OK {
327 counterName = "TransmitError"
328 m.LogMBufError("SendBuf failed", currBuffer)
331 m.UpdateStatCounter(counterName)
332 defer m.Free(currBuffer)
334 return int(currBuffer.state)
337 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
339 currBuffer *C.rmr_mbuf_t
340 counterName string = "Transmitted"
342 txBuffer := m.CopyBuffer(params)
344 return C.RMR_ERR_INITFAILED, ""
349 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
351 if currBuffer == nil {
352 m.UpdateStatCounter("TransmitError")
353 return m.LogMBufError("SendBuf failed", txBuffer), ""
356 if currBuffer.state != C.RMR_OK {
357 counterName = "TransmitError"
358 m.LogMBufError("SendBuf failed", currBuffer)
361 m.UpdateStatCounter(counterName)
362 defer m.Free(currBuffer)
364 cptr := unsafe.Pointer(currBuffer.payload)
365 payload := C.GoBytes(cptr, C.int(currBuffer.len))
367 return int(currBuffer.state), string(payload)
370 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
371 return m.Wh_open(target)
374 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
375 endpoint := C.CString(target)
376 return C.rmr_wh_open(m.context, endpoint)
379 func (m *RMRClient) Closewh(whid int) {
380 m.Wh_close(C.rmr_whid_t(whid))
383 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
384 C.rmr_wh_close(m.context, whid)
387 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
388 if params.status == int(C.RMR_ERR_RETRY) {
394 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
395 if params.status == int(C.RMR_ERR_NOENDPT) {
401 func (m *RMRClient) UpdateStatCounter(name string) {
407 func (m *RMRClient) RegisterMetrics() {
408 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
411 func (m *RMRClient) Wait() {
415 func (m *RMRClient) IsReady() bool {
419 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
421 m.readyCbParams = params
424 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
425 id, ok := RICMessageTypes[name]
429 func (m *RMRClient) GetRicMessageName(id int) (s string) {
430 for k, v := range RICMessageTypes {
438 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
439 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
440 return int(mbuf.state)
444 func (m *RMRClient) GetStat() (r RMRStatistics) {