2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst with memcpy.
// No size check is performed here, so the caller must make sure dst can
// hold len bytes.
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
40 "github.com/spf13/viper"
// RMRCounterOpts lists the counters kept for RMR traffic: transmitted and
// received messages plus transmit and receive errors. It is handed to
// Metric.RegisterCounterGroup under the group name "RMR", and the Name
// values are the keys passed to UpdateStatCounter.
// NOTE(review): "transmited" in the first Help text is a typo for
// "transmitted"; it is a runtime string, so it is left untouched here.
47 var RMRCounterOpts = []CounterOpts{
48 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49 {Name: "Received", Help: "The total number of received RMR messages"},
50 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
// RMRParams describes a single RMR message as handled by RMRClient.
// parseMessage fills in Mbuf, Mtype, SubId, Meid, Xid, Src, Payload and
// PayloadLen for a received message; Send reads Mtype, SubId, Payload, Meid
// and Xid when building an outgoing one.
54 type RMRParams struct {
// NewRMRClient initializes an RMR context with rmr_init and returns an
// RMRClient built around it. The first two rmr_init arguments come from the
// viper keys "rmr.protPort" and "rmr.maxSize"; the third is always 0.
65 func NewRMRClient() *RMRClient {
66 p := C.CString(viper.GetString("rmr.protPort"))
67 m := C.int(viper.GetInt("rmr.maxSize"))
// C.CString allocates C memory; release it when this function returns.
68 defer C.free(unsafe.Pointer(p))
70 ctx := C.rmr_init(p, m, C.int(0))
// NOTE(review): the visible failure handling only logs; despite the
// "bailing out" wording no abort is visible in this view — confirm.
72 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
// The client starts with an empty consumer list and a freshly registered
// "RMR" counter group.
77 consumers: make([]MessageConsumer, 0),
78 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
// Start adds c to the consumer list, checks RMR readiness with rmr_ready,
// invokes the ready callback and launches the receive workers. The number
// of workers is read from the viper key "rmr.numWorkers".
82 func (m *RMRClient) Start(c MessageConsumer) {
84 m.consumers = append(m.consumers, c)
88 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
// The rmr_ready result is stored in m.ready; the value 1 is treated as ready.
90 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
// Presumably the pause between readiness polls; the enclosing loop is not
// visible in this view.
93 time.Sleep(10 * time.Second)
// NOTE(review): "rmr.numWorkers" is read here and again in the loop below;
// reading it once would keep the wg count and the goroutine count in step.
95 m.wg.Add(viper.GetInt("rmr.numWorkers"))
// The ready callback runs on its own goroutine with the parameters stored by
// SetReadyCB.
98 go m.readyCb(m.readyCbParams)
101 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
// Workers are named "worker-<index>"; msgSize is passed as 0.
102 go m.Worker("worker-"+strconv.Itoa(w), 0)
// Worker is the receive routine started by Start. It logs taskName together
// with the configured "rmr.protPort", receives messages with rmr_rcv_msg,
// updates the "ReceiveError" / "Received" counters and hands received
// buffers to parseMessage. msgSize is not used in the visible lines.
107 func (m *RMRClient) Worker(taskName string, msgSize int) {
108 p := viper.GetString("rmr.protPort")
109 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
// No existing buffer is supplied (nil); the buffer used is the one returned
// by rmr_rcv_msg.
113 rxBuffer := C.rmr_rcv_msg(m.context, nil)
115 m.UpdateStatCounter("ReceiveError")
118 m.UpdateStatCounter("Received")
// Each received buffer is parsed on a separate goroutine.
120 go m.parseMessage(rxBuffer)
// parseMessage converts a received RMR buffer into an RMRParams value and
// passes it to every registered consumer. Message type, subscription id,
// MEID, transaction id and source are read from rxBuffer, and the payload is
// copied into Go memory. Errors returned by a consumer are logged as
// warnings.
124 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
// Without consumers there is nobody to deliver to; the message is dropped.
125 if len(m.consumers) == 0 {
126 Logger.Info("rmrClient: No message handlers defined, message discarded!")
130 params := &RMRParams{}
131 params.Mbuf = rxBuffer
132 params.Mtype = int(rxBuffer.mtype)
133 params.SubId = int(rxBuffer.sub_id)
134 params.Meid = &RMRMeid{}
// MEID: bytes 0-15 become PlmnID and bytes 16-31 become EnbID, each with
// trailing NUL bytes trimmed.
// NOTE(review): the fixed slice bounds assume RMR_MAX_MEID >= 32 — confirm.
136 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
137 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
138 params.Meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
139 params.Meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
// Transaction id: first 32 bytes with trailing NUL bytes trimmed.
// NOTE(review): assumes RMR_MAX_XID >= 32 — confirm.
142 xidBuf := make([]byte, int(C.RMR_MAX_XID))
143 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
144 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
// Source: first 64 bytes with trailing NUL bytes trimmed.
// NOTE(review): assumes RMR_MAX_SRC >= 64 — confirm.
147 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
148 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
149 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
// Every consumer receives the same *RMRParams; only Payload and PayloadLen
// are refreshed per iteration, with a fresh Go copy of the C payload.
152 for _, c := range m.consumers {
153 cptr := unsafe.Pointer(rxBuffer.payload)
154 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
155 params.PayloadLen = int(rxBuffer.len)
157 err := c.Consume(params)
159 Logger.Warn("rmrClient: Consumer returned error: %v", err)
// Allocate requests a message buffer from RMR via rmr_alloc_msg, passing 0
// as the size argument. The visible error path logs that the allocation
// failed.
164 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
165 buf := C.rmr_alloc_msg(m.context, 0)
167 Logger.Error("rmrClient: Allocating message buffer failed!")
// SendMsg sends params as a regular message: it calls Send with isRts set to
// false and returns Send's result.
173 func (m *RMRClient) SendMsg(params *RMRParams) bool {
174 return m.Send(params, false)
// SendRts sends params on the rts path: it calls Send with isRts set to
// true, which makes SendBuf use rmr_rts_msg, and returns Send's result.
177 func (m *RMRClient) SendRts(params *RMRParams) bool {
178 return m.Send(params, true)
// Send fills an RMR message buffer from params (type, subscription id,
// payload, and optionally MEID and transaction id) and transmits it with
// SendBuf. isRts is forwarded to SendBuf unchanged. The origin of buf is not
// visible in this view.
181 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
187 buf.mtype = C.int(params.Mtype)
188 buf.sub_id = C.int(params.SubId)
189 buf.len = C.int(len(params.Payload))
// C.CBytes copies the payload into C-allocated memory.
// NOTE(review): that memory must be released with C.free; no free is
// visible in this view — confirm it exists.
190 datap := C.CBytes(params.Payload)
// MEID layout mirrors parseMessage: PlmnID at offset 0, EnbID at offset 16.
194 if params.Meid != nil {
195 b := make([]byte, int(C.RMR_MAX_MEID))
196 copy(b, []byte(params.Meid.PlmnID))
197 copy(b[16:], []byte(params.Meid.EnbID))
198 C.rmr_bytes2meid(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// The transaction id is set only when it is non-empty and no longer than
// RMR_MAX_XID.
200 xidLen := len(params.Xid)
201 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
// NOTE(review): the scratch buffer is sized with RMR_MAX_MEID although it
// carries a transaction id guarded by RMR_MAX_XID above; this looks like a
// copy-paste slip and should probably be RMR_MAX_XID — confirm.
202 b := make([]byte, int(C.RMR_MAX_MEID))
203 copy(b, []byte(params.Xid))
204 C.rmr_bytes2xact(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// Copy buf.len payload bytes into the RMR buffer.
// NOTE(review): no check against the buffer's capacity is visible — confirm.
207 C.write_bytes_array(buf.payload, datap, buf.len)
209 return m.SendBuf(buf, isRts)
// SendBuf transmits txBuffer with rmr_rts_msg or rmr_send_msg and records
// the outcome in the "Transmitted" / "TransmitError" counters. The outer
// loop makes up to 10 attempts; within an attempt, a buffer left in
// RMR_ERR_RETRY state is resent up to 100 times.
212 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
213 for i := 0; i < 10; i++ {
// Both calls return the buffer to use from here on, so txBuffer is
// reassigned.
216 txBuffer = C.rmr_rts_msg(m.context, txBuffer)
218 txBuffer = C.rmr_send_msg(m.context, txBuffer)
223 } else if txBuffer.state != C.RMR_OK {
// Any failure other than RMR_ERR_RETRY is counted as an error after a short
// pause.
224 if txBuffer.state != C.RMR_ERR_RETRY {
225 time.Sleep(100 * time.Microsecond)
226 m.UpdateStatCounter("TransmitError")
// NOTE(review): this retry uses rmr_send_msg in the visible code even when
// the first attempt went through rmr_rts_msg — confirm that is intended.
228 for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
229 txBuffer = C.rmr_send_msg(m.context, txBuffer)
233 if txBuffer.state == C.RMR_OK {
234 m.UpdateStatCounter("Transmitted")
238 m.UpdateStatCounter("TransmitError")
242 func (m *RMRClient) UpdateStatCounter(name string) {
// RegisterMetrics registers the RMRCounterOpts counters under the "RMR"
// group and stores the result in m.stat, replacing whatever was there.
248 func (m *RMRClient) RegisterMetrics() {
249 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
252 func (m *RMRClient) Wait() {
256 func (m *RMRClient) IsReady() bool {
// SetReadyCB stores params as the argument that Start later passes to the
// ready callback. Presumably cb is stored as well; that assignment is not
// visible in this view.
260 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
262 m.readyCbParams = params
// GetRicMessageId looks up name in RICMessageTypes. Presumably the int is
// the message type id and the bool reports whether name was found; the
// return statement is not visible in this view.
265 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
266 id, ok := RICMessageTypes[name]
// GetRicMessageName scans RICMessageTypes, presumably to return the name
// whose id equals the argument; the loop body is not visible in this view.
// The scan is linear in the size of the map.
270 func (m *RMRClient) GetRicMessageName(id int) (s string) {
271 for k, v := range RICMessageTypes {
280 func (m *RMRClient) GetStat() (r RMRStatistics) {