Code violation fix
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33     memcpy((void *)dst, (void *)data, len);
34 }
35
36 int init_epoll(int rcv_fd) {
37         struct  epoll_event epe;
38         int epoll_fd = epoll_create1( 0 );
39         epe.events = EPOLLIN;
40         epe.data.fd = rcv_fd;
41         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
42         return epoll_fd;
43 }
44
45 void close_epoll(int epoll_fd) {
46         if(epoll_fd >= 0) {
47                 close(epoll_fd);
48         }
49 }
50
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52         struct  epoll_event events[1];
53         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54                 if( events[0].data.fd == rcv_fd ) {
55                         return 1;
56                 }
57         }
58         return 0;
59 }
60
61 #cgo CFLAGS: -I../
62 #cgo LDFLAGS: -lrmr_si
63 */
64 import "C"
65
66 import (
67         "bytes"
68         "crypto/md5"
69         "fmt"
70         "github.com/spf13/viper"
71         "strings"
72         "time"
73         "unsafe"
74 )
75
76 var RMRCounterOpts = []CounterOpts{
77         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
78         {Name: "Received", Help: "The total number of received RMR messages"},
79         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
81 }
82
83 var RMRErrors = map[int]string{
84         C.RMR_OK:             "state is good",
85         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
86         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
87         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
88         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
89         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
90         C.RMR_ERR_CALLFAILED: "unable to send call() message",
91         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
92         C.RMR_ERR_WHID:       "wormhole id was invalid",
93         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
94         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
95         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
96         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
97         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
98         C.RMR_ERR_TRUNC:      "received message likely truncated",
99         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
100         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
101 }
102
103 //-----------------------------------------------------------------------------
104 //
105 //-----------------------------------------------------------------------------
106 type RMRParams struct {
107         Mtype      int
108         Payload    []byte
109         PayloadLen int
110         Meid       *RMRMeid
111         Xid        string
112         SubId      int
113         Src        string
114         Mbuf       *C.rmr_mbuf_t
115         Whid       int
116         Callid     int
117         Timeout    int
118         status     int
119 }
120
121 func (params *RMRParams) String() string {
122         var b bytes.Buffer
123         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
124         return b.String()
125 }
126
127 //-----------------------------------------------------------------------------
128 //
129 //-----------------------------------------------------------------------------
130 type RMRClientParams struct {
131         ProtPort   string
132         MaxSize    int
133         ThreadType int
134         StatDesc   string
135         LowLatency bool
136         FastAck    bool
137 }
138
139 func (params *RMRClientParams) String() string {
140         return fmt.Sprintf("ProtPort=%s MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t",
141                 params.ProtPort, params.MaxSize, params.ThreadType, params.StatDesc, params.LowLatency, params.FastAck)
142 }
143
144 //-----------------------------------------------------------------------------
145 //
146 //-----------------------------------------------------------------------------
147 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
148         p := C.CString(params.ProtPort)
149         m := C.int(params.MaxSize)
150         c := C.int(params.ThreadType)
151         defer C.free(unsafe.Pointer(p))
152         ctx := C.rmr_init(p, m, c)
153         if ctx == nil {
154                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
155         }
156
157         Logger.Info("new rmrClient with parameters: %s", params.String())
158
159         if params.LowLatency {
160                 C.rmr_set_low_latency(ctx)
161         }
162         if params.FastAck {
163                 C.rmr_set_fack(ctx)
164         }
165
166         return &RMRClient{
167                 protPort:  params.ProtPort,
168                 context:   ctx,
169                 consumers: make([]MessageConsumer, 0),
170                 stat:      Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
171         }
172 }
173
174 func NewRMRClient() *RMRClient {
175         return NewRMRClientWithParams(
176                 &RMRClientParams{
177                         ProtPort:   viper.GetString("rmr.protPort"),
178                         MaxSize:    viper.GetInt("rmr.maxSize"),
179                         ThreadType: viper.GetInt("rmr.threadType"),
180                         StatDesc:   "RMR",
181                         LowLatency: viper.GetBool("rmr.lowLatency"),
182                         FastAck:    viper.GetBool("rmr.fastAck"),
183                 })
184 }
185
186 func (m *RMRClient) Start(c MessageConsumer) {
187         if c != nil {
188                 m.consumers = append(m.consumers, c)
189         }
190
191         var counter int = 0
192         for {
193                 m.contextMux.Lock()
194                 m.ready = int(C.rmr_ready(m.context))
195                 m.contextMux.Unlock()
196                 if m.ready == 1 {
197                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
198                         break
199                 }
200                 if counter%10 == 0 {
201                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
202                 }
203                 time.Sleep(1 * time.Second)
204                 counter++
205         }
206         m.wg.Add(1)
207
208         if m.readyCb != nil {
209                 go m.readyCb(m.readyCbParams)
210         }
211
212         go func() {
213                 m.contextMux.Lock()
214                 rfd := C.rmr_get_rcvfd(m.context)
215                 m.contextMux.Unlock()
216                 efd := C.init_epoll(rfd)
217
218                 defer m.wg.Done()
219                 for {
220                         if int(C.wait_epoll(efd, rfd)) == 0 {
221                                 continue
222                         }
223                         m.contextMux.Lock()
224                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
225                         m.contextMux.Unlock()
226
227                         if rxBuffer == nil {
228                                 m.LogMBufError("RecvMsg failed", rxBuffer)
229                                 m.UpdateStatCounter("ReceiveError")
230                                 continue
231                         }
232                         m.UpdateStatCounter("Received")
233                         m.parseMessage(rxBuffer)
234                 }
235         }()
236
237         m.wg.Wait()
238 }
239
240 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
241         if len(m.consumers) == 0 {
242                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
243                 return
244         }
245
246         params := &RMRParams{}
247         params.Mbuf = rxBuffer
248         params.Mtype = int(rxBuffer.mtype)
249         params.SubId = int(rxBuffer.sub_id)
250         params.Meid = &RMRMeid{}
251
252         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
253         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
254                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
255         }
256
257         xidBuf := make([]byte, int(C.RMR_MAX_XID))
258         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
259                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
260         }
261
262         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
263         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
264                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
265         }
266
267         // Default case: a single consumer
268         if len(m.consumers) == 1 && m.consumers[0] != nil {
269                 params.PayloadLen = int(rxBuffer.len)
270                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
271                 err := m.consumers[0].Consume(params)
272                 if err != nil {
273                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
274                 }
275                 return
276         }
277
278         // Special case for multiple consumers
279         for _, c := range m.consumers {
280                 cptr := unsafe.Pointer(rxBuffer.payload)
281                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
282                 params.PayloadLen = int(rxBuffer.len)
283                 params.Mtype = int(rxBuffer.mtype)
284                 params.SubId = int(rxBuffer.sub_id)
285
286                 err := c.Consume(params)
287                 if err != nil {
288                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
289                 }
290         }
291 }
292
293 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
294         m.contextMux.Lock()
295         defer m.contextMux.Unlock()
296         outbuf := C.rmr_alloc_msg(m.context, C.int(size))
297         if outbuf == nil {
298                 Logger.Error("rmrClient: Allocating message buffer failed!")
299         }
300         return outbuf
301 }
302
303 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
304         m.contextMux.Lock()
305         defer m.contextMux.Unlock()
306         outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
307         if outbuf == nil {
308                 Logger.Error("rmrClient: Allocating message buffer failed!")
309         }
310         return outbuf
311 }
312
313 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
314         if mbuf == nil {
315                 return
316         }
317         m.contextMux.Lock()
318         defer m.contextMux.Unlock()
319         C.rmr_free_msg(mbuf)
320 }
321
322 func (m *RMRClient) SendMsg(params *RMRParams) bool {
323         return m.Send(params, false)
324 }
325
326 func (m *RMRClient) SendRts(params *RMRParams) bool {
327         return m.Send(params, true)
328 }
329
330 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
331         status := m.Send(params, isRts)
332         i := 0
333         for ; i < int(to)*2 && status == false; i++ {
334                 status = m.Send(params, isRts)
335                 if status == false {
336                         time.Sleep(500 * time.Millisecond)
337                 }
338         }
339         if status == false {
340                 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
341                 if params.Mbuf != nil {
342                         m.Free(params.Mbuf)
343                         params.Mbuf = nil
344                 }
345         }
346         return
347 }
348
349 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
350
351         if params == nil {
352                 return nil
353         }
354
355         payLen := len(params.Payload)
356         if params.PayloadLen != 0 {
357                 payLen = params.PayloadLen
358         }
359
360         txBuffer := params.Mbuf
361         params.Mbuf = nil
362
363         if txBuffer != nil {
364                 txBuffer = m.ReAllocate(txBuffer, payLen)
365         } else {
366                 txBuffer = m.Allocate(payLen)
367         }
368
369         if txBuffer == nil {
370                 return nil
371         }
372         txBuffer.mtype = C.int(params.Mtype)
373         txBuffer.sub_id = C.int(params.SubId)
374         txBuffer.len = C.int(payLen)
375
376         datap := C.CBytes(params.Payload)
377         defer C.free(datap)
378
379         if params.Meid != nil {
380                 b := make([]byte, int(C.RMR_MAX_MEID))
381                 copy(b, []byte(params.Meid.RanName))
382                 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
383         }
384
385         xidLen := len(params.Xid)
386         if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
387                 b := make([]byte, int(C.RMR_MAX_XID))
388                 copy(b, []byte(params.Xid))
389                 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
390         }
391
392         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
393
394         return txBuffer
395 }
396
397 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
398
399         txBuffer := m.CopyBuffer(params)
400         if txBuffer == nil {
401                 return false
402         }
403         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
404         if params.status == int(C.RMR_OK) {
405                 return true
406         }
407         return false
408 }
409
410 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
411         var (
412                 currBuffer *C.rmr_mbuf_t
413         )
414
415         m.contextMux.Lock()
416         txBuffer.state = 0
417         if whid != 0 {
418                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
419         } else {
420                 if isRts {
421                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
422                 } else {
423                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
424                 }
425         }
426         m.contextMux.Unlock()
427
428         if currBuffer == nil {
429                 m.UpdateStatCounter("TransmitError")
430                 return m.LogMBufError("SendBuf failed", txBuffer)
431         }
432
433         // Just quick retry seems to help for K8s issue
434         maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
435         if maxRetryOnFailure == 0 {
436                 maxRetryOnFailure = 5
437         }
438
439         for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
440                 m.contextMux.Lock()
441                 if whid != 0 {
442                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
443                 } else {
444                         if isRts {
445                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
446                         } else {
447                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
448                         }
449                 }
450                 m.contextMux.Unlock()
451         }
452
453         if currBuffer == nil {
454                 m.UpdateStatCounter("TransmitError")
455                 m.LogMBufError("SendBuf failed", currBuffer)
456                 return int(C.RMR_ERR_INITFAILED)
457         }
458
459         if currBuffer.state != C.RMR_OK {
460                 m.UpdateStatCounter("TransmitError")
461                 m.LogMBufError("SendBuf failed", currBuffer)
462         } else {
463                 m.UpdateStatCounter("Transmitted")
464         }
465         defer m.Free(currBuffer)
466         return int(currBuffer.state)
467
468 }
469
470 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
471         var (
472                 currBuffer  *C.rmr_mbuf_t
473                 counterName string = "Transmitted"
474         )
475         txBuffer := m.CopyBuffer(params)
476         if txBuffer == nil {
477                 return C.RMR_ERR_INITFAILED, ""
478         }
479
480         txBuffer.state = 0
481
482         m.contextMux.Lock()
483         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
484         m.contextMux.Unlock()
485
486         if currBuffer == nil {
487                 m.UpdateStatCounter("TransmitError")
488                 return m.LogMBufError("SendBuf failed", txBuffer), ""
489         }
490
491         if currBuffer.state != C.RMR_OK {
492                 counterName = "TransmitError"
493                 m.LogMBufError("SendBuf failed", currBuffer)
494         }
495
496         m.UpdateStatCounter(counterName)
497         defer m.Free(currBuffer)
498
499         cptr := unsafe.Pointer(currBuffer.payload)
500         payload := C.GoBytes(cptr, C.int(currBuffer.len))
501
502         return int(currBuffer.state), string(payload)
503 }
504
505 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
506         return m.Wh_open(target)
507 }
508
509 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
510         m.contextMux.Lock()
511         defer m.contextMux.Unlock()
512         endpoint := C.CString(target)
513         return C.rmr_wh_open(m.context, endpoint)
514 }
515
516 func (m *RMRClient) Closewh(whid int) {
517         m.Wh_close(C.rmr_whid_t(whid))
518 }
519
520 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
521         m.contextMux.Lock()
522         defer m.contextMux.Unlock()
523         C.rmr_wh_close(m.context, whid)
524 }
525
526 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
527         if params.status == int(C.RMR_ERR_RETRY) {
528                 return true
529         }
530         return false
531 }
532
533 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
534         if params.status == int(C.RMR_ERR_NOENDPT) {
535                 return true
536         }
537         return false
538 }
539
540 func (m *RMRClient) UpdateStatCounter(name string) {
541         m.mux.Lock()
542         m.stat[name].Inc()
543         m.mux.Unlock()
544 }
545
546 func (m *RMRClient) RegisterMetrics() {
547         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
548 }
549
550 func (m *RMRClient) Wait() {
551         m.wg.Wait()
552 }
553
554 func (m *RMRClient) IsReady() bool {
555         return m.ready != 0
556 }
557
558 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
559         m.readyCb = cb
560         m.readyCbParams = params
561 }
562
563 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
564         id, ok := RICMessageTypes[name]
565         return id, ok
566 }
567
568 func (m *RMRClient) GetRicMessageName(id int) (s string) {
569         for k, v := range RICMessageTypes {
570                 if id == v {
571                         return k
572                 }
573         }
574         return
575 }
576
577 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
578         if mbuf != nil {
579                 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
580                 return int(mbuf.state)
581         }
582         Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
583         return 0
584 }
585
586 // To be removed ...
587 func (m *RMRClient) GetStat() (r RMRStatistics) {
588         return
589 }