07ce55bc9b2affef7d9cce846e5f4ce199a7d361
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_nng -lnng
36 */
37 import "C"
38
39 import (
40         "github.com/spf13/viper"
41         "strconv"
42         "strings"
43         "time"
44         "unsafe"
45 )
46
47 var RMRCounterOpts = []CounterOpts{
48         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49         {Name: "Received", Help: "The total number of received RMR messages"},
50         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
52 }
53
54 type RMRParams struct {
55         Mtype      int
56         Payload    []byte
57         PayloadLen int
58         Meid       *RMRMeid
59         Xid        string
60         SubId      int
61         Src        string
62         Mbuf       *C.rmr_mbuf_t
63 }
64
65 func NewRMRClient() *RMRClient {
66         p := C.CString(viper.GetString("rmr.protPort"))
67         m := C.int(viper.GetInt("rmr.maxSize"))
68         defer C.free(unsafe.Pointer(p))
69
70         ctx := C.rmr_init(p, m, C.int(0))
71         if ctx == nil {
72                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
73         }
74
75         return &RMRClient{
76                 context:   ctx,
77                 consumers: make([]MessageConsumer, 0),
78                 stat:      Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
79         }
80 }
81
82 func (m *RMRClient) Start(c MessageConsumer) {
83         if c != nil {
84                 m.consumers = append(m.consumers, c)
85         }
86
87         for {
88                 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
89
90                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
91                         break
92                 }
93                 time.Sleep(10 * time.Second)
94         }
95         m.wg.Add(viper.GetInt("rmr.numWorkers"))
96
97         if m.readyCb != nil {
98                 go m.readyCb(m.readyCbParams)
99         }
100
101         for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
102                 go m.Worker("worker-"+strconv.Itoa(w), 0)
103         }
104         m.Wait()
105 }
106
107 func (m *RMRClient) Worker(taskName string, msgSize int) {
108         p := viper.GetString("rmr.protPort")
109         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
110
111         defer m.wg.Done()
112         for {
113                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
114                 if rxBuffer == nil {
115                         m.UpdateStatCounter("ReceiveError")
116                         continue
117                 }
118                 m.UpdateStatCounter("Received")
119
120                 go m.parseMessage(rxBuffer)
121         }
122 }
123
124 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
125         if len(m.consumers) == 0 {
126                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
127                 return
128         }
129
130         params := &RMRParams{}
131         params.Mbuf = rxBuffer
132         params.Mtype = int(rxBuffer.mtype)
133         params.SubId = int(rxBuffer.sub_id)
134         params.Meid = &RMRMeid{}
135
136         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
137         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
138                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
139         }
140
141         xidBuf := make([]byte, int(C.RMR_MAX_XID))
142         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
143                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
144         }
145
146         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
147         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
148                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
149         }
150
151         for _, c := range m.consumers {
152                 cptr := unsafe.Pointer(rxBuffer.payload)
153                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
154                 params.PayloadLen = int(rxBuffer.len)
155
156                 err := c.Consume(params)
157                 if err != nil {
158                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
159                 }
160         }
161 }
162
163 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
164         buf := C.rmr_alloc_msg(m.context, 0)
165         if buf == nil {
166                 Logger.Error("rmrClient: Allocating message buffer failed!")
167         }
168         return buf
169 }
170
171 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
172         if mbuf == nil {
173                 return
174         }
175         C.rmr_free_msg(mbuf)
176 }
177
178 func (m *RMRClient) SendMsg(params *RMRParams) bool {
179         return m.Send(params, false)
180 }
181
182 func (m *RMRClient) SendRts(params *RMRParams) bool {
183         return m.Send(params, true)
184 }
185
186 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
187         txBuffer := params.Mbuf
188         if txBuffer == nil {
189                 txBuffer = m.Allocate()
190         }
191
192         txBuffer.mtype = C.int(params.Mtype)
193         txBuffer.sub_id = C.int(params.SubId)
194         txBuffer.len = C.int(len(params.Payload))
195         if params.PayloadLen != 0 {
196                 txBuffer.len = C.int(params.PayloadLen)
197         }
198         datap := C.CBytes(params.Payload)
199         defer C.free(datap)
200
201         if params != nil {
202                 if params.Meid != nil {
203                         b := make([]byte, int(C.RMR_MAX_MEID))
204                         copy(b, []byte(params.Meid.RanName))
205                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
206                 }
207                 xidLen := len(params.Xid)
208                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
209                         b := make([]byte, int(C.RMR_MAX_MEID))
210                         copy(b, []byte(params.Xid))
211                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
212                 }
213         }
214         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
215
216         return m.SendBuf(txBuffer, isRts)
217 }
218
219 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
220         var (
221                 currBuffer  *C.rmr_mbuf_t
222                 state       bool   = true
223                 counterName string = "Transmitted"
224         )
225
226         txBuffer.state = 0
227         if isRts {
228                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
229         } else {
230                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
231         }
232
233         if currBuffer == nil {
234                 m.UpdateStatCounter("TransmitError")
235                 return false
236         }
237
238         // Just quick retry seems to help for K8s issue
239         for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
240                 if isRts {
241                         currBuffer = C.rmr_rts_msg(m.context, currBuffer)
242                 } else {
243                         currBuffer = C.rmr_send_msg(m.context, currBuffer)
244                 }
245         }
246
247         if currBuffer.state != C.RMR_OK {
248                 state = false
249                 counterName = "TransmitError"
250         }
251
252         m.UpdateStatCounter(counterName)
253         m.Free(currBuffer)
254         return state
255 }
256
257 func (m *RMRClient) UpdateStatCounter(name string) {
258         m.mux.Lock()
259         m.stat[name].Inc()
260         m.mux.Unlock()
261 }
262
263 func (m *RMRClient) RegisterMetrics() {
264         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
265 }
266
267 func (m *RMRClient) Wait() {
268         m.wg.Wait()
269 }
270
271 func (m *RMRClient) IsReady() bool {
272         return m.ready != 0
273 }
274
275 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
276         m.readyCb = cb
277         m.readyCbParams = params
278 }
279
280 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
281         id, ok := RICMessageTypes[name]
282         return id, ok
283 }
284
285 func (m *RMRClient) GetRicMessageName(id int) (s string) {
286         for k, v := range RICMessageTypes {
287                 if id == v {
288                         return k
289                 }
290         }
291         return
292 }
293
294 // To be removed ...
295 func (m *RMRClient) GetStat() (r RMRStatistics) {
296         return
297 }