2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 #include <rmr/RIC_message_types.h>
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
35 #cgo LDFLAGS: -lrmr_nng -lnng
40 "github.com/spf13/viper"
47 var RMRCounterOpts = []CounterOpts{
48 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49 {Name: "Received", Help: "The total number of received RMR messages"},
50 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
55 type RMRStatistics struct{}
57 type RMRClient struct {
58 context unsafe.Pointer
62 stat map[string]Counter
63 consumers []MessageConsumer
66 type MessageConsumer interface {
67 Consume(mtype int, sid int, len int, payload []byte) error
70 func NewRMRClient() *RMRClient {
72 r.consumers = make([]MessageConsumer, 0)
74 p := C.CString(viper.GetString("rmr.protPort"))
75 m := C.int(viper.GetInt("rmr.maxSize"))
76 defer C.free(unsafe.Pointer(p))
78 r.context = C.rmr_init(p, m, C.int(0))
80 Logger.Fatal("rmrClient: Initializing RMR context failed, bailing out!")
86 func (m *RMRClient) Start(c MessageConsumer) {
90 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
92 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
95 time.Sleep(10 * time.Second)
97 m.wg.Add(viper.GetInt("rmr.numWorkers"))
100 m.consumers = append(m.consumers, c)
103 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
104 go m.Worker("worker-"+strconv.Itoa(w), 0)
110 func (m *RMRClient) Worker(taskName string, msgSize int) {
111 p := viper.GetString("rmr.protPort")
112 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
116 rxBuffer := C.rmr_rcv_msg(m.context, nil)
118 m.UpdateStatCounter("ReceiveError")
121 m.UpdateStatCounter("Received")
123 go m.parseMessage(rxBuffer)
127 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
128 if len(m.consumers) == 0 {
129 Logger.Info("rmrClient: No message handlers defined, message discarded!")
133 for _, c := range m.consumers {
134 cptr := unsafe.Pointer(rxBuffer.payload)
135 payload := C.GoBytes(cptr, C.int(rxBuffer.len))
137 err := c.Consume(int(rxBuffer.mtype), int(rxBuffer.sub_id), int(rxBuffer.len), payload)
139 Logger.Warn("rmrClient: Consumer returned error: %v", err)
144 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
145 buf := C.rmr_alloc_msg(m.context, 0)
147 Logger.Fatal("rmrClient: Allocating message buffer failed!")
153 func (m *RMRClient) Send(mtype int, sid int, len int, payload []byte) bool {
156 buf.mtype = C.int(mtype)
157 buf.sub_id = C.int(sid)
159 datap := C.CBytes(payload)
162 C.write_bytes_array(buf.payload, datap, C.int(len))
164 return m.SendBuf(buf)
167 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t) bool {
168 for i := 0; i < 10; i++ {
170 txBuffer := C.rmr_send_msg(m.context, txBuffer)
173 } else if txBuffer.state != C.RMR_OK {
174 if txBuffer.state != C.RMR_ERR_RETRY {
175 time.Sleep(100 * time.Microsecond)
176 m.UpdateStatCounter("TransmitError")
178 for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
179 txBuffer = C.rmr_send_msg(m.context, txBuffer)
183 if txBuffer.state == C.RMR_OK {
184 m.UpdateStatCounter("Transmitted")
188 m.UpdateStatCounter("TransmitError")
192 func (m *RMRClient) UpdateStatCounter(name string) {
198 func (m *RMRClient) RegisterMetrics() {
199 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
203 func (m *RMRClient) GetStat() (r RMRStatistics) {
207 func (m *RMRClient) Wait() {
211 func (m *RMRClient) IsReady() bool {
215 func (m *RMRClient) GetRicMessageId(mid string) int {
216 return RICMessageTypes[mid]