2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
// write_bytes_array copies len bytes from data into dst using memcpy.
// No bounds checking is done here: the caller must make sure dst has room
// for at least len bytes.
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
40 "github.com/spf13/viper"
// RMRCounterOpts describes the counters kept for RMR traffic: transmitted
// and received messages, plus transmit and receive errors. It is the
// definition list handed to Metric.RegisterCounterGroup for the "RMR" group.
// NOTE(review): "transmited" in the first Help text is a typo for
// "transmitted"; it is a runtime string, so it is left unchanged here.
47 var RMRCounterOpts = []CounterOpts{
48 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49 {Name: "Received", Help: "The total number of received RMR messages"},
50 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
// NewRMRClient creates an RMR context with rmr_init, using the "rmr.protPort"
// and "rmr.maxSize" configuration values, and returns a *RMRClient whose
// visible field initializers set an empty consumer list and the "RMR"
// counter group.
// NOTE(review): several lines of this function are not visible in this
// excerpt (the condition guarding the error log and the return statement).
// Presumably the error is logged when rmr_init returns nil, and the client is
// still returned in that case — confirm against the full file.
54 func NewRMRClient() *RMRClient {
// C.CString allocates a C copy of the port string; it is released by the
// deferred C.free below.
55 p := C.CString(viper.GetString("rmr.protPort"))
56 m := C.int(viper.GetInt("rmr.maxSize"))
57 defer C.free(unsafe.Pointer(p))
// The third rmr_init argument is passed as 0; see the RMR documentation for
// its meaning.
59 ctx := C.rmr_init(p, m, C.int(0))
61 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
66 consumers: make([]MessageConsumer, 0),
67 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
// Start adds c to the client's consumer list, checks RMR readiness through
// rmr_ready, and then launches "rmr.numWorkers" receive workers as
// goroutines. A ready callback set on the client is also started in its own
// goroutine with the stored parameters.
// NOTE(review): the loop header, break statement and any guards around the
// append and the callback are not visible in this excerpt. Presumably
// rmr_ready is polled every 10 seconds until it returns 1 — confirm.
71 func (m *RMRClient) Start(c MessageConsumer) {
73 m.consumers = append(m.consumers, c)
77 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
// The readiness value is stored on the client so it can be read later.
79 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
82 time.Sleep(10 * time.Second)
// Register the workers with the wait group before they are started.
84 m.wg.Add(viper.GetInt("rmr.numWorkers"))
87 go m.readyCb(m.readyCbParams)
// Workers are named "worker-<index>"; msgSize is always passed as 0.
90 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
91 go m.Worker("worker-"+strconv.Itoa(w), 0)
// Worker receives messages from the RMR context and hands each one to
// parseMessage in a new goroutine. taskName is used only in the start-up log
// line; msgSize is not used by any line visible in this excerpt.
// The "ReceiveError" and "Received" counters are updated from here.
// NOTE(review): the receive loop header and the nil check on the received
// buffer are not visible here. Presumably "ReceiveError" is counted when
// rmr_rcv_msg returns nil — confirm.
96 func (m *RMRClient) Worker(taskName string, msgSize int) {
97 p := viper.GetString("rmr.protPort")
98 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
// Passing nil as the old buffer asks rmr_rcv_msg for a buffer of its own
// choosing — TODO confirm ownership rules in the RMR documentation.
102 rxBuffer := C.rmr_rcv_msg(m.context, nil)
104 m.UpdateStatCounter("ReceiveError")
107 m.UpdateStatCounter("Received")
// One goroutine is spawned per received message.
109 go m.parseMessage(rxBuffer)
// parseMessage extracts the MEID and payload from rxBuffer and passes them,
// together with the message type and subscription id, to every registered
// consumer. If no consumers are registered, it logs that the message is
// discarded.
// NOTE(review): the declaration of meid, the early return and the error
// check before the warning are not visible in this excerpt. No call that
// releases rxBuffer (e.g. rmr_free_msg) is visible either — confirm the
// buffer is freed somewhere, otherwise every received message leaks.
113 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
114 if len(m.consumers) == 0 {
115 Logger.Info("rmrClient: No message handlers defined, message discarded!")
// Scratch buffer of RMR_MAX_MEID bytes that rmr_get_meid fills in.
120 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
121 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
// Bytes 0-15 hold the PLMN id and bytes 16-31 the eNB id; trailing NUL
// padding is stripped from both.
122 meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
123 meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
126 for _, c := range m.consumers {
// C.GoBytes copies the payload into Go memory, so each consumer gets its
// own copy (made once per consumer).
127 cptr := unsafe.Pointer(rxBuffer.payload)
128 payload := C.GoBytes(cptr, C.int(rxBuffer.len))
130 err := c.Consume(int(rxBuffer.mtype), int(rxBuffer.sub_id), payload, meid)
132 Logger.Warn("rmrClient: Consumer returned error: %v", err)
// Allocate requests a new message buffer from the RMR context via
// rmr_alloc_msg, passing 0 as the size argument, and returns a
// *C.rmr_mbuf_t.
// NOTE(review): the failure check and the return statement are not visible
// in this excerpt. Presumably the error is logged when rmr_alloc_msg returns
// nil — confirm.
137 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
138 buf := C.rmr_alloc_msg(m.context, 0)
140 Logger.Error("rmrClient: Allocating message buffer failed!")
// Send fills an RMR message buffer with the message type, subscription id,
// payload and MEID, then transmits it through SendBuf and returns SendBuf's
// result.
// NOTE(review): the lines that obtain buf (presumably m.Allocate()) and any
// nil checks on buf or meid are not visible in this excerpt — confirm.
146 func (m *RMRClient) Send(mtype int, sid int, payload []byte, meid *RMRMeid) bool {
149 buf.mtype = C.int(mtype)
150 buf.sub_id = C.int(sid)
151 buf.len = C.int(len(payload))
// C.CBytes copies the payload into malloc'ed C memory that the caller must
// free.
// NOTE(review): no matching C.free is visible in this excerpt — confirm.
152 datap := C.CBytes(payload)
// Pack the MEID into one RMR_MAX_MEID-sized buffer: PLMN id at offset 0,
// eNB id at offset 16.
156 b := make([]byte, int(C.RMR_MAX_MEID))
157 copy(b, []byte(meid.PlmnID))
158 copy(b[16:], []byte(meid.EnbID))
159 C.rmr_bytes2meid(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
// NOTE(review): no visible check that len(payload) fits in buf.payload;
// write_bytes_array is a plain memcpy — confirm the allocated size.
161 C.write_bytes_array(buf.payload, datap, buf.len)
163 return m.SendBuf(buf)
// SendBuf transmits txBuffer with rmr_send_msg, making up to 10 attempts.
// The "Transmitted" counter is updated on an RMR_OK state, and
// "TransmitError" is updated after the attempt loop.
// NOTE(review): the nil-result branch before the "else if" and the return
// statements are not visible in this excerpt. Presumably it returns true
// after "Transmitted" and false after the final "TransmitError" — confirm.
166 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t) bool {
167 for i := 0; i < 10; i++ {
// NOTE(review): ":=" declares a new txBuffer scoped to the loop body that
// shadows the parameter, so the parameter itself is never updated with the
// buffer returned by rmr_send_msg. Confirm this is intended.
169 txBuffer := C.rmr_send_msg(m.context, txBuffer)
172 } else if txBuffer.state != C.RMR_OK {
// Any failure other than RMR_ERR_RETRY: pause briefly and count an error.
173 if txBuffer.state != C.RMR_ERR_RETRY {
174 time.Sleep(100 * time.Microsecond)
175 m.UpdateStatCounter("TransmitError")
// RMR_ERR_RETRY: resend immediately, up to 100 times, while the state stays
// RMR_ERR_RETRY.
177 for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
178 txBuffer = C.rmr_send_msg(m.context, txBuffer)
182 if txBuffer.state == C.RMR_OK {
183 m.UpdateStatCounter("Transmitted")
// Reached after the attempt loop.
187 m.UpdateStatCounter("TransmitError")
// UpdateStatCounter takes the name of an RMR counter; callers in this file
// pass "Transmitted", "Received", "TransmitError" and "ReceiveError".
// NOTE(review): the body is not visible in this excerpt; presumably it
// increments the matching counter in m.stat — confirm.
191 func (m *RMRClient) UpdateStatCounter(name string) {
// RegisterMetrics registers the RMRCounterOpts counters under the "RMR"
// group name and stores the result in m.stat, replacing the previous value.
197 func (m *RMRClient) RegisterMetrics() {
198 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
// Wait takes no arguments and returns nothing.
// NOTE(review): the body is not visible in this excerpt; presumably it blocks
// on m.wg until the workers registered in Start have finished — confirm.
201 func (m *RMRClient) Wait() {
// IsReady returns a bool describing the client's readiness.
// NOTE(review): the body is not visible in this excerpt; presumably it
// reports whether m.ready (set from rmr_ready in Start) equals 1 — confirm.
205 func (m *RMRClient) IsReady() bool {
// SetReadyCB stores params in m.readyCbParams, the value Start passes to the
// ready callback when it launches it.
// NOTE(review): the line that stores cb is not visible in this excerpt;
// presumably it assigns cb to m.readyCb — confirm.
209 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
211 m.readyCbParams = params
// GetRicMessageId looks up name in the RICMessageTypes map.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably it returns the id and the lookup's ok flag — confirm.
214 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
215 id, ok := RICMessageTypes[name]
// GetRicMessageName iterates over the RICMessageTypes map to find a name for
// id; the result is the named return value s.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// returns the key whose value equals id and leaves s empty when nothing
// matches — confirm.
219 func (m *RMRClient) GetRicMessageName(id int) (s string) {
220 for k, v := range RICMessageTypes {
229 func (m *RMRClient) GetStat() (r RMRStatistics) {