2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
41 "github.com/spf13/viper"
48 var RMRCounterOpts = []CounterOpts{
49 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50 {Name: "Received", Help: "The total number of received RMR messages"},
51 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
55 var RMRErrors = map[int]string{
56 C.RMR_OK: "state is good",
57 C.RMR_ERR_BADARG: "argument passed to function was unusable",
58 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
59 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
60 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
61 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62 C.RMR_ERR_CALLFAILED: "unable to send call() message",
63 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
64 C.RMR_ERR_WHID: "wormhole id was invalid",
65 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
66 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
68 C.RMR_ERR_TIMEOUT: "message processing call timed out",
69 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
70 C.RMR_ERR_TRUNC: "received message likely truncated",
71 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
75 type RMRParams struct {
87 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
88 p := C.CString(protPort)
90 defer C.free(unsafe.Pointer(p))
92 ctx := C.rmr_init(p, m, C.int(0))
94 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
99 numWorkers: numWorkers,
101 consumers: make([]MessageConsumer, 0),
102 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
106 func NewRMRClient() *RMRClient {
107 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
110 func (m *RMRClient) Start(c MessageConsumer) {
112 m.consumers = append(m.consumers, c)
117 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
118 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
122 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
124 time.Sleep(1 * time.Second)
127 m.wg.Add(m.numWorkers)
129 if m.readyCb != nil {
130 go m.readyCb(m.readyCbParams)
133 for w := 0; w < m.numWorkers; w++ {
134 go m.Worker("worker-"+strconv.Itoa(w), 0)
139 func (m *RMRClient) Worker(taskName string, msgSize int) {
140 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
144 rxBuffer := C.rmr_rcv_msg(m.context, nil)
146 m.LogMBufError("RecvMsg failed", rxBuffer)
147 m.UpdateStatCounter("ReceiveError")
150 m.UpdateStatCounter("Received")
152 go m.parseMessage(rxBuffer)
156 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
157 if len(m.consumers) == 0 {
158 Logger.Info("rmrClient: No message handlers defined, message discarded!")
162 params := &RMRParams{}
163 params.Mbuf = rxBuffer
164 params.Mtype = int(rxBuffer.mtype)
165 params.SubId = int(rxBuffer.sub_id)
166 params.Meid = &RMRMeid{}
168 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
169 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
170 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
173 xidBuf := make([]byte, int(C.RMR_MAX_XID))
174 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
175 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
178 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
179 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
180 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
183 // Default case: a single consumer
184 if len(m.consumers) == 1 && m.consumers[0] != nil {
185 params.PayloadLen = int(rxBuffer.len)
186 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
187 err := m.consumers[0].Consume(params)
189 Logger.Warn("rmrClient: Consumer returned error: %v", err)
194 // Special case for multiple consumers
195 for _, c := range m.consumers {
196 cptr := unsafe.Pointer(rxBuffer.payload)
197 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
198 params.PayloadLen = int(rxBuffer.len)
199 params.Mtype = int(rxBuffer.mtype)
200 params.SubId = int(rxBuffer.sub_id)
202 err := c.Consume(params)
204 Logger.Warn("rmrClient: Consumer returned error: %v", err)
209 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
210 buf := C.rmr_alloc_msg(m.context, 0)
212 Logger.Error("rmrClient: Allocating message buffer failed!")
217 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
224 func (m *RMRClient) SendMsg(params *RMRParams) bool {
225 return m.Send(params, false)
228 func (m *RMRClient) SendRts(params *RMRParams) bool {
229 return m.Send(params, true)
232 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
233 txBuffer := params.Mbuf
235 txBuffer = m.Allocate()
241 txBuffer.mtype = C.int(params.Mtype)
242 txBuffer.sub_id = C.int(params.SubId)
243 txBuffer.len = C.int(len(params.Payload))
244 if params.PayloadLen != 0 {
245 txBuffer.len = C.int(params.PayloadLen)
247 datap := C.CBytes(params.Payload)
251 if params.Meid != nil {
252 b := make([]byte, int(C.RMR_MAX_MEID))
253 copy(b, []byte(params.Meid.RanName))
254 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
256 xidLen := len(params.Xid)
257 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
258 b := make([]byte, int(C.RMR_MAX_XID))
259 copy(b, []byte(params.Xid))
260 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
263 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
265 params.status = m.SendBuf(txBuffer, isRts)
266 if params.status == int(C.RMR_OK) {
272 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) int {
274 currBuffer *C.rmr_mbuf_t
275 counterName string = "Transmitted"
280 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
282 currBuffer = C.rmr_send_msg(m.context, txBuffer)
285 if currBuffer == nil {
286 m.UpdateStatCounter("TransmitError")
287 return m.LogMBufError("SendBuf failed", txBuffer)
290 // Just quick retry seems to help for K8s issue
291 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
292 if maxRetryOnFailure == 0 {
293 maxRetryOnFailure = 5
296 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
298 currBuffer = C.rmr_rts_msg(m.context, currBuffer)
300 currBuffer = C.rmr_send_msg(m.context, currBuffer)
304 if currBuffer.state != C.RMR_OK {
305 counterName = "TransmitError"
306 m.LogMBufError("SendBuf failed", currBuffer)
309 m.UpdateStatCounter(counterName)
310 defer m.Free(currBuffer)
312 return int(currBuffer.state)
315 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
316 if params.status == int(C.RMR_ERR_RETRY) {
322 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
323 if params.status == int(C.RMR_ERR_NOENDPT) {
329 func (m *RMRClient) UpdateStatCounter(name string) {
335 func (m *RMRClient) RegisterMetrics() {
336 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
339 func (m *RMRClient) Wait() {
343 func (m *RMRClient) IsReady() bool {
347 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
349 m.readyCbParams = params
352 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
353 id, ok := RICMessageTypes[name]
357 func (m *RMRClient) GetRicMessageName(id int) (s string) {
358 for k, v := range RICMessageTypes {
366 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
367 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
368 return int(mbuf.state)
372 func (m *RMRClient) GetStat() (r RMRStatistics) {