2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
40 "github.com/spf13/viper"
47 type RMRMbuf C.rmr_mbuf_t
49 var RMRCounterOpts = []CounterOpts{
50 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
51 {Name: "Received", Help: "The total number of received RMR messages"},
52 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
53 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
56 type RMRParams struct {
67 func NewRMRClient() *RMRClient {
68 p := C.CString(viper.GetString("rmr.protPort"))
69 m := C.int(viper.GetInt("rmr.maxSize"))
70 defer C.free(unsafe.Pointer(p))
72 ctx := C.rmr_init(p, m, C.int(0))
74 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
79 consumers: make([]MessageConsumer, 0),
80 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
84 func (m *RMRClient) Start(c MessageConsumer) {
86 m.consumers = append(m.consumers, c)
90 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
92 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
95 time.Sleep(10 * time.Second)
97 m.wg.Add(viper.GetInt("rmr.numWorkers"))
100 go m.readyCb(m.readyCbParams)
103 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
104 go m.Worker("worker-"+strconv.Itoa(w), 0)
109 func (m *RMRClient) Worker(taskName string, msgSize int) {
110 p := viper.GetString("rmr.protPort")
111 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
115 rxBuffer := (*RMRMbuf)(C.rmr_rcv_msg(m.context, nil))
117 m.UpdateStatCounter("ReceiveError")
120 m.UpdateStatCounter("Received")
122 go m.parseMessage(rxBuffer)
126 func (m *RMRClient) parseMessage(rxBuffer *RMRMbuf) {
127 if len(m.consumers) == 0 {
128 Logger.Info("rmrClient: No message handlers defined, message discarded!")
132 params := &RMRParams{}
133 params.Mbuf = rxBuffer
134 params.Mtype = int(rxBuffer.mtype)
135 params.SubId = int(rxBuffer.sub_id)
136 params.Meid = &RMRMeid{}
138 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
139 if meidCstr := C.rmr_get_meid((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
140 params.Meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
141 params.Meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
144 xidBuf := make([]byte, int(C.RMR_MAX_XID))
145 if xidCstr := C.rmr_get_xact((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
146 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
149 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
150 if srcStr := C.rmr_get_src((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
151 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
154 for _, c := range m.consumers {
155 cptr := unsafe.Pointer(rxBuffer.payload)
156 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
157 params.PayloadLen = int(rxBuffer.len)
159 err := c.Consume(params)
161 Logger.Warn("rmrClient: Consumer returned error: %v", err)
166 func (m *RMRClient) Allocate() *RMRMbuf {
167 buf := C.rmr_alloc_msg(m.context, 0)
169 Logger.Error("rmrClient: Allocating message buffer failed!")
172 return (*RMRMbuf)(buf)
175 func (m *RMRClient) SendMsg(params *RMRParams) bool {
176 return m.Send(params, false)
179 func (m *RMRClient) SendRts(params *RMRParams) bool {
180 return m.Send(params, true)
183 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
189 buf.mtype = C.int(params.Mtype)
190 buf.sub_id = C.int(params.SubId)
191 buf.len = C.int(len(params.Payload))
192 datap := C.CBytes(params.Payload)
196 if params.Meid != nil {
197 b := make([]byte, int(C.RMR_MAX_MEID))
198 copy(b, []byte(params.Meid.PlmnID))
199 copy(b[16:], []byte(params.Meid.EnbID))
200 C.rmr_bytes2meid((*C.rmr_mbuf_t)(buf), (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
202 xidLen := len(params.Xid)
203 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
204 b := make([]byte, int(C.RMR_MAX_MEID))
205 copy(b, []byte(params.Xid))
206 C.rmr_bytes2xact((*C.rmr_mbuf_t)(buf), (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
209 C.write_bytes_array(buf.payload, datap, buf.len)
211 return m.SendBuf(buf, isRts)
214 func (m *RMRClient) SendBuf(txBuffer *RMRMbuf, isRts bool) bool {
215 for i := 0; i < 10; i++ {
218 txBuffer = (*RMRMbuf)(C.rmr_rts_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
220 txBuffer = (*RMRMbuf)(C.rmr_send_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
225 } else if txBuffer.state != C.RMR_OK {
226 if txBuffer.state != C.RMR_ERR_RETRY {
227 time.Sleep(100 * time.Microsecond)
228 m.UpdateStatCounter("TransmitError")
230 for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
231 txBuffer = (*RMRMbuf)(C.rmr_send_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
235 if txBuffer.state == C.RMR_OK {
236 m.UpdateStatCounter("Transmitted")
240 m.UpdateStatCounter("TransmitError")
244 func (m *RMRClient) UpdateStatCounter(name string) {
250 func (m *RMRClient) RegisterMetrics() {
251 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
254 func (m *RMRClient) Wait() {
258 func (m *RMRClient) IsReady() bool {
262 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
264 m.readyCbParams = params
267 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
268 id, ok := RICMessageTypes[name]
272 func (m *RMRClient) GetRicMessageName(id int) (s string) {
273 for k, v := range RICMessageTypes {
282 func (m *RMRClient) GetStat() (r RMRStatistics) {