2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
41 "github.com/spf13/viper"
48 var RMRCounterOpts = []CounterOpts{
49 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50 {Name: "Received", Help: "The total number of received RMR messages"},
51 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
55 var RMRErrors = map[int]string{
56 C.RMR_OK: "state is good",
57 C.RMR_ERR_BADARG: "argument passed to function was unusable",
58 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
59 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
60 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
61 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62 C.RMR_ERR_CALLFAILED: "unable to send call() message",
63 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
64 C.RMR_ERR_WHID: "wormhole id was invalid",
65 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
66 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
68 C.RMR_ERR_TIMEOUT: "message processing call timed out",
69 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
70 C.RMR_ERR_TRUNC: "received message likely truncated",
71 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
75 type RMRParams struct {
86 func NewRMRClient() *RMRClient {
87 p := C.CString(viper.GetString("rmr.protPort"))
88 m := C.int(viper.GetInt("rmr.maxSize"))
89 defer C.free(unsafe.Pointer(p))
91 ctx := C.rmr_init(p, m, C.int(0))
93 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
98 consumers: make([]MessageConsumer, 0),
99 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
103 func (m *RMRClient) Start(c MessageConsumer) {
105 m.consumers = append(m.consumers, c)
109 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
111 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
114 time.Sleep(10 * time.Second)
116 m.wg.Add(viper.GetInt("rmr.numWorkers"))
118 if m.readyCb != nil {
119 go m.readyCb(m.readyCbParams)
122 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
123 go m.Worker("worker-"+strconv.Itoa(w), 0)
128 func (m *RMRClient) Worker(taskName string, msgSize int) {
129 p := viper.GetString("rmr.protPort")
130 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
134 rxBuffer := C.rmr_rcv_msg(m.context, nil)
136 m.LogMBufError("RecvMsg failed", rxBuffer)
137 m.UpdateStatCounter("ReceiveError")
140 m.UpdateStatCounter("Received")
142 go m.parseMessage(rxBuffer)
146 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
147 if len(m.consumers) == 0 {
148 Logger.Info("rmrClient: No message handlers defined, message discarded!")
152 params := &RMRParams{}
153 params.Mbuf = rxBuffer
154 params.Mtype = int(rxBuffer.mtype)
155 params.SubId = int(rxBuffer.sub_id)
156 params.Meid = &RMRMeid{}
158 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
159 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
160 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
163 xidBuf := make([]byte, int(C.RMR_MAX_XID))
164 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
165 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
168 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
169 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
170 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
173 for _, c := range m.consumers {
174 cptr := unsafe.Pointer(rxBuffer.payload)
175 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
176 params.PayloadLen = int(rxBuffer.len)
178 err := c.Consume(params)
180 Logger.Warn("rmrClient: Consumer returned error: %v", err)
185 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
186 buf := C.rmr_alloc_msg(m.context, 0)
188 Logger.Error("rmrClient: Allocating message buffer failed!")
193 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
200 func (m *RMRClient) SendMsg(params *RMRParams) bool {
201 return m.Send(params, false)
204 func (m *RMRClient) SendRts(params *RMRParams) bool {
205 return m.Send(params, true)
208 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
209 txBuffer := params.Mbuf
211 txBuffer = m.Allocate()
214 txBuffer.mtype = C.int(params.Mtype)
215 txBuffer.sub_id = C.int(params.SubId)
216 txBuffer.len = C.int(len(params.Payload))
217 if params.PayloadLen != 0 {
218 txBuffer.len = C.int(params.PayloadLen)
220 datap := C.CBytes(params.Payload)
224 if params.Meid != nil {
225 b := make([]byte, int(C.RMR_MAX_MEID))
226 copy(b, []byte(params.Meid.RanName))
227 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
229 xidLen := len(params.Xid)
230 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
231 b := make([]byte, int(C.RMR_MAX_XID))
232 copy(b, []byte(params.Xid))
233 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
236 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
238 return m.SendBuf(txBuffer, isRts)
241 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
243 currBuffer *C.rmr_mbuf_t
245 counterName string = "Transmitted"
250 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
252 currBuffer = C.rmr_send_msg(m.context, txBuffer)
255 if currBuffer == nil {
256 m.UpdateStatCounter("TransmitError")
257 return m.LogMBufError("SendBuf failed", txBuffer)
260 // Just quick retry seems to help for K8s issue
261 for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
263 currBuffer = C.rmr_rts_msg(m.context, currBuffer)
265 currBuffer = C.rmr_send_msg(m.context, currBuffer)
269 if currBuffer.state != C.RMR_OK {
270 counterName = "TransmitError"
271 state = m.LogMBufError("SendBuf failed", currBuffer)
274 m.UpdateStatCounter(counterName)
279 func (m *RMRClient) UpdateStatCounter(name string) {
285 func (m *RMRClient) RegisterMetrics() {
286 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
289 func (m *RMRClient) Wait() {
293 func (m *RMRClient) IsReady() bool {
297 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
299 m.readyCbParams = params
302 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
303 id, ok := RICMessageTypes[name]
307 func (m *RMRClient) GetRicMessageName(id int) (s string) {
308 for k, v := range RICMessageTypes {
316 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) bool {
317 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
322 func (m *RMRClient) GetStat() (r RMRStatistics) {