2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
40 "github.com/spf13/viper"
47 var RMRCounterOpts = []CounterOpts{
48 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49 {Name: "Received", Help: "The total number of received RMR messages"},
50 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
54 type RMRParams struct {
65 func NewRMRClient() *RMRClient {
66 p := C.CString(viper.GetString("rmr.protPort"))
67 m := C.int(viper.GetInt("rmr.maxSize"))
68 defer C.free(unsafe.Pointer(p))
70 ctx := C.rmr_init(p, m, C.int(0))
72 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
77 consumers: make([]MessageConsumer, 0),
78 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
82 func (m *RMRClient) Start(c MessageConsumer) {
84 m.consumers = append(m.consumers, c)
88 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
90 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
93 time.Sleep(10 * time.Second)
95 m.wg.Add(viper.GetInt("rmr.numWorkers"))
98 go m.readyCb(m.readyCbParams)
101 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
102 go m.Worker("worker-"+strconv.Itoa(w), 0)
107 func (m *RMRClient) Worker(taskName string, msgSize int) {
108 p := viper.GetString("rmr.protPort")
109 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
113 rxBuffer := C.rmr_rcv_msg(m.context, nil)
115 m.UpdateStatCounter("ReceiveError")
118 m.UpdateStatCounter("Received")
120 go m.parseMessage(rxBuffer)
124 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
125 if len(m.consumers) == 0 {
126 Logger.Info("rmrClient: No message handlers defined, message discarded!")
130 params := &RMRParams{}
131 params.Mbuf = rxBuffer
132 params.Mtype = int(rxBuffer.mtype)
133 params.SubId = int(rxBuffer.sub_id)
134 params.Meid = &RMRMeid{}
136 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
137 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
138 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
139 //params.Meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
140 //params.Meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
143 xidBuf := make([]byte, int(C.RMR_MAX_XID))
144 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
145 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
148 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
149 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
150 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
153 for _, c := range m.consumers {
154 cptr := unsafe.Pointer(rxBuffer.payload)
155 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
156 params.PayloadLen = int(rxBuffer.len)
158 err := c.Consume(params)
160 Logger.Warn("rmrClient: Consumer returned error: %v", err)
165 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
166 buf := C.rmr_alloc_msg(m.context, 0)
168 Logger.Error("rmrClient: Allocating message buffer failed!")
173 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
180 func (m *RMRClient) SendMsg(params *RMRParams) bool {
181 return m.Send(params, false)
184 func (m *RMRClient) SendRts(params *RMRParams) bool {
185 return m.Send(params, true)
188 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
189 txBuffer := params.Mbuf
191 txBuffer = m.Allocate()
194 txBuffer.mtype = C.int(params.Mtype)
195 txBuffer.sub_id = C.int(params.SubId)
196 txBuffer.len = C.int(len(params.Payload))
197 if params.PayloadLen != 0 {
198 txBuffer.len = C.int(params.PayloadLen)
200 datap := C.CBytes(params.Payload)
204 if params.Meid != nil {
205 b := make([]byte, int(C.RMR_MAX_MEID))
206 copy(b, []byte(params.Meid.RanName))
207 //copy(b, []byte(params.Meid.PlmnID))
208 //copy(b[16:], []byte(params.Meid.EnbID))
209 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
211 xidLen := len(params.Xid)
212 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
213 b := make([]byte, int(C.RMR_MAX_XID))
214 copy(b, []byte(params.Xid))
215 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
218 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
220 return m.SendBuf(txBuffer, isRts)
223 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
225 currBuffer *C.rmr_mbuf_t
227 counterName string = "Transmitted"
232 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
234 currBuffer = C.rmr_send_msg(m.context, txBuffer)
237 if currBuffer == nil {
238 m.UpdateStatCounter("TransmitError")
242 // Just quick retry seems to help for K8s issue
243 for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
245 currBuffer = C.rmr_rts_msg(m.context, currBuffer)
247 currBuffer = C.rmr_send_msg(m.context, currBuffer)
251 if currBuffer.state != C.RMR_OK {
253 counterName = "TransmitError"
256 m.UpdateStatCounter(counterName)
261 func (m *RMRClient) UpdateStatCounter(name string) {
267 func (m *RMRClient) RegisterMetrics() {
268 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
271 func (m *RMRClient) Wait() {
275 func (m *RMRClient) IsReady() bool {
279 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
281 m.readyCbParams = params
284 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
285 id, ok := RICMessageTypes[name]
289 func (m *RMRClient) GetRicMessageName(id int) (s string) {
290 for k, v := range RICMessageTypes {
299 func (m *RMRClient) GetStat() (r RMRStatistics) {