C rmr context is now protected. Stabilizes rmr usage with multithreads
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33     memcpy((void *)dst, (void *)data, len);
34 }
35
36 int init_epoll(int rcv_fd) {
37         struct  epoll_event epe;
38         int epoll_fd = epoll_create1( 0 );
39         epe.events = EPOLLIN;
40         epe.data.fd = rcv_fd;
41         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
42         return epoll_fd;
43 }
44
45 void close_epoll(int epoll_fd) {
46         if(epoll_fd >= 0) {
47                 close(epoll_fd);
48         }
49 }
50
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52         struct  epoll_event events[1];
53         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54                 if( events[0].data.fd == rcv_fd ) {
55                         return 1;
56                 }
57         }
58         return 0;
59 }
60
61 #cgo CFLAGS: -I../
62 #cgo LDFLAGS: -lrmr_si
63 */
64 import "C"
65
66 import (
67         "fmt"
68         "github.com/spf13/viper"
69         "strings"
70         "time"
71         "unsafe"
72 )
73
74 var RMRCounterOpts = []CounterOpts{
75         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
76         {Name: "Received", Help: "The total number of received RMR messages"},
77         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
78         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
79 }
80
81 var RMRErrors = map[int]string{
82         C.RMR_OK:             "state is good",
83         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
84         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
85         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
86         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
87         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
88         C.RMR_ERR_CALLFAILED: "unable to send call() message",
89         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
90         C.RMR_ERR_WHID:       "wormhole id was invalid",
91         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
92         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
93         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
94         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
95         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
96         C.RMR_ERR_TRUNC:      "received message likely truncated",
97         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
98         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
99 }
100
101 type RMRParams struct {
102         Mtype      int
103         Payload    []byte
104         PayloadLen int
105         Meid       *RMRMeid
106         Xid        string
107         SubId      int
108         Src        string
109         Mbuf       *C.rmr_mbuf_t
110         Whid       int
111         Callid     int
112         Timeout    int
113         status     int
114 }
115
116 func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
117         p := C.CString(protPort)
118         m := C.int(maxSize)
119         c := C.int(threadType)
120         defer C.free(unsafe.Pointer(p))
121         ctx := C.rmr_init(p, m, c)
122         if ctx == nil {
123                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
124         }
125         return &RMRClient{
126                 protPort:  protPort,
127                 context:   ctx,
128                 consumers: make([]MessageConsumer, 0),
129                 stat:      Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
130         }
131 }
132
133 func NewRMRClient() *RMRClient {
134         return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
135 }
136
137 func (m *RMRClient) Start(c MessageConsumer) {
138         if c != nil {
139                 m.consumers = append(m.consumers, c)
140         }
141
142         var counter int = 0
143         for {
144                 m.contextMux.Lock()
145                 m.ready = int(C.rmr_ready(m.context))
146                 m.contextMux.Unlock()
147                 if m.ready == 1 {
148                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
149                         break
150                 }
151                 if counter%10 == 0 {
152                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
153                 }
154                 time.Sleep(1 * time.Second)
155                 counter++
156         }
157         m.wg.Add(1)
158
159         if m.readyCb != nil {
160                 go m.readyCb(m.readyCbParams)
161         }
162
163         go func() {
164                 m.contextMux.Lock()
165                 rfd := C.rmr_get_rcvfd(m.context)
166                 m.contextMux.Unlock()
167                 efd := C.init_epoll(rfd)
168
169                 defer m.wg.Done()
170                 for {
171                         if int(C.wait_epoll(efd, rfd)) == 0 {
172                                 continue
173                         }
174                         m.contextMux.Lock()
175                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
176                         m.contextMux.Unlock()
177
178                         if rxBuffer == nil {
179                                 m.LogMBufError("RecvMsg failed", rxBuffer)
180                                 m.UpdateStatCounter("ReceiveError")
181                                 continue
182                         }
183                         m.UpdateStatCounter("Received")
184                         m.parseMessage(rxBuffer)
185                 }
186         }()
187
188         m.wg.Wait()
189 }
190
191 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
192         if len(m.consumers) == 0 {
193                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
194                 return
195         }
196
197         params := &RMRParams{}
198         params.Mbuf = rxBuffer
199         params.Mtype = int(rxBuffer.mtype)
200         params.SubId = int(rxBuffer.sub_id)
201         params.Meid = &RMRMeid{}
202
203         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
204         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
205                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
206         }
207
208         xidBuf := make([]byte, int(C.RMR_MAX_XID))
209         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
210                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
211         }
212
213         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
214         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
215                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
216         }
217
218         // Default case: a single consumer
219         if len(m.consumers) == 1 && m.consumers[0] != nil {
220                 params.PayloadLen = int(rxBuffer.len)
221                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
222                 err := m.consumers[0].Consume(params)
223                 if err != nil {
224                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
225                 }
226                 return
227         }
228
229         // Special case for multiple consumers
230         for _, c := range m.consumers {
231                 cptr := unsafe.Pointer(rxBuffer.payload)
232                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
233                 params.PayloadLen = int(rxBuffer.len)
234                 params.Mtype = int(rxBuffer.mtype)
235                 params.SubId = int(rxBuffer.sub_id)
236
237                 err := c.Consume(params)
238                 if err != nil {
239                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
240                 }
241         }
242 }
243
244 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
245         m.contextMux.Lock()
246         defer m.contextMux.Unlock()
247         buf := C.rmr_alloc_msg(m.context, C.int(size))
248         if buf == nil {
249                 Logger.Error("rmrClient: Allocating message buffer failed!")
250         }
251         return buf
252 }
253
254 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
255         if mbuf == nil {
256                 return
257         }
258         m.contextMux.Lock()
259         defer m.contextMux.Unlock()
260         C.rmr_free_msg(mbuf)
261 }
262
263 func (m *RMRClient) SendMsg(params *RMRParams) bool {
264         return m.Send(params, false)
265 }
266
267 func (m *RMRClient) SendRts(params *RMRParams) bool {
268         return m.Send(params, true)
269 }
270
271 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
272         if params.Mbuf != nil {
273                 m.Free(params.Mbuf)
274                 params.Mbuf = nil
275         }
276
277         payLen := len(params.Payload)
278         if params.PayloadLen != 0 {
279                 payLen = params.PayloadLen
280         }
281
282         txBuffer := m.Allocate(payLen)
283         if txBuffer == nil {
284                 return nil
285         }
286         txBuffer.mtype = C.int(params.Mtype)
287         txBuffer.sub_id = C.int(params.SubId)
288         txBuffer.len = C.int(payLen)
289
290         datap := C.CBytes(params.Payload)
291         defer C.free(datap)
292
293         if params != nil {
294                 if params.Meid != nil {
295                         b := make([]byte, int(C.RMR_MAX_MEID))
296                         copy(b, []byte(params.Meid.RanName))
297                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
298                 }
299                 xidLen := len(params.Xid)
300                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
301                         b := make([]byte, int(C.RMR_MAX_XID))
302                         copy(b, []byte(params.Xid))
303                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
304                 }
305         }
306         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
307         return txBuffer
308 }
309
310 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
311
312         txBuffer := m.CopyBuffer(params)
313         if txBuffer == nil {
314                 return false
315         }
316         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
317         if params.status == int(C.RMR_OK) {
318                 return true
319         }
320         return false
321 }
322
323 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
324         var (
325                 currBuffer  *C.rmr_mbuf_t
326                 counterName string = "Transmitted"
327         )
328
329         m.contextMux.Lock()
330         txBuffer.state = 0
331         if whid != 0 {
332                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
333         } else {
334                 if isRts {
335                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
336                 } else {
337                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
338                 }
339         }
340         m.contextMux.Unlock()
341
342         if currBuffer == nil {
343                 m.UpdateStatCounter("TransmitError")
344                 return m.LogMBufError("SendBuf failed", txBuffer)
345         }
346
347         // Just quick retry seems to help for K8s issue
348         maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
349         if maxRetryOnFailure == 0 {
350                 maxRetryOnFailure = 5
351         }
352
353         for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
354                 m.contextMux.Lock()
355                 if whid != 0 {
356                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
357                 } else {
358                         if isRts {
359                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
360                         } else {
361                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
362                         }
363                 }
364                 m.contextMux.Unlock()
365         }
366
367         if currBuffer.state != C.RMR_OK {
368                 counterName = "TransmitError"
369                 m.LogMBufError("SendBuf failed", currBuffer)
370         }
371
372         m.UpdateStatCounter(counterName)
373         defer m.Free(currBuffer)
374
375         return int(currBuffer.state)
376 }
377
378 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
379         var (
380                 currBuffer  *C.rmr_mbuf_t
381                 counterName string = "Transmitted"
382         )
383         txBuffer := m.CopyBuffer(params)
384         if txBuffer == nil {
385                 return C.RMR_ERR_INITFAILED, ""
386         }
387
388         txBuffer.state = 0
389
390         m.contextMux.Lock()
391         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
392         m.contextMux.Unlock()
393
394         if currBuffer == nil {
395                 m.UpdateStatCounter("TransmitError")
396                 return m.LogMBufError("SendBuf failed", txBuffer), ""
397         }
398
399         if currBuffer.state != C.RMR_OK {
400                 counterName = "TransmitError"
401                 m.LogMBufError("SendBuf failed", currBuffer)
402         }
403
404         m.UpdateStatCounter(counterName)
405         defer m.Free(currBuffer)
406
407         cptr := unsafe.Pointer(currBuffer.payload)
408         payload := C.GoBytes(cptr, C.int(currBuffer.len))
409
410         return int(currBuffer.state), string(payload)
411 }
412
413 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
414         return m.Wh_open(target)
415 }
416
417 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
418         m.contextMux.Lock()
419         defer m.contextMux.Unlock()
420         endpoint := C.CString(target)
421         return C.rmr_wh_open(m.context, endpoint)
422 }
423
424 func (m *RMRClient) Closewh(whid int) {
425         m.Wh_close(C.rmr_whid_t(whid))
426 }
427
428 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
429         m.contextMux.Lock()
430         defer m.contextMux.Unlock()
431         C.rmr_wh_close(m.context, whid)
432 }
433
434 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
435         if params.status == int(C.RMR_ERR_RETRY) {
436                 return true
437         }
438         return false
439 }
440
441 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
442         if params.status == int(C.RMR_ERR_NOENDPT) {
443                 return true
444         }
445         return false
446 }
447
448 func (m *RMRClient) UpdateStatCounter(name string) {
449         m.mux.Lock()
450         m.stat[name].Inc()
451         m.mux.Unlock()
452 }
453
454 func (m *RMRClient) RegisterMetrics() {
455         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
456 }
457
458 func (m *RMRClient) Wait() {
459         m.wg.Wait()
460 }
461
462 func (m *RMRClient) IsReady() bool {
463         return m.ready != 0
464 }
465
466 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
467         m.readyCb = cb
468         m.readyCbParams = params
469 }
470
471 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
472         id, ok := RICMessageTypes[name]
473         return id, ok
474 }
475
476 func (m *RMRClient) GetRicMessageName(id int) (s string) {
477         for k, v := range RICMessageTypes {
478                 if id == v {
479                         return k
480                 }
481         }
482         return
483 }
484
485 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
486         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
487         return int(mbuf.state)
488 }
489
490 // To be removed ...
491 func (m *RMRClient) GetStat() (r RMRStatistics) {
492         return
493 }