Some code and testing improvement
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_nng -lnng
36 */
37 import "C"
38
39 import (
40         "fmt"
41         "github.com/spf13/viper"
42         "strconv"
43         "strings"
44         "time"
45         "unsafe"
46 )
47
48 var RMRCounterOpts = []CounterOpts{
49         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50         {Name: "Received", Help: "The total number of received RMR messages"},
51         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
53 }
54
55 var RMRErrors = map[int]string{
56         C.RMR_OK:             "state is good",
57         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
58         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
59         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
60         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
61         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62         C.RMR_ERR_CALLFAILED: "unable to send call() message",
63         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
64         C.RMR_ERR_WHID:       "wormhole id was invalid",
65         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
66         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
68         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
69         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
70         C.RMR_ERR_TRUNC:      "received message likely truncated",
71         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
73 }
74
75 type RMRParams struct {
76         Mtype      int
77         Payload    []byte
78         PayloadLen int
79         Meid       *RMRMeid
80         Xid        string
81         SubId      int
82         Src        string
83         Mbuf       *C.rmr_mbuf_t
84         status     int
85 }
86
87 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
88         p := C.CString(protPort)
89         m := C.int(maxSize)
90         defer C.free(unsafe.Pointer(p))
91
92         ctx := C.rmr_init(p, m, C.int(0))
93         if ctx == nil {
94                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
95         }
96
97         return &RMRClient{
98                 protPort:   protPort,
99                 numWorkers: numWorkers,
100                 context:    ctx,
101                 consumers:  make([]MessageConsumer, 0),
102                 stat:       Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
103         }
104 }
105
106 func NewRMRClient() *RMRClient {
107         return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
108 }
109
110 func (m *RMRClient) Start(c MessageConsumer) {
111         if c != nil {
112                 m.consumers = append(m.consumers, c)
113         }
114
115         var counter int = 0
116         for {
117                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
118                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
119                         break
120                 }
121                 if counter%10 == 0 {
122                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
123                 }
124                 time.Sleep(1 * time.Second)
125                 counter++
126         }
127         m.wg.Add(m.numWorkers)
128
129         if m.readyCb != nil {
130                 go m.readyCb(m.readyCbParams)
131         }
132
133         for w := 0; w < m.numWorkers; w++ {
134                 go m.Worker("worker-"+strconv.Itoa(w), 0)
135         }
136         m.Wait()
137 }
138
139 func (m *RMRClient) Worker(taskName string, msgSize int) {
140         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
141
142         defer m.wg.Done()
143         for {
144                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
145                 if rxBuffer == nil {
146                         m.LogMBufError("RecvMsg failed", rxBuffer)
147                         m.UpdateStatCounter("ReceiveError")
148                         continue
149                 }
150                 m.UpdateStatCounter("Received")
151
152                 go m.parseMessage(rxBuffer)
153         }
154 }
155
156 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
157         if len(m.consumers) == 0 {
158                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
159                 return
160         }
161
162         params := &RMRParams{}
163         params.Mbuf = rxBuffer
164         params.Mtype = int(rxBuffer.mtype)
165         params.SubId = int(rxBuffer.sub_id)
166         params.Meid = &RMRMeid{}
167
168         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
169         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
170                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
171         }
172
173         xidBuf := make([]byte, int(C.RMR_MAX_XID))
174         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
175                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
176         }
177
178         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
179         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
180                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
181         }
182
183         for _, c := range m.consumers {
184                 cptr := unsafe.Pointer(rxBuffer.payload)
185                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
186                 params.PayloadLen = int(rxBuffer.len)
187
188                 err := c.Consume(params)
189                 if err != nil {
190                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
191                 }
192         }
193 }
194
195 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
196         buf := C.rmr_alloc_msg(m.context, 0)
197         if buf == nil {
198                 Logger.Error("rmrClient: Allocating message buffer failed!")
199         }
200         return buf
201 }
202
203 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
204         if mbuf == nil {
205                 return
206         }
207         C.rmr_free_msg(mbuf)
208 }
209
210 func (m *RMRClient) SendMsg(params *RMRParams) bool {
211         return m.Send(params, false)
212 }
213
214 func (m *RMRClient) SendRts(params *RMRParams) bool {
215         return m.Send(params, true)
216 }
217
218 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
219         txBuffer := params.Mbuf
220         if txBuffer == nil {
221                 txBuffer = m.Allocate()
222                 if txBuffer == nil {
223                         return false
224                 }
225         }
226
227         txBuffer.mtype = C.int(params.Mtype)
228         txBuffer.sub_id = C.int(params.SubId)
229         txBuffer.len = C.int(len(params.Payload))
230         if params.PayloadLen != 0 {
231                 txBuffer.len = C.int(params.PayloadLen)
232         }
233         datap := C.CBytes(params.Payload)
234         defer C.free(datap)
235
236         if params != nil {
237                 if params.Meid != nil {
238                         b := make([]byte, int(C.RMR_MAX_MEID))
239                         copy(b, []byte(params.Meid.RanName))
240                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
241                 }
242                 xidLen := len(params.Xid)
243                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
244                         b := make([]byte, int(C.RMR_MAX_XID))
245                         copy(b, []byte(params.Xid))
246                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
247                 }
248         }
249         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
250
251         params.status = m.SendBuf(txBuffer, isRts)
252         if params.status == int(C.RMR_OK) {
253                 return true
254         }
255         return false
256 }
257
258 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) int {
259         var (
260                 currBuffer  *C.rmr_mbuf_t
261                 counterName string = "Transmitted"
262         )
263
264         txBuffer.state = 0
265         if isRts {
266                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
267         } else {
268                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
269         }
270
271         if currBuffer == nil {
272                 m.UpdateStatCounter("TransmitError")
273                 return m.LogMBufError("SendBuf failed", txBuffer)
274         }
275
276         // Just quick retry seems to help for K8s issue
277         maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
278         if maxRetryOnFailure == 0 {
279                 maxRetryOnFailure = 5
280         }
281
282         for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
283                 if isRts {
284                         currBuffer = C.rmr_rts_msg(m.context, currBuffer)
285                 } else {
286                         currBuffer = C.rmr_send_msg(m.context, currBuffer)
287                 }
288         }
289
290         if currBuffer.state != C.RMR_OK {
291                 counterName = "TransmitError"
292                 m.LogMBufError("SendBuf failed", currBuffer)
293         }
294
295         m.UpdateStatCounter(counterName)
296         m.Free(currBuffer)
297
298         return int(currBuffer.state)
299 }
300
301 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
302         if params.status == int(C.RMR_ERR_RETRY) {
303                 return true
304         }
305         return false
306 }
307
308 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
309         if params.status == int(C.RMR_ERR_NOENDPT) {
310                 return true
311         }
312         return false
313 }
314
315 func (m *RMRClient) UpdateStatCounter(name string) {
316         m.mux.Lock()
317         m.stat[name].Inc()
318         m.mux.Unlock()
319 }
320
321 func (m *RMRClient) RegisterMetrics() {
322         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
323 }
324
325 func (m *RMRClient) Wait() {
326         m.wg.Wait()
327 }
328
329 func (m *RMRClient) IsReady() bool {
330         return m.ready != 0
331 }
332
333 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
334         m.readyCb = cb
335         m.readyCbParams = params
336 }
337
338 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
339         id, ok := RICMessageTypes[name]
340         return id, ok
341 }
342
343 func (m *RMRClient) GetRicMessageName(id int) (s string) {
344         for k, v := range RICMessageTypes {
345                 if id == v {
346                         return k
347                 }
348         }
349         return
350 }
351
352 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
353         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
354         return int(mbuf.state)
355 }
356
357 // To be removed ...
358 func (m *RMRClient) GetStat() (r RMRStatistics) {
359         return
360 }