2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
41 "github.com/spf13/viper"
48 var RMRCounterOpts = []CounterOpts{
49 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50 {Name: "Received", Help: "The total number of received RMR messages"},
51 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
55 var RMRErrors = map[int]string{
56 C.RMR_OK: "state is good",
57 C.RMR_ERR_BADARG: "argument passed to function was unusable",
58 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
59 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
60 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
61 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62 C.RMR_ERR_CALLFAILED: "unable to send call() message",
63 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
64 C.RMR_ERR_WHID: "wormhole id was invalid",
65 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
66 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
68 C.RMR_ERR_TIMEOUT: "message processing call timed out",
69 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
70 C.RMR_ERR_TRUNC: "received message likely truncated",
71 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
75 type RMRParams struct {
87 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
88 p := C.CString(protPort)
90 defer C.free(unsafe.Pointer(p))
92 ctx := C.rmr_init(p, m, C.int(0))
94 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
99 numWorkers: numWorkers,
101 consumers: make([]MessageConsumer, 0),
102 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
106 func NewRMRClient() *RMRClient {
107 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
110 func (m *RMRClient) Start(c MessageConsumer) {
112 m.consumers = append(m.consumers, c)
117 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
118 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
122 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
124 time.Sleep(1 * time.Second)
127 m.wg.Add(m.numWorkers)
129 if m.readyCb != nil {
130 go m.readyCb(m.readyCbParams)
133 for w := 0; w < m.numWorkers; w++ {
134 go m.Worker("worker-"+strconv.Itoa(w), 0)
139 func (m *RMRClient) Worker(taskName string, msgSize int) {
140 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
144 rxBuffer := C.rmr_rcv_msg(m.context, nil)
146 m.LogMBufError("RecvMsg failed", rxBuffer)
147 m.UpdateStatCounter("ReceiveError")
150 m.UpdateStatCounter("Received")
152 go m.parseMessage(rxBuffer)
156 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
157 if len(m.consumers) == 0 {
158 Logger.Info("rmrClient: No message handlers defined, message discarded!")
162 params := &RMRParams{}
163 params.Mbuf = rxBuffer
164 params.Mtype = int(rxBuffer.mtype)
165 params.SubId = int(rxBuffer.sub_id)
166 params.Meid = &RMRMeid{}
168 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
169 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
170 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
173 xidBuf := make([]byte, int(C.RMR_MAX_XID))
174 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
175 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
178 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
179 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
180 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
183 for _, c := range m.consumers {
184 cptr := unsafe.Pointer(rxBuffer.payload)
185 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
186 params.PayloadLen = int(rxBuffer.len)
188 err := c.Consume(params)
190 Logger.Warn("rmrClient: Consumer returned error: %v", err)
195 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
196 buf := C.rmr_alloc_msg(m.context, 0)
198 Logger.Error("rmrClient: Allocating message buffer failed!")
203 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
210 func (m *RMRClient) SendMsg(params *RMRParams) bool {
211 return m.Send(params, false)
214 func (m *RMRClient) SendRts(params *RMRParams) bool {
215 return m.Send(params, true)
218 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
219 txBuffer := params.Mbuf
221 txBuffer = m.Allocate()
227 txBuffer.mtype = C.int(params.Mtype)
228 txBuffer.sub_id = C.int(params.SubId)
229 txBuffer.len = C.int(len(params.Payload))
230 if params.PayloadLen != 0 {
231 txBuffer.len = C.int(params.PayloadLen)
233 datap := C.CBytes(params.Payload)
237 if params.Meid != nil {
238 b := make([]byte, int(C.RMR_MAX_MEID))
239 copy(b, []byte(params.Meid.RanName))
240 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
242 xidLen := len(params.Xid)
243 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
244 b := make([]byte, int(C.RMR_MAX_XID))
245 copy(b, []byte(params.Xid))
246 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
249 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
251 params.status = m.SendBuf(txBuffer, isRts)
252 if params.status == int(C.RMR_OK) {
258 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) int {
260 currBuffer *C.rmr_mbuf_t
261 counterName string = "Transmitted"
266 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
268 currBuffer = C.rmr_send_msg(m.context, txBuffer)
271 if currBuffer == nil {
272 m.UpdateStatCounter("TransmitError")
273 return m.LogMBufError("SendBuf failed", txBuffer)
276 // Just quick retry seems to help for K8s issue
277 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
278 if maxRetryOnFailure == 0 {
279 maxRetryOnFailure = 5
282 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
284 currBuffer = C.rmr_rts_msg(m.context, currBuffer)
286 currBuffer = C.rmr_send_msg(m.context, currBuffer)
290 if currBuffer.state != C.RMR_OK {
291 counterName = "TransmitError"
292 m.LogMBufError("SendBuf failed", currBuffer)
295 m.UpdateStatCounter(counterName)
298 return int(currBuffer.state)
301 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
302 if params.status == int(C.RMR_ERR_RETRY) {
308 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
309 if params.status == int(C.RMR_ERR_NOENDPT) {
315 func (m *RMRClient) UpdateStatCounter(name string) {
321 func (m *RMRClient) RegisterMetrics() {
322 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
325 func (m *RMRClient) Wait() {
329 func (m *RMRClient) IsReady() bool {
333 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
335 m.readyCbParams = params
338 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
339 id, ok := RICMessageTypes[name]
343 func (m *RMRClient) GetRicMessageName(id int) (s string) {
344 for k, v := range RICMessageTypes {
352 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
353 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
354 return int(mbuf.state)
358 func (m *RMRClient) GetStat() (r RMRStatistics) {