2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst.
//
// Calls with a NULL pointer or a non-positive len are ignored. Without this
// guard a negative len would be converted to a huge size_t by memcpy.
//
// Only line comments are used here: this function lives in the cgo preamble,
// where a block-comment terminator would end the surrounding Go comment.
void write_bytes_array(unsigned char *dst, void *data, int len) {
    if (dst == NULL || data == NULL || len <= 0) {
        return;
    }
    memcpy((void *)dst, (void *)data, (size_t)len);
}
35 #cgo LDFLAGS: -lrmr_nng -lnng
41 "github.com/spf13/viper"
48 var RMRCounterOpts = []CounterOpts{
49 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50 {Name: "Received", Help: "The total number of received RMR messages"},
51 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
// RMRErrors maps RMR status codes (the C.RMR_* constants) to a short
// human-readable description. LogMBufError indexes it with the state of a
// message buffer to annotate its debug output.
55 var RMRErrors = map[int]string{
56 C.RMR_OK: "state is good",
57 C.RMR_ERR_BADARG: "argument passed to function was unusable",
58 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
59 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
60 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
61 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62 C.RMR_ERR_CALLFAILED: "unable to send call() message",
63 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
64 C.RMR_ERR_WHID: "wormhole id was invalid",
65 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
66 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
68 C.RMR_ERR_TIMEOUT: "message processing call timed out",
69 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
70 C.RMR_ERR_TRUNC: "received message likely truncated",
71 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
// RMRParams describes a single RMR message as exchanged with this client.
// parseMessage fills Mbuf, Mtype, SubId, Meid, Xid, Src, Payload and
// PayloadLen for received messages; Send reads Mbuf, Mtype, SubId, Payload,
// PayloadLen, Meid, Xid and Whid and records the outcome in status.
// NOTE(review): the field declarations are not part of this excerpt — confirm
// the list above against the struct body.
75 type RMRParams struct {
// NewRMRClientWithParams initialises an RMR context with rmr_init and wraps
// it in an RMRClient.
//
// protPort is handed to rmr_init as a C string, threadType as its flags
// argument. numWorkers is stored on the client, and statDesc names the
// counter group registered for it.
// NOTE(review): the definition of m (presumably derived from maxSize), the
// condition guarding the error log, and part of the returned struct literal
// are not visible in this excerpt — confirm. The log line says "bailing out";
// whether the function actually aborts on failure cannot be told from here.
88 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, threadType int, statDesc string) *RMRClient {
// The C copy of protPort is released when this function returns.
89 p := C.CString(protPort)
91 c := C.int(threadType)
92 defer C.free(unsafe.Pointer(p))
94 //ctx := C.rmr_init(p, m, C.int(0))
95 //ctx := C.rmr_init(p, m, C.RMRFL_NOTHREAD)
96 ctx := C.rmr_init(p, m, c)
98 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
103 numWorkers: numWorkers,
// Start with an empty consumer list; Start appends to it.
105 consumers: make([]MessageConsumer, 0),
106 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
110 func NewRMRClient() *RMRClient {
111 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), viper.GetInt("rmr.threadType"), "RMR")
// Start registers the consumer c, waits until RMR reports ready, and then
// launches the receive workers.
//
// Readiness is checked through rmr_ready with a one-second sleep between
// checks. Afterwards numWorkers is added to the wait group, the optional
// ready callback is invoked in its own goroutine, and numWorkers Worker
// goroutines are started.
// NOTE(review): the polling loop header, the handling of counter and the end
// of the function are not visible in this excerpt — confirm how the loop
// exits and whether Start blocks after spawning the workers.
114 func (m *RMRClient) Start(c MessageConsumer) {
116 m.consumers = append(m.consumers, c)
// rmr_ready's result is cached in m.ready; a value of 1 means ready.
121 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
122 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
126 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
128 time.Sleep(1 * time.Second)
// One wait-group slot per worker goroutine started below.
131 m.wg.Add(m.numWorkers)
// The ready callback runs asynchronously so it cannot delay the workers.
133 if m.readyCb != nil {
134 go m.readyCb(m.readyCbParams)
137 for w := 0; w < m.numWorkers; w++ {
138 go m.Worker("worker-"+strconv.Itoa(w), 0)
// Worker receives messages from RMR with rmr_rcv_msg. A successfully
// received buffer is counted as "Received" and handed to parseMessage in a
// new goroutine; a failed receive is logged and counted as "ReceiveError".
//
// taskName only labels the start-up log line. msgSize is not used in the
// lines visible here.
// NOTE(review): the surrounding loop and the condition that selects the
// failure branch are not visible in this excerpt — confirm.
143 func (m *RMRClient) Worker(taskName string, msgSize int) {
144 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
// Passing nil lets RMR supply the receive buffer.
148 rxBuffer := C.rmr_rcv_msg(m.context, nil)
// NOTE(review): LogMBufError reads mbuf.state and mbuf.tp_state without a nil
// check; if this branch is taken because rxBuffer is nil, the call panics.
150 m.LogMBufError("RecvMsg failed", rxBuffer)
151 m.UpdateStatCounter("ReceiveError")
154 m.UpdateStatCounter("Received")
// Each message is parsed and dispatched on its own goroutine.
156 go m.parseMessage(rxBuffer)
// parseMessage converts a received RMR buffer into an RMRParams value and
// passes it to the registered consumers.
//
// Message type, subscription id, MEID, transaction id and source are read
// from rxBuffer. With exactly one non-nil consumer the payload is exposed
// without copying; otherwise every consumer receives a Go copy of the
// payload. Consumer errors are logged as warnings.
// NOTE(review): early returns, the err checks around the Warn calls and any
// release of rxBuffer are not visible in this excerpt — confirm who frees the
// buffer.
160 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
161 if len(m.consumers) == 0 {
162 Logger.Info("rmrClient: No message handlers defined, message discarded!")
166 params := &RMRParams{}
167 params.Mbuf = rxBuffer
168 params.Mtype = int(rxBuffer.mtype)
169 params.SubId = int(rxBuffer.sub_id)
170 params.Meid = &RMRMeid{}
// The C getters fill the Go-allocated buffer; trailing NUL padding is trimmed.
172 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
173 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
174 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
177 xidBuf := make([]byte, int(C.RMR_MAX_XID))
178 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
// NOTE(review): the bound 32 is hard-coded while the buffer is sized by
// C.RMR_MAX_XID; presumably the two are equal. If RMR_MAX_XID is smaller this
// slice expression panics — confirm.
179 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
182 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
183 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
// NOTE(review): same concern as above, with the hard-coded bound 64 against
// C.RMR_MAX_SRC — confirm.
184 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
187 // Default case: a single consumer
188 if len(m.consumers) == 1 && m.consumers[0] != nil {
189 params.PayloadLen = int(rxBuffer.len)
// Zero-copy view: Payload is a Go slice laid directly over the C payload of
// rxBuffer, with length and capacity both capped at PayloadLen. It is only
// valid while that C memory is.
190 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
191 err := m.consumers[0].Consume(params)
193 Logger.Warn("rmrClient: Consumer returned error: %v", err)
198 // Special case for multiple consumers
// The same params value is reused across iterations; only the payload is
// re-copied for each consumer.
199 for _, c := range m.consumers {
200 cptr := unsafe.Pointer(rxBuffer.payload)
// C.GoBytes copies the payload into Go-managed memory.
201 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
202 params.PayloadLen = int(rxBuffer.len)
203 params.Mtype = int(rxBuffer.mtype)
204 params.SubId = int(rxBuffer.sub_id)
206 err := c.Consume(params)
208 Logger.Warn("rmrClient: Consumer returned error: %v", err)
// Allocate requests a message buffer from RMR via rmr_alloc_msg, passing 0 as
// the size argument, and logs an error when allocation fails.
// NOTE(review): the failure check and the return statement are not visible in
// this excerpt — presumably the buffer is returned even when nil; confirm.
213 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
214 buf := C.rmr_alloc_msg(m.context, 0)
216 Logger.Error("rmrClient: Allocating message buffer failed!")
// Free takes the message buffer mbuf. SendBuf defers it on the buffer
// returned by the RMR send call.
// NOTE(review): the body is not part of this excerpt — presumably it hands
// mbuf back to RMR; confirm, including how a nil mbuf is treated.
221 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
228 func (m *RMRClient) SendMsg(params *RMRParams) bool {
229 return m.Send(params, false)
232 func (m *RMRClient) SendRts(params *RMRParams) bool {
233 return m.Send(params, true)
// Send fills an RMR message buffer from params and transmits it via SendBuf.
//
// The buffer comes from params.Mbuf or from Allocate. Message type,
// subscription id and length are set from params, MEID and transaction id
// are written when present, and the payload is copied into the buffer. The
// send status is stored in params.status. isRts is forwarded to SendBuf.
// NOTE(review): the nil checks around Allocate, the release of datap and the
// final return statements are not visible in this excerpt — confirm.
236 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
237 txBuffer := params.Mbuf
239 txBuffer = m.Allocate()
245 txBuffer.mtype = C.int(params.Mtype)
246 txBuffer.sub_id = C.int(params.SubId)
// The length defaults to len(Payload); a non-zero PayloadLen overrides it.
247 txBuffer.len = C.int(len(params.Payload))
248 if params.PayloadLen != 0 {
249 txBuffer.len = C.int(params.PayloadLen)
// C.CBytes makes a C-heap copy of Payload holding len(Payload) bytes.
251 datap := C.CBytes(params.Payload)
255 if params.Meid != nil {
// RanName is copied into a zeroed buffer of RMR_MAX_MEID bytes; a longer
// name is truncated by copy.
256 b := make([]byte, int(C.RMR_MAX_MEID))
257 copy(b, []byte(params.Meid.RanName))
258 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
260 xidLen := len(params.Xid)
// An empty Xid, or one longer than RMR_MAX_XID, is not written at all.
261 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
262 b := make([]byte, int(C.RMR_MAX_XID))
263 copy(b, []byte(params.Xid))
264 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// NOTE(review): txBuffer.len bytes are copied from datap. If PayloadLen is
// larger than len(Payload) this reads past the end of datap, and no check
// against the capacity of txBuffer.payload is visible here — confirm.
267 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
269 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
270 if params.status == int(C.RMR_OK) {
// SendBuf hands txBuffer to RMR through rmr_wh_send_msg, rmr_rts_msg or
// rmr_send_msg, retries while RMR reports RMR_ERR_RETRY, updates the
// "Transmitted" or "TransmitError" counter, and returns the final buffer
// state as an int.
//
// The retry budget is read from the viper key rmr.maxRetryOnFailure; a value
// of 0 is replaced by 5.
// NOTE(review): the conditions choosing between the three send calls
// (presumably whid and isRts) are not visible in this excerpt — confirm.
276 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
278 currBuffer *C.rmr_mbuf_t
279 counterName string = "Transmitted"
284 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
287 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
289 currBuffer = C.rmr_send_msg(m.context, txBuffer)
// No buffer came back: count the failure and report the state of txBuffer.
293 if currBuffer == nil {
294 m.UpdateStatCounter("TransmitError")
295 return m.LogMBufError("SendBuf failed", txBuffer)
298 // Just quick retry seems to help for K8s issue
299 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
300 if maxRetryOnFailure == 0 {
301 maxRetryOnFailure = 5
// The loop stops when the budget is used up, when a send returns nil, or
// when the state is anything other than RMR_ERR_RETRY.
304 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
306 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
309 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
311 currBuffer = C.rmr_send_msg(m.context, txBuffer)
// NOTE(review): the loop can end with currBuffer == nil, yet the next line
// dereferences it; unless a hidden line guards this, a nil return from a
// retry panics here — confirm.
316 if currBuffer.state != C.RMR_OK {
317 counterName = "TransmitError"
318 m.LogMBufError("SendBuf failed", currBuffer)
321 m.UpdateStatCounter(counterName)
// The deferred Free runs after the return value below has been evaluated, so
// the state is read before the buffer is released.
322 defer m.Free(currBuffer)
324 return int(currBuffer.state)
327 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
328 return m.Wh_open(target)
331 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
332 endpoint := C.CString(target)
333 return C.rmr_wh_open(m.context, endpoint)
336 func (m *RMRClient) Closewh(whid int) {
337 m.Wh_close(C.rmr_whid_t(whid))
340 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
341 C.rmr_wh_close(m.context, whid)
344 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
345 if params.status == int(C.RMR_ERR_RETRY) {
351 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
352 if params.status == int(C.RMR_ERR_NOENDPT) {
// UpdateStatCounter takes the name of a counter; callers in this file pass
// "Transmitted", "Received", "TransmitError" or "ReceiveError", the names
// listed in RMRCounterOpts.
// NOTE(review): the body is not part of this excerpt — presumably it
// increments the matching counter in m.stat; confirm, including what happens
// for an unknown name.
358 func (m *RMRClient) UpdateStatCounter(name string) {
364 func (m *RMRClient) RegisterMetrics() {
365 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
// Wait takes no arguments and returns nothing.
// NOTE(review): the body is not part of this excerpt. Start adds numWorkers
// to m.wg, so presumably this blocks on that wait group — confirm.
368 func (m *RMRClient) Wait() {
// IsReady returns a bool describing the client's readiness.
// NOTE(review): the body is not part of this excerpt. Start caches the result
// of rmr_ready in m.ready, so presumably this reports whether m.ready is
// non-zero — confirm.
372 func (m *RMRClient) IsReady() bool {
// SetReadyCB stores params as the argument for the ready callback. Start
// invokes m.readyCb(m.readyCbParams) in a goroutine when a callback is set.
// NOTE(review): the line that stores cb is not visible in this excerpt —
// presumably it assigns m.readyCb; confirm.
376 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
378 m.readyCbParams = params
// GetRicMessageId looks name up in RICMessageTypes, yielding the stored value
// and whether the key was present.
// NOTE(review): the return statement is not visible in this excerpt —
// presumably it returns (id, ok); confirm.
381 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
382 id, ok := RICMessageTypes[name]
// GetRicMessageName iterates over RICMessageTypes, the map that
// GetRicMessageId indexes by message name.
// NOTE(review): the loop body is not visible in this excerpt — presumably it
// returns the key whose value equals id; confirm, including the result for an
// id that is not in the map.
386 func (m *RMRClient) GetRicMessageName(id int) (s string) {
387 for k, v := range RICMessageTypes {
395 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
396 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
397 return int(mbuf.state)
401 func (m *RMRClient) GetStat() (r RMRStatistics) {