2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst with memcpy.
// It is invoked from the Go side as C.write_bytes_array to fill a message payload.
// No bounds check on dst or data is performed in the visible lines.
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
40 "github.com/spf13/viper"
// RMRCounterOpts declares the counters kept for RMR traffic. The slice is
// registered under the "RMR" group through Metric.RegisterCounterGroup, and the
// Name values are the strings passed to UpdateStatCounter elsewhere in this file.
47 var RMRCounterOpts = []CounterOpts{
48 {Name: "Transmitted", Help: "The total number of transmitted RMR messages"},
49 {Name: "Received", Help: "The total number of received RMR messages"},
50 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
// NewRMRClient creates an RMR context through the C function rmr_init and
// returns an *RMRClient.
//
// The first rmr_init argument is the string stored under the viper key
// "rmr.protPort", the second is the integer stored under "rmr.maxSize", and the
// third is the constant 0. In the visible lines the client is populated with an
// empty consumer slice and a counter group registered from RMRCounterOpts under
// the name "RMR".
54 func NewRMRClient() *RMRClient {
// C.CString allocates p on the C side; the deferred C.free below releases it.
55 p := C.CString(viper.GetString("rmr.protPort"))
56 m := C.int(viper.GetInt("rmr.maxSize"))
57 defer C.free(unsafe.Pointer(p))
59 ctx := C.rmr_init(p, m, C.int(0))
// NOTE(review): the condition guarding this log call is not visible in this
// excerpt (presumably a nil check on ctx). The message says "bailing out", but
// no return or exit is visible here - confirm whether a client holding an
// unusable context can be handed back to the caller.
61 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
66 consumers: make([]MessageConsumer, 0),
67 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
// Start registers the consumer c, waits for RMR to report ready, and then
// launches the receive workers.
//
// Visible steps: c is appended to m.consumers; the result of C.rmr_ready is
// stored in m.ready and compared with 1; a 10 second sleep is used while
// waiting; the wait group is increased by the "rmr.numWorkers" setting; the
// ready callback is run on its own goroutine with m.readyCbParams; and one
// Worker goroutine named "worker-<index>" is started per configured worker.
71 func (m *RMRClient) Start(c MessageConsumer) {
// NOTE(review): any guard around this append (e.g. a nil check on c) is not
// visible in this excerpt.
73 m.consumers = append(m.consumers, c)
77 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
// Presumably this check sits inside a polling loop that exits once ready - the
// surrounding lines are not visible; verify.
79 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
82 time.Sleep(10 * time.Second)
// NOTE(review): "rmr.numWorkers" is read here and again on every evaluation of
// the loop condition below; if the setting changes in between, the wait-group
// count and the number of started workers would differ.
84 m.wg.Add(viper.GetInt("rmr.numWorkers"))
// NOTE(review): no nil check on m.readyCb is visible before this call - confirm
// one exists in the elided lines.
87 go m.readyCb(m.readyCbParams)
90 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
// Every worker is started with a msgSize argument of 0.
91 go m.Worker("worker-"+strconv.Itoa(w), 0)
// Worker receives RMR messages and hands each one to parseMessage.
//
// taskName is only used in the start-up log line. msgSize is not referenced in
// any of the visible lines. The visible body logs the configured
// "rmr.protPort", calls C.rmr_rcv_msg with a nil input buffer, updates the
// "ReceiveError" or "Received" counter, and starts parseMessage on a new
// goroutine for the received buffer.
96 func (m *RMRClient) Worker(taskName string, msgSize int) {
97 p := viper.GetString("rmr.protPort")
98 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
// Presumably the receive call below runs inside an endless loop; the loop
// header is not visible in this excerpt.
102 rxBuffer := C.rmr_rcv_msg(m.context, nil)
// NOTE(review): the condition selecting the error path is not visible
// (presumably rxBuffer == nil or a non-OK state) - confirm.
104 m.UpdateStatCounter("ReceiveError")
107 m.UpdateStatCounter("Received")
// One goroutine is spawned per received message; no bound on the number of
// concurrent parseMessage goroutines is visible here.
109 go m.parseMessage(rxBuffer)
// parseMessage extracts the MEID and payload from rxBuffer and passes them to
// every registered consumer.
//
// When no consumers are registered an informational message is logged. The MEID
// is read with C.rmr_get_meid into a buffer of C.RMR_MAX_MEID bytes; bytes
// [0:16] become meid.PlmnID and bytes [16:32] become meid.EnbID, each with
// trailing NUL bytes trimmed. Each consumer's Consume method is then called with
// the message type, subscription id, a Go copy of the payload, and meid.
113 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
// NOTE(review): m.consumers is read here on a per-message goroutine while Start
// appends to it; no lock is visible in this excerpt - confirm.
114 if len(m.consumers) == 0 {
// Presumably followed by an early return (not visible).
115 Logger.Info("rmrClient: No message handlers defined, message discarded!")
120 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
121 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
// The fixed 16-byte split assumes C.RMR_MAX_MEID is at least 32 - TODO confirm.
122 meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
123 meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
126 for _, c := range m.consumers {
127 cptr := unsafe.Pointer(rxBuffer.payload)
// C.GoBytes copies rxBuffer.len bytes into Go memory, so each consumer receives
// its own copy of the payload.
128 payload := C.GoBytes(cptr, C.int(rxBuffer.len))
130 err := c.Consume(int(rxBuffer.mtype), int(rxBuffer.sub_id), payload, meid)
// Presumably guarded by err != nil (guard not visible).
132 Logger.Warn("rmrClient: Consumer returned error: %v", err)
// NOTE(review): no release of rxBuffer is visible in this excerpt - confirm the
// buffer is freed after all consumers have run.
// Allocate requests a message buffer from RMR by calling C.rmr_alloc_msg on the
// client's context with a size argument of 0, and returns a *C.rmr_mbuf_t.
137 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
138 buf := C.rmr_alloc_msg(m.context, 0)
// NOTE(review): the guard for this log call is not visible (presumably
// buf == nil); verify what is returned to the caller in that case.
140 Logger.Error("rmrClient: Allocating message buffer failed!")
// Send transmits payload with message type mtype, subscription id sid and the
// given meid. It delegates to SendMsg, passing len(payload) as the payload
// length, and returns SendMsg's result.
146 func (m *RMRClient) Send(mtype int, sid int, payload []byte, meid *RMRMeid) bool {
147 return m.SendMsg(mtype, sid, meid, payload, len(payload))
// SendMsg fills an RMR message buffer and sends it with SendBuf, returning
// SendBuf's result.
//
// The buffer's mtype, sub_id and len fields are set from mtype, sid and
// payloadLen. The MEID is packed into a C.RMR_MAX_MEID-byte array with
// meid.PlmnID starting at offset 0 and meid.EnbID starting at offset 16, then
// applied with C.rmr_bytes2meid. The payload is copied into buf.payload with
// C.write_bytes_array.
150 func (m *RMRClient) SendMsg(mtype int, sid int, meid *RMRMeid, payload []byte, payloadLen int) bool {
// NOTE(review): the origin of buf is not visible in this excerpt (presumably
// m.Allocate()); confirm a failed allocation is handled before these writes.
153 buf.mtype = C.int(mtype)
154 buf.sub_id = C.int(sid)
155 buf.len = C.int(payloadLen)
// C.CBytes copies payload into C-allocated memory.
// NOTE(review): no release of datap is visible in this excerpt - confirm it is
// freed (e.g. via C.free), otherwise every send leaks the copy.
156 datap := C.CBytes(payload)
// NOTE(review): any nil check on meid before the dereferences below is not
// visible here.
160 b := make([]byte, int(C.RMR_MAX_MEID))
// A PlmnID longer than 16 bytes spills into the EnbID region; the next copy
// overwrites only as many bytes as EnbID holds.
161 copy(b, []byte(meid.PlmnID))
162 copy(b[16:], []byte(meid.EnbID))
163 C.rmr_bytes2meid(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// NOTE(review): buf.len comes from payloadLen, which is not checked against
// len(payload) or the capacity of buf.payload in the visible lines; a larger
// value would make memcpy read or write out of bounds.
165 C.write_bytes_array(buf.payload, datap, buf.len)
167 return m.SendBuf(buf)
// SendBuf sends txBuffer through C.rmr_send_msg with retries and returns a bool
// (the return statements themselves are not visible in this excerpt).
//
// Up to 10 outer attempts are made. When the returned state is neither
// C.RMR_OK nor C.RMR_ERR_RETRY the code sleeps 100 microseconds and updates the
// "TransmitError" counter. While the state is C.RMR_ERR_RETRY the send is
// repeated up to 100 more times. A state of C.RMR_OK updates the "Transmitted"
// counter; after the outer loop "TransmitError" is updated once more.
170 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t) bool {
171 for i := 0; i < 10; i++ {
// NOTE(review): ":=" declares a new, loop-scoped txBuffer that shadows the
// parameter. The inner retry loop below assigns to this shadow, so the next
// outer iteration passes the original parameter again instead of the buffer
// returned by the previous call - confirm this is intended.
173 txBuffer := C.rmr_send_msg(m.context, txBuffer)
// The first branch of this if/else chain is not visible (presumably a nil
// check on the returned buffer).
176 } else if txBuffer.state != C.RMR_OK {
177 if txBuffer.state != C.RMR_ERR_RETRY {
178 time.Sleep(100 * time.Microsecond)
179 m.UpdateStatCounter("TransmitError")
181 for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
// NOTE(review): no nil check on the result is visible before the loop
// condition dereferences txBuffer.state again.
182 txBuffer = C.rmr_send_msg(m.context, txBuffer)
186 if txBuffer.state == C.RMR_OK {
187 m.UpdateStatCounter("Transmitted")
// Reached after the outer loop; together with line 179 a single failed send
// can update "TransmitError" more than once.
191 m.UpdateStatCounter("TransmitError")
// UpdateStatCounter is called in this file with the counter names "Received",
// "ReceiveError", "Transmitted" and "TransmitError".
// NOTE(review): the body is not visible in this excerpt; presumably it
// increments the named counter in m.stat - verify.
195 func (m *RMRClient) UpdateStatCounter(name string) {
// RegisterMetrics replaces m.stat with the result of registering
// RMRCounterOpts under the group name "RMR" via Metric.RegisterCounterGroup.
201 func (m *RMRClient) RegisterMetrics() {
202 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
// Wait takes no arguments and returns nothing.
// NOTE(review): the body is not visible in this excerpt; presumably it blocks
// on m.wg, which Start increases by the configured worker count - verify.
205 func (m *RMRClient) Wait() {
// IsReady reports a boolean readiness state.
// NOTE(review): the body is not visible in this excerpt; presumably it checks
// m.ready, which Start sets from C.rmr_ready and compares with 1 - verify.
209 func (m *RMRClient) IsReady() bool {
// SetReadyCB stores params in m.readyCbParams, the value that Start passes to
// m.readyCb on a separate goroutine.
// NOTE(review): the assignment of cb is not visible in this excerpt (presumably
// m.readyCb = cb) - verify.
213 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
215 m.readyCbParams = params
// GetRicMessageId looks name up in the RICMessageTypes map.
// NOTE(review): the return statement is not visible in this excerpt; presumably
// the looked-up id and the "found" flag are returned as-is - verify.
218 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
219 id, ok := RICMessageTypes[name]
// GetRicMessageName iterates over the key/value pairs of RICMessageTypes and
// returns its result through the named string s.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// returns the key whose value equals id and leaves s empty when nothing
// matches - verify.
223 func (m *RMRClient) GetRicMessageName(id int) (s string) {
224 for k, v := range RICMessageTypes {
233 func (m *RMRClient) GetStat() (r RMRStatistics) {