2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
41 "github.com/spf13/viper"
48 var RMRCounterOpts = []CounterOpts{
49 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50 {Name: "Received", Help: "The total number of received RMR messages"},
51 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
55 var RMRErrors = map[int]string{
56 C.RMR_OK: "state is good",
57 C.RMR_ERR_BADARG: "argument passed to function was unusable",
58 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
59 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
60 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
61 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62 C.RMR_ERR_CALLFAILED: "unable to send call() message",
63 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
64 C.RMR_ERR_WHID: "wormhole id was invalid",
65 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
66 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
68 C.RMR_ERR_TIMEOUT: "message processing call timed out",
69 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
70 C.RMR_ERR_TRUNC: "received message likely truncated",
71 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
75 type RMRParams struct {
86 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
87 p := C.CString(protPort)
89 defer C.free(unsafe.Pointer(p))
91 ctx := C.rmr_init(p, m, C.int(0))
93 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
98 numWorkers: numWorkers,
100 consumers: make([]MessageConsumer, 0),
101 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
105 func NewRMRClient() *RMRClient {
106 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
109 func (m *RMRClient) Start(c MessageConsumer) {
111 m.consumers = append(m.consumers, c)
116 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
117 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
121 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
123 time.Sleep(1 * time.Second)
126 m.wg.Add(m.numWorkers)
128 if m.readyCb != nil {
129 go m.readyCb(m.readyCbParams)
132 for w := 0; w < m.numWorkers; w++ {
133 go m.Worker("worker-"+strconv.Itoa(w), 0)
138 func (m *RMRClient) Worker(taskName string, msgSize int) {
139 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
143 rxBuffer := C.rmr_rcv_msg(m.context, nil)
145 m.LogMBufError("RecvMsg failed", rxBuffer)
146 m.UpdateStatCounter("ReceiveError")
149 m.UpdateStatCounter("Received")
151 go m.parseMessage(rxBuffer)
155 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
156 if len(m.consumers) == 0 {
157 Logger.Info("rmrClient: No message handlers defined, message discarded!")
161 params := &RMRParams{}
162 params.Mbuf = rxBuffer
163 params.Mtype = int(rxBuffer.mtype)
164 params.SubId = int(rxBuffer.sub_id)
165 params.Meid = &RMRMeid{}
167 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
168 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
169 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
172 xidBuf := make([]byte, int(C.RMR_MAX_XID))
173 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
174 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
177 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
178 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
179 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
182 for _, c := range m.consumers {
183 cptr := unsafe.Pointer(rxBuffer.payload)
184 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
185 params.PayloadLen = int(rxBuffer.len)
187 err := c.Consume(params)
189 Logger.Warn("rmrClient: Consumer returned error: %v", err)
194 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
195 buf := C.rmr_alloc_msg(m.context, 0)
197 Logger.Error("rmrClient: Allocating message buffer failed!")
202 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
209 func (m *RMRClient) SendMsg(params *RMRParams) bool {
210 return m.Send(params, false)
213 func (m *RMRClient) SendRts(params *RMRParams) bool {
214 return m.Send(params, true)
217 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
218 txBuffer := params.Mbuf
220 txBuffer = m.Allocate()
226 txBuffer.mtype = C.int(params.Mtype)
227 txBuffer.sub_id = C.int(params.SubId)
228 txBuffer.len = C.int(len(params.Payload))
229 if params.PayloadLen != 0 {
230 txBuffer.len = C.int(params.PayloadLen)
232 datap := C.CBytes(params.Payload)
236 if params.Meid != nil {
237 b := make([]byte, int(C.RMR_MAX_MEID))
238 copy(b, []byte(params.Meid.RanName))
239 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
241 xidLen := len(params.Xid)
242 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
243 b := make([]byte, int(C.RMR_MAX_XID))
244 copy(b, []byte(params.Xid))
245 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
248 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
250 return m.SendBuf(txBuffer, isRts)
253 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
255 currBuffer *C.rmr_mbuf_t
257 counterName string = "Transmitted"
262 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
264 currBuffer = C.rmr_send_msg(m.context, txBuffer)
267 if currBuffer == nil {
268 m.UpdateStatCounter("TransmitError")
269 return m.LogMBufError("SendBuf failed", txBuffer)
272 // Just quick retry seems to help for K8s issue
273 for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
275 currBuffer = C.rmr_rts_msg(m.context, currBuffer)
277 currBuffer = C.rmr_send_msg(m.context, currBuffer)
281 if currBuffer.state != C.RMR_OK {
282 counterName = "TransmitError"
283 state = m.LogMBufError("SendBuf failed", currBuffer)
286 m.UpdateStatCounter(counterName)
291 func (m *RMRClient) UpdateStatCounter(name string) {
297 func (m *RMRClient) RegisterMetrics() {
298 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
301 func (m *RMRClient) Wait() {
305 func (m *RMRClient) IsReady() bool {
309 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
311 m.readyCbParams = params
314 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
315 id, ok := RICMessageTypes[name]
319 func (m *RMRClient) GetRicMessageName(id int) (s string) {
320 for k, v := range RICMessageTypes {
328 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) bool {
329 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
334 func (m *RMRClient) GetStat() (r RMRStatistics) {