2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
40 "github.com/spf13/viper"
47 var RMRCounterOpts = []CounterOpts{
48 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49 {Name: "Received", Help: "The total number of received RMR messages"},
50 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
55 type RMRStatistics struct{}
57 type RMRClient struct {
58 context unsafe.Pointer
62 stat map[string]Counter
63 consumers []MessageConsumer
67 type MessageConsumer interface {
68 Consume(mtype int, sid int, len int, payload []byte) error
71 func NewRMRClient() *RMRClient {
72 p := C.CString(viper.GetString("rmr.protPort"))
73 m := C.int(viper.GetInt("rmr.maxSize"))
74 defer C.free(unsafe.Pointer(p))
76 ctx := C.rmr_init(p, m, C.int(0))
78 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
83 consumers: make([]MessageConsumer, 0),
84 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
88 func (m *RMRClient) Start(c MessageConsumer) {
90 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
92 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
95 time.Sleep(10 * time.Second)
97 m.wg.Add(viper.GetInt("rmr.numWorkers"))
100 m.consumers = append(m.consumers, c)
103 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
104 go m.Worker("worker-"+strconv.Itoa(w), 0)
107 if m.readyCb != nil {
114 func (m *RMRClient) Worker(taskName string, msgSize int) {
115 p := viper.GetString("rmr.protPort")
116 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
120 rxBuffer := C.rmr_rcv_msg(m.context, nil)
122 m.UpdateStatCounter("ReceiveError")
125 m.UpdateStatCounter("Received")
127 go m.parseMessage(rxBuffer)
131 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
132 if len(m.consumers) == 0 {
133 Logger.Info("rmrClient: No message handlers defined, message discarded!")
137 for _, c := range m.consumers {
138 cptr := unsafe.Pointer(rxBuffer.payload)
139 payload := C.GoBytes(cptr, C.int(rxBuffer.len))
141 err := c.Consume(int(rxBuffer.mtype), int(rxBuffer.sub_id), int(rxBuffer.len), payload)
143 Logger.Warn("rmrClient: Consumer returned error: %v", err)
148 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
149 buf := C.rmr_alloc_msg(m.context, 0)
151 Logger.Error("rmrClient: Allocating message buffer failed!")
157 func (m *RMRClient) Send(mtype int, sid int, len int, payload []byte) bool {
160 buf.mtype = C.int(mtype)
161 buf.sub_id = C.int(sid)
163 datap := C.CBytes(payload)
166 C.write_bytes_array(buf.payload, datap, C.int(len))
168 return m.SendBuf(buf)
171 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t) bool {
172 for i := 0; i < 10; i++ {
174 txBuffer := C.rmr_send_msg(m.context, txBuffer)
177 } else if txBuffer.state != C.RMR_OK {
178 if txBuffer.state != C.RMR_ERR_RETRY {
179 time.Sleep(100 * time.Microsecond)
180 m.UpdateStatCounter("TransmitError")
182 for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
183 txBuffer = C.rmr_send_msg(m.context, txBuffer)
187 if txBuffer.state == C.RMR_OK {
188 m.UpdateStatCounter("Transmitted")
192 m.UpdateStatCounter("TransmitError")
196 func (m *RMRClient) UpdateStatCounter(name string) {
202 func (m *RMRClient) RegisterMetrics() {
203 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
207 func (m *RMRClient) GetStat() (r RMRStatistics) {
211 func (m *RMRClient) Wait() {
215 func (m *RMRClient) IsReady() bool {
219 func (m *RMRClient) SetReadyCB(cb ReadyCB) {
223 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
224 id, ok := RICMessageTypes[name]
228 func (m *RMRClient) GetRicMessageName(id int) (s string) {
229 for k, v := range RICMessageTypes {