2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
41 "github.com/spf13/viper"
48 var RMRCounterOpts = []CounterOpts{
49 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50 {Name: "Received", Help: "The total number of received RMR messages"},
51 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
55 var RMRErrors = map[int]string{
56 C.RMR_OK: "state is good",
57 C.RMR_ERR_BADARG: "argument passed to function was unusable",
58 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
59 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
60 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
61 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62 C.RMR_ERR_CALLFAILED: "unable to send call() message",
63 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
64 C.RMR_ERR_WHID: "wormhole id was invalid",
65 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
66 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
68 C.RMR_ERR_TIMEOUT: "message processing call timed out",
69 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
70 C.RMR_ERR_TRUNC: "received message likely truncated",
71 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
75 type RMRParams struct {
86 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
87 p := C.CString(protPort)
89 defer C.free(unsafe.Pointer(p))
91 ctx := C.rmr_init(p, m, C.int(0))
93 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
98 numWorkers: numWorkers,
100 consumers: make([]MessageConsumer, 0),
101 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
105 func NewRMRClient() *RMRClient {
106 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
109 func (m *RMRClient) Start(c MessageConsumer) {
111 m.consumers = append(m.consumers, c)
115 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
117 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
120 time.Sleep(10 * time.Second)
122 m.wg.Add(m.numWorkers)
124 if m.readyCb != nil {
125 go m.readyCb(m.readyCbParams)
128 for w := 0; w < m.numWorkers; w++ {
129 go m.Worker("worker-"+strconv.Itoa(w), 0)
134 func (m *RMRClient) Worker(taskName string, msgSize int) {
135 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
139 rxBuffer := C.rmr_rcv_msg(m.context, nil)
141 m.LogMBufError("RecvMsg failed", rxBuffer)
142 m.UpdateStatCounter("ReceiveError")
145 m.UpdateStatCounter("Received")
147 go m.parseMessage(rxBuffer)
151 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
152 if len(m.consumers) == 0 {
153 Logger.Info("rmrClient: No message handlers defined, message discarded!")
157 params := &RMRParams{}
158 params.Mbuf = rxBuffer
159 params.Mtype = int(rxBuffer.mtype)
160 params.SubId = int(rxBuffer.sub_id)
161 params.Meid = &RMRMeid{}
163 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
164 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
165 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
168 xidBuf := make([]byte, int(C.RMR_MAX_XID))
169 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
170 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
173 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
174 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
175 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
178 for _, c := range m.consumers {
179 cptr := unsafe.Pointer(rxBuffer.payload)
180 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
181 params.PayloadLen = int(rxBuffer.len)
183 err := c.Consume(params)
185 Logger.Warn("rmrClient: Consumer returned error: %v", err)
190 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
191 buf := C.rmr_alloc_msg(m.context, 0)
193 Logger.Error("rmrClient: Allocating message buffer failed!")
198 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
205 func (m *RMRClient) SendMsg(params *RMRParams) bool {
206 return m.Send(params, false)
209 func (m *RMRClient) SendRts(params *RMRParams) bool {
210 return m.Send(params, true)
213 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
214 txBuffer := params.Mbuf
216 txBuffer = m.Allocate()
222 txBuffer.mtype = C.int(params.Mtype)
223 txBuffer.sub_id = C.int(params.SubId)
224 txBuffer.len = C.int(len(params.Payload))
225 if params.PayloadLen != 0 {
226 txBuffer.len = C.int(params.PayloadLen)
228 datap := C.CBytes(params.Payload)
232 if params.Meid != nil {
233 b := make([]byte, int(C.RMR_MAX_MEID))
234 copy(b, []byte(params.Meid.RanName))
235 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
237 xidLen := len(params.Xid)
238 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
239 b := make([]byte, int(C.RMR_MAX_XID))
240 copy(b, []byte(params.Xid))
241 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
244 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
246 return m.SendBuf(txBuffer, isRts)
249 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
251 currBuffer *C.rmr_mbuf_t
253 counterName string = "Transmitted"
258 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
260 currBuffer = C.rmr_send_msg(m.context, txBuffer)
263 if currBuffer == nil {
264 m.UpdateStatCounter("TransmitError")
265 return m.LogMBufError("SendBuf failed", txBuffer)
268 // Just quick retry seems to help for K8s issue
269 for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
271 currBuffer = C.rmr_rts_msg(m.context, currBuffer)
273 currBuffer = C.rmr_send_msg(m.context, currBuffer)
277 if currBuffer.state != C.RMR_OK {
278 counterName = "TransmitError"
279 state = m.LogMBufError("SendBuf failed", currBuffer)
282 m.UpdateStatCounter(counterName)
287 func (m *RMRClient) UpdateStatCounter(name string) {
293 func (m *RMRClient) RegisterMetrics() {
294 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
297 func (m *RMRClient) Wait() {
301 func (m *RMRClient) IsReady() bool {
305 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
307 m.readyCbParams = params
310 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
311 id, ok := RICMessageTypes[name]
315 func (m *RMRClient) GetRicMessageName(id int) (s string) {
316 for k, v := range RICMessageTypes {
324 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) bool {
325 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
330 func (m *RMRClient) GetStat() (r RMRStatistics) {