e910ec1fbc7bd45786af901df3dd6be75b3b3af9
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_nng -lnng
36 */
37 import "C"
38
39 import (
40         "github.com/spf13/viper"
41         "strconv"
42         "sync"
43         "time"
44         "unsafe"
45 )
46
47 var RMRCounterOpts = []CounterOpts{
48         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49         {Name: "Received", Help: "The total number of received RMR messages"},
50         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
52 }
53
54 // To be removed ...
55 type RMRStatistics struct{}
56
57 type RMRClient struct {
58         context   unsafe.Pointer
59         ready     int
60         wg        sync.WaitGroup
61         mux       sync.Mutex
62         stat      map[string]Counter
63         consumers []MessageConsumer
64         readyCb   ReadyCB
65 }
66
67 type MessageConsumer interface {
68         Consume(mtype int, sid int, len int, payload []byte) error
69 }
70
71 func NewRMRClient() *RMRClient {
72         p := C.CString(viper.GetString("rmr.protPort"))
73         m := C.int(viper.GetInt("rmr.maxSize"))
74         defer C.free(unsafe.Pointer(p))
75
76         ctx := C.rmr_init(p, m, C.int(0))
77         if ctx == nil {
78                 Logger.Fatal("rmrClient: Initializing RMR context failed, bailing out!")
79         }
80
81         return &RMRClient{
82                 context: ctx,
83                 consumers: make([]MessageConsumer, 0),
84                 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
85         }
86 }
87
88 func (m *RMRClient) Start(c MessageConsumer) {
89         for {
90                 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
91
92                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
93                         break
94                 }
95                 time.Sleep(10 * time.Second)
96         }
97         m.wg.Add(viper.GetInt("rmr.numWorkers"))
98
99         if c != nil {
100                 m.consumers = append(m.consumers, c)
101         }
102
103         for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
104                 go m.Worker("worker-"+strconv.Itoa(w), 0)
105         }
106
107         if m.readyCb != nil {
108                 m.readyCb()
109         }
110
111         m.Wait()
112 }
113
114 func (m *RMRClient) Worker(taskName string, msgSize int) {
115         p := viper.GetString("rmr.protPort")
116         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
117
118         defer m.wg.Done()
119         for {
120                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
121                 if rxBuffer == nil {
122                         m.UpdateStatCounter("ReceiveError")
123                         continue
124                 }
125                 m.UpdateStatCounter("Received")
126
127                 go m.parseMessage(rxBuffer)
128         }
129 }
130
131 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
132         if len(m.consumers) == 0 {
133                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
134                 return
135         }
136
137         for _, c := range m.consumers {
138                 cptr := unsafe.Pointer(rxBuffer.payload)
139                 payload := C.GoBytes(cptr, C.int(rxBuffer.len))
140
141                 err := c.Consume(int(rxBuffer.mtype), int(rxBuffer.sub_id), int(rxBuffer.len), payload)
142                 if err != nil {
143                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
144                 }
145         }
146 }
147
148 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
149         buf := C.rmr_alloc_msg(m.context, 0)
150         if buf == nil {
151                 Logger.Fatal("rmrClient: Allocating message buffer failed!")
152         }
153
154         return buf
155 }
156
157 func (m *RMRClient) Send(mtype int, sid int, len int, payload []byte) bool {
158         buf := m.Allocate()
159
160         buf.mtype = C.int(mtype)
161         buf.sub_id = C.int(sid)
162         buf.len = C.int(len)
163         datap := C.CBytes(payload)
164         defer C.free(datap)
165
166         C.write_bytes_array(buf.payload, datap, C.int(len))
167
168         return m.SendBuf(buf)
169 }
170
171 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t) bool {
172         for i := 0; i < 10; i++ {
173                 txBuffer.state = 0
174                 txBuffer := C.rmr_send_msg(m.context, txBuffer)
175                 if txBuffer == nil {
176                         break
177                 } else if txBuffer.state != C.RMR_OK {
178                         if txBuffer.state != C.RMR_ERR_RETRY {
179                                 time.Sleep(100 * time.Microsecond)
180                                 m.UpdateStatCounter("TransmitError")
181                         }
182                         for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
183                                 txBuffer = C.rmr_send_msg(m.context, txBuffer)
184                         }
185                 }
186
187                 if txBuffer.state == C.RMR_OK {
188                         m.UpdateStatCounter("Transmitted")
189                         return true
190                 }
191         }
192         m.UpdateStatCounter("TransmitError")
193         return false
194 }
195
196 func (m *RMRClient) UpdateStatCounter(name string) {
197         m.mux.Lock()
198         m.stat[name].Inc()
199         m.mux.Unlock()
200 }
201
202 func (m *RMRClient) RegisterMetrics() {
203         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
204 }
205
206 // To be removed ...
207 func (m *RMRClient) GetStat() (r RMRStatistics) {
208         return
209 }
210
211 func (m *RMRClient) Wait() {
212         m.wg.Wait()
213 }
214
215 func (m *RMRClient) IsReady() bool {
216         return m.ready != 0
217 }
218
219 func (m *RMRClient) GetRicMessageId(mid string) int {
220         return RICMessageTypes[mid]
221 }
222
223 func (m *RMRClient) SetReadyCB(cb ReadyCB) {
224         m.readyCb = cb
225 }