2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst with memcpy.
// No bounds are checked here: the caller must guarantee that both buffers
// are at least len bytes long.
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
40 "github.com/spf13/viper"
// RMRCounterOpts lists the name and help text of the four RMR counters.
// It is handed to Metric.RegisterCounterGroup together with the group name
// "RMR"; the Name values are the keys later passed to UpdateStatCounter.
// NOTE(review): "transmited" in the first Help string is a typo for
// "transmitted"; it is a runtime string, so confirm nothing depends on the
// current spelling before correcting it.
47 var RMRCounterOpts = []CounterOpts{
48 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49 {Name: "Received", Help: "The total number of received RMR messages"},
50 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
// RMRParams describes one RMR message as seen by Go code.
// parseMessage fills Mbuf, Mtype, SubId, Meid, Xid, Src, Payload and
// PayloadLen from a received buffer; Send reads Mtype, SubId, Payload,
// PayloadLen, Meid and Xid when preparing an outgoing buffer.
54 type RMRParams struct {
// NewRMRClient creates an RMRClient around a newly initialised RMR context.
// The port string comes from the viper key "rmr.protPort" and the size
// argument from "rmr.maxSize"; both are passed to rmr_init with flags 0.
// The client starts with an empty consumer list and the "RMR" counter group
// registered from RMRCounterOpts.
65 func NewRMRClient() *RMRClient {
66 p := C.CString(viper.GetString("rmr.protPort"))
67 m := C.int(viper.GetInt("rmr.maxSize"))
// C.CString allocates in C memory, so the copy is released when this function returns.
68 defer C.free(unsafe.Pointer(p))
70 ctx := C.rmr_init(p, m, C.int(0))
// NOTE(review): an initialisation failure is logged here; confirm what the
// caller receives afterwards and whether a client with an unusable context can escape.
72 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
77 consumers: make([]MessageConsumer, 0),
78 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
// Start adds consumer c to the client's consumer list, waits for RMR to
// report ready, and then launches the receive workers. The worker count is
// read from "rmr.numWorkers"; each worker runs Worker in its own goroutine
// under the name "worker-<index>" with msgSize 0. The ready callback is
// invoked on a separate goroutine with readyCbParams.
82 func (m *RMRClient) Start(c MessageConsumer) {
84 m.consumers = append(m.consumers, c)
88 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
// The result of rmr_ready is cached in m.ready; the value 1 is treated as "ready".
90 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
// Pause for 10 seconds (presumably before readiness is checked again — confirm).
93 time.Sleep(10 * time.Second)
// NOTE(review): "rmr.numWorkers" is read here and again in the loop below;
// if the two reads differ, the WaitGroup count will not match the number of
// workers actually started.
95 m.wg.Add(viper.GetInt("rmr.numWorkers"))
98 go m.readyCb(m.readyCbParams)
101 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
102 go m.Worker("worker-"+strconv.Itoa(w), 0)
// Worker receives RMR messages on behalf of the client. It logs taskName and
// the configured "rmr.protPort", obtains messages with rmr_rcv_msg, bumps the
// "ReceiveError" or "Received" counter, and hands each received buffer to
// parseMessage on a new goroutine.
// NOTE(review): msgSize is not referenced by any of the statements below.
107 func (m *RMRClient) Worker(taskName string, msgSize int) {
108 p := viper.GetString("rmr.protPort")
109 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
// nil is passed as the old-message argument, so no previous buffer is handed back to RMR.
113 rxBuffer := C.rmr_rcv_msg(m.context, nil)
115 m.UpdateStatCounter("ReceiveError")
118 m.UpdateStatCounter("Received")
// NOTE(review): one goroutine is spawned per received message with no visible
// bound; confirm this is acceptable under load.
120 go m.parseMessage(rxBuffer)
// parseMessage converts a received RMR buffer into an RMRParams value and
// passes it to every registered consumer. When no consumer is registered the
// message is discarded with an info log. Errors returned by a consumer are
// logged as warnings.
124 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
125 if len(m.consumers) == 0 {
126 Logger.Info("rmrClient: No message handlers defined, message discarded!")
130 params := &RMRParams{}
// The raw buffer is kept in Mbuf so that Send can reuse it for a reply.
131 params.Mbuf = rxBuffer
132 params.Mtype = int(rxBuffer.mtype)
133 params.SubId = int(rxBuffer.sub_id)
134 params.Meid = &RMRMeid{}
// Each identifier below is copied by RMR into a Go-allocated scratch buffer
// and converted to a string with trailing NUL bytes stripped.
136 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
137 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
138 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
141 xidBuf := make([]byte, int(C.RMR_MAX_XID))
142 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
// NOTE(review): the hard-coded 32 assumes RMR_MAX_XID >= 32; a smaller value
// would make this slice expression panic — confirm against the RMR headers.
143 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
146 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
147 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
// NOTE(review): the hard-coded 64 likewise assumes RMR_MAX_SRC >= 64 — confirm.
148 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
// Every consumer receives the same params pointer; the payload is re-copied
// into Go memory with C.GoBytes on each iteration.
151 for _, c := range m.consumers {
152 cptr := unsafe.Pointer(rxBuffer.payload)
153 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
154 params.PayloadLen = int(rxBuffer.len)
156 err := c.Consume(params)
158 Logger.Warn("rmrClient: Consumer returned error: %v", err)
// Allocate requests a new message buffer from RMR by calling rmr_alloc_msg
// with a size argument of 0 and returns it as *C.rmr_mbuf_t. An allocation
// failure is reported through an error log.
163 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
164 buf := C.rmr_alloc_msg(m.context, 0)
166 Logger.Error("rmrClient: Allocating message buffer failed!")
// Free is given a message buffer to release. A nil mbuf is reported through
// an error log.
// NOTE(review): presumably a non-nil buffer is returned to RMR afterwards —
// confirm.
171 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
173 Logger.Error("rmrClient: Can't free mbuffer, given nil pointer")
// SendMsg sends params as a regular message: it calls Send with isRts set to
// false and returns Send's result.
179 func (m *RMRClient) SendMsg(params *RMRParams) bool {
180 return m.Send(params, false)
// SendRts sends params as a return-to-sender message: it calls Send with
// isRts set to true and returns Send's result.
183 func (m *RMRClient) SendRts(params *RMRParams) bool {
184 return m.Send(params, true)
// Send fills an RMR message buffer from params and transmits it via SendBuf.
// It sets the message type, subscription id and length, writes the MEID and
// transaction id when they are provided, copies the payload into the buffer,
// and returns SendBuf's result. isRts is forwarded to SendBuf unchanged.
187 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
193 buf.mtype = C.int(params.Mtype)
194 buf.sub_id = C.int(params.SubId)
// The length defaults to the slice length; a non-zero PayloadLen overrides it.
195 buf.len = C.int(len(params.Payload))
196 if params.PayloadLen != 0 {
// NOTE(review): nothing here checks PayloadLen against len(params.Payload);
// a larger value makes the memcpy below read past the C copy of the payload.
197 buf.len = C.int(params.PayloadLen)
// C.CBytes copies the payload into C-allocated memory, which must be
// released with C.free — confirm that this happens.
199 datap := C.CBytes(params.Payload)
203 if params.Meid != nil {
// The RAN name is copied into a zero-filled buffer of RMR_MAX_MEID bytes, so
// the full fixed-size field is always written.
204 b := make([]byte, int(C.RMR_MAX_MEID))
205 copy(b, []byte(params.Meid.RanName))
206 C.rmr_bytes2meid(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
208 xidLen := len(params.Xid)
// An empty or over-long transaction id is skipped.
209 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
// NOTE(review): this buffer is sized with RMR_MAX_MEID although the guard
// above uses RMR_MAX_XID; it looks like RMR_MAX_XID was intended — confirm
// that the two constants are equal or correct the size.
210 b := make([]byte, int(C.RMR_MAX_MEID))
211 copy(b, []byte(params.Xid))
212 C.rmr_bytes2xact(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// NOTE(review): buf.len bytes are copied with no visible check against the
// payload capacity of buf — confirm that the buffer is large enough.
215 C.write_bytes_array(buf.payload, datap, buf.len)
217 return m.SendBuf(buf, isRts)
// SendBuf transmits txBuffer through RMR, using rmr_rts_msg or rmr_send_msg
// depending on isRts, and makes up to 10 attempts. The "Transmitted" counter
// is bumped when the resulting state is RMR_OK; "TransmitError" is bumped on
// non-retryable failures and after the attempts are exhausted. txBuffer is
// released with rmr_free_msg when the function returns.
220 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
// NOTE(review): the deferred free is bound to the original txBuffer pointer,
// while the send calls return a buffer that is stored in currBuffer; confirm
// against the RMR API whether the returned pointer can differ from txBuffer.
221 defer C.rmr_free_msg(txBuffer)
222 var currBuffer *C.rmr_mbuf_t
224 for i := 0; i < 10; i++ {
227 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
229 currBuffer = C.rmr_send_msg(m.context, txBuffer)
232 if currBuffer == nil {
234 } else if currBuffer.state != C.RMR_OK {
// Any failure other than RMR_ERR_RETRY is counted as a transmit error after
// a 100 microsecond pause.
235 if currBuffer.state != C.RMR_ERR_RETRY {
236 time.Sleep(100 * time.Microsecond)
237 m.UpdateStatCounter("TransmitError")
// RMR_ERR_RETRY is retried immediately, up to 100 times per outer attempt.
// NOTE(review): this retry always uses rmr_send_msg, even when isRts is true
// — confirm that this is intended.
239 for j := 0; j < 100 && currBuffer.state == C.RMR_ERR_RETRY; j++ {
240 currBuffer = C.rmr_send_msg(m.context, txBuffer)
244 if currBuffer.state == C.RMR_OK {
245 m.UpdateStatCounter("Transmitted")
249 m.UpdateStatCounter("TransmitError")
// UpdateStatCounter takes the name of an RMR counter; callers in this file
// pass "Transmitted", "Received", "TransmitError" and "ReceiveError", the
// names declared in RMRCounterOpts.
// NOTE(review): presumably it increments the matching entry of m.stat —
// confirm.
253 func (m *RMRClient) UpdateStatCounter(name string) {
// RegisterMetrics registers the RMRCounterOpts counters under the group name
// "RMR" and replaces m.stat with the result.
259 func (m *RMRClient) RegisterMetrics() {
260 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
// Wait takes no arguments and returns nothing.
// NOTE(review): presumably it blocks on m.wg, to which Start adds the worker
// count — confirm.
263 func (m *RMRClient) Wait() {
// IsReady reports a boolean readiness state for the client.
// NOTE(review): presumably derived from m.ready, which Start sets from
// rmr_ready — confirm.
267 func (m *RMRClient) IsReady() bool {
// SetReadyCB configures the ready callback. params is stored in
// m.readyCbParams, which Start later passes to m.readyCb on a separate
// goroutine.
// NOTE(review): presumably cb is stored in m.readyCb as well — confirm.
271 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
273 m.readyCbParams = params
// GetRicMessageId looks up name in RICMessageTypes. The lookup yields the
// numeric id and a flag telling whether name was present, matching the
// (int, bool) result of this method.
276 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
277 id, ok := RICMessageTypes[name]
// GetRicMessageName walks the name/id pairs of RICMessageTypes to find a
// message name for id. The result s is a named return value, so it is the
// empty string unless assigned.
// NOTE(review): presumably the key whose value equals id is returned —
// confirm.
281 func (m *RMRClient) GetRicMessageName(id int) (s string) {
282 for k, v := range RICMessageTypes {
291 func (m *RMRClient) GetStat() (r RMRStatistics) {