028cc3c9473aa18af2948b27409775063c729ddd
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33     memcpy((void *)dst, (void *)data, len);
34 }
35
36 int init_epoll(int rcv_fd) {
37         struct  epoll_event epe;
38         int epoll_fd = epoll_create1( 0 );
39         epe.events = EPOLLIN;
40         epe.data.fd = rcv_fd;
41         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
42         return epoll_fd;
43 }
44
45 void close_epoll(int epoll_fd) {
46         if(epoll_fd >= 0) {
47                 close(epoll_fd);
48         }
49 }
50
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52         struct  epoll_event events[1];
53         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54                 if( events[0].data.fd == rcv_fd ) {
55                         return 1;
56                 }
57         }
58         return 0;
59 }
60
61 #cgo CFLAGS: -I../
62 #cgo LDFLAGS: -lrmr_si
63 */
64 import "C"
65
66 import (
67         "bytes"
68         "crypto/md5"
69         "fmt"
70         "strings"
71         "time"
72         "unsafe"
73
74         "github.com/spf13/viper"
75 )
76
77 var RMRCounterOpts = []CounterOpts{
78         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
79         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80         {Name: "TransmitRetry", Help: "The total number of transmit retries on failure"},
81         {Name: "Received", Help: "The total number of received RMR messages"},
82         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
83         {Name: "SendWithRetryRetry", Help: "SendWithRetry service retries"},
84 }
85
86 var RMRGaugeOpts = []CounterOpts{
87         {Name: "Enqueued", Help: "The total number of enqueued in RMR library"},
88         {Name: "Dropped", Help: "The total number of dropped in RMR library"},
89 }
90
91 var RMRErrors = map[int]string{
92         C.RMR_OK:             "state is good",
93         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
94         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
95         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
96         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
97         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
98         C.RMR_ERR_CALLFAILED: "unable to send call() message",
99         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
100         C.RMR_ERR_WHID:       "wormhole id was invalid",
101         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
102         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
103         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
104         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
105         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
106         C.RMR_ERR_TRUNC:      "received message likely truncated",
107         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
108         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
109 }
110
111 //-----------------------------------------------------------------------------
112 //
113 //-----------------------------------------------------------------------------
114 type RMRParams struct {
115         Mtype      int
116         Payload    []byte
117         PayloadLen int
118         Meid       *RMRMeid
119         Xid        string
120         SubId      int
121         Src        string
122         Mbuf       *C.rmr_mbuf_t
123         Whid       int
124         Callid     int
125         Timeout    int
126         status     int
127 }
128
129 func (params *RMRParams) String() string {
130         var b bytes.Buffer
131         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
132         return b.String()
133 }
134
135 //-----------------------------------------------------------------------------
136 //
137 //-----------------------------------------------------------------------------
138 type RMRClientParams struct {
139         StatDesc string
140         RmrData  PortData
141 }
142
143 func (params *RMRClientParams) String() string {
144         return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
145                 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
146                 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
147 }
148
149 //-----------------------------------------------------------------------------
150 //
151 //-----------------------------------------------------------------------------
152 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
153         p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
154         m := C.int(params.RmrData.MaxSize)
155         c := C.int(params.RmrData.ThreadType)
156         defer C.free(unsafe.Pointer(p))
157         ctx := C.rmr_init(p, m, c)
158         if ctx == nil {
159                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
160         }
161
162         Logger.Info("new rmrClient with parameters: %s", params.String())
163
164         if params.RmrData.LowLatency {
165                 C.rmr_set_low_latency(ctx)
166         }
167         if params.RmrData.FastAck {
168                 C.rmr_set_fack(ctx)
169         }
170
171         return &RMRClient{
172                 context:           ctx,
173                 consumers:         make([]MessageConsumer, 0),
174                 statc:             Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
175                 statg:             Metric.RegisterGaugeGroup(RMRGaugeOpts, params.StatDesc),
176                 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
177         }
178 }
179
180 func NewRMRClient() *RMRClient {
181         p := GetPortData("rmrdata")
182         if p.Port == 0 || viper.IsSet("rmr.protPort") {
183                 // Old xApp descriptor used, fallback to rmr section
184                 fmt.Sscanf(viper.GetString("rmr.protPort"), "tcp:%d", &p.Port)
185                 p.MaxSize = viper.GetInt("rmr.maxSize")
186                 p.ThreadType = viper.GetInt("rmr.threadType")
187                 p.LowLatency = viper.GetBool("rmr.lowLatency")
188                 p.FastAck = viper.GetBool("rmr.fastAck")
189                 p.MaxRetryOnFailure = viper.GetInt("rmr.maxRetryOnFailure")
190         }
191
192         return NewRMRClientWithParams(
193                 &RMRClientParams{
194                         RmrData:  p,
195                         StatDesc: "RMR",
196                 })
197 }
198
199 func (m *RMRClient) Start(c MessageConsumer) {
200         if c != nil {
201                 m.consumers = append(m.consumers, c)
202         }
203
204         var counter int = 0
205         for {
206                 m.contextMux.Lock()
207                 m.ready = int(C.rmr_ready(m.context))
208                 m.contextMux.Unlock()
209                 if m.ready == 1 {
210                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
211                         break
212                 }
213                 if counter%10 == 0 {
214                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
215                 }
216                 time.Sleep(1 * time.Second)
217                 counter++
218         }
219
220         if m.readyCb != nil {
221                 go m.readyCb(m.readyCbParams)
222         }
223
224         m.wg.Add(1)
225         go func() {
226                 m.contextMux.Lock()
227                 rfd := C.rmr_get_rcvfd(m.context)
228                 m.contextMux.Unlock()
229                 efd := C.init_epoll(rfd)
230
231                 defer m.wg.Done()
232                 for {
233
234                         if int(C.wait_epoll(efd, rfd)) == 0 {
235                                 continue
236                         }
237                         m.contextMux.Lock()
238                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
239                         m.contextMux.Unlock()
240
241                         if rxBuffer == nil {
242                                 m.LogMBufError("RecvMsg failed", rxBuffer)
243                                 m.UpdateStatCounter("ReceiveError")
244                                 continue
245                         }
246                         m.UpdateStatCounter("Received")
247                         m.parseMessage(rxBuffer)
248                 }
249         }()
250
251         m.wg.Add(1)
252         go func() {
253                 defer m.wg.Done()
254                 for {
255                         m.UpdateRmrStats()
256                         time.Sleep(1 * time.Second)
257                 }
258         }()
259
260         m.wg.Wait()
261 }
262
263 func (m *RMRClient) UpdateRmrStats() {
264         param := (*C.rmr_rx_debug_t)(C.malloc(C.size_t(unsafe.Sizeof(C.rmr_rx_debug_t{}))))
265         m.contextMux.Lock()
266         C.rmr_get_rx_debug_info(m.context, param)
267         m.contextMux.Unlock()
268         m.mux.Lock()
269         m.statg["Enqueued"].Set(float64(param.enqueue))
270         m.statg["Dropped"].Set(float64(param.drop))
271         m.mux.Unlock()
272         C.free(unsafe.Pointer(param))
273 }
274
275 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
276         if len(m.consumers) == 0 {
277                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
278                 return
279         }
280
281         params := &RMRParams{}
282         params.Mbuf = rxBuffer
283         params.Mtype = int(rxBuffer.mtype)
284         params.SubId = int(rxBuffer.sub_id)
285         params.Meid = &RMRMeid{}
286
287         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
288         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
289                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
290         }
291
292         xidBuf := make([]byte, int(C.RMR_MAX_XID))
293         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
294                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
295         }
296
297         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
298         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
299                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
300         }
301
302         // Default case: a single consumer
303         if len(m.consumers) == 1 && m.consumers[0] != nil {
304                 params.PayloadLen = int(rxBuffer.len)
305                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
306                 err := m.consumers[0].Consume(params)
307                 if err != nil {
308                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
309                 }
310                 return
311         }
312
313         /*
314                 // Special case for multiple consumers
315                 for _, c := range m.consumers {
316                         cptr := unsafe.Pointer(rxBuffer.payload)
317                         params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
318                         params.PayloadLen = int(rxBuffer.len)
319                         params.Mtype = int(rxBuffer.mtype)
320                         params.SubId = int(rxBuffer.sub_id)
321
322                         err := c.Consume(params)
323                         if err != nil {
324                                 Logger.Warn("rmrClient: Consumer returned error: %v", err)
325                         }
326                 }
327         */
328 }
329
330 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
331         m.contextMux.Lock()
332         defer m.contextMux.Unlock()
333         outbuf := C.rmr_alloc_msg(m.context, C.int(size))
334         if outbuf == nil {
335                 Logger.Error("rmrClient: Allocating message buffer failed!")
336         }
337         return outbuf
338 }
339
340 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
341         m.contextMux.Lock()
342         defer m.contextMux.Unlock()
343         outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
344         if outbuf == nil {
345                 Logger.Error("rmrClient: Allocating message buffer failed!")
346         }
347         return outbuf
348 }
349
350 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
351         if mbuf == nil {
352                 return
353         }
354         m.contextMux.Lock()
355         defer m.contextMux.Unlock()
356         C.rmr_free_msg(mbuf)
357 }
358
359 func (m *RMRClient) SendMsg(params *RMRParams) bool {
360         return m.Send(params, false)
361 }
362
363 func (m *RMRClient) SendRts(params *RMRParams) bool {
364         return m.Send(params, true)
365 }
366
367 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
368         status := m.Send(params, isRts)
369         i := 0
370         for ; i < int(to)*2 && status == false; i++ {
371                 status = m.Send(params, isRts)
372                 if status == false {
373                         m.UpdateStatCounter("SendWithRetryRetry")
374                         time.Sleep(500 * time.Millisecond)
375                 }
376         }
377         if status == false {
378                 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
379                 if params.Mbuf != nil {
380                         m.Free(params.Mbuf)
381                         params.Mbuf = nil
382                 }
383         }
384         return
385 }
386
387 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
388
389         if params == nil {
390                 return nil
391         }
392
393         payLen := len(params.Payload)
394         if params.PayloadLen != 0 {
395                 payLen = params.PayloadLen
396         }
397
398         txBuffer := params.Mbuf
399         params.Mbuf = nil
400
401         if txBuffer != nil {
402                 txBuffer = m.ReAllocate(txBuffer, payLen)
403         } else {
404                 txBuffer = m.Allocate(payLen)
405         }
406
407         if txBuffer == nil {
408                 return nil
409         }
410         txBuffer.mtype = C.int(params.Mtype)
411         txBuffer.sub_id = C.int(params.SubId)
412         txBuffer.len = C.int(payLen)
413
414         datap := C.CBytes(params.Payload)
415         defer C.free(datap)
416
417         if params.Meid != nil {
418                 b := make([]byte, int(C.RMR_MAX_MEID))
419                 copy(b, []byte(params.Meid.RanName))
420                 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
421         }
422
423         xidLen := len(params.Xid)
424         if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
425                 b := make([]byte, int(C.RMR_MAX_XID))
426                 copy(b, []byte(params.Xid))
427                 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
428         }
429
430         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
431
432         return txBuffer
433 }
434
435 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
436
437         txBuffer := m.CopyBuffer(params)
438         if txBuffer == nil {
439                 return false
440         }
441         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
442         if params.status == int(C.RMR_OK) {
443                 return true
444         }
445         return false
446 }
447
448 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
449         var (
450                 currBuffer *C.rmr_mbuf_t
451         )
452
453         m.contextMux.Lock()
454         txBuffer.state = 0
455         if whid != 0 {
456                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
457         } else {
458                 if isRts {
459                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
460                 } else {
461                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
462                 }
463         }
464         m.contextMux.Unlock()
465
466         if currBuffer == nil {
467                 m.UpdateStatCounter("TransmitError")
468                 return m.LogMBufError("SendBuf failed", txBuffer)
469         }
470
471         // Just quick retry seems to help for K8s issue
472         if m.maxRetryOnFailure == 0 {
473                 m.maxRetryOnFailure = 5
474         }
475
476         for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
477                 m.contextMux.Lock()
478                 if whid != 0 {
479                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
480                 } else {
481                         if isRts {
482                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
483                         } else {
484                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
485                         }
486                 }
487                 m.contextMux.Unlock()
488                 m.UpdateStatCounter("TransmitRetry")
489         }
490
491         if currBuffer == nil {
492                 m.UpdateStatCounter("TransmitError")
493                 m.LogMBufError("SendBuf failed", currBuffer)
494                 return int(C.RMR_ERR_INITFAILED)
495         }
496
497         if currBuffer.state != C.RMR_OK {
498                 m.UpdateStatCounter("TransmitError")
499                 m.LogMBufError("SendBuf failed", currBuffer)
500         } else {
501                 m.UpdateStatCounter("Transmitted")
502         }
503         defer m.Free(currBuffer)
504         return int(currBuffer.state)
505
506 }
507
508 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
509         var (
510                 currBuffer  *C.rmr_mbuf_t
511                 counterName string = "Transmitted"
512         )
513         txBuffer := m.CopyBuffer(params)
514         if txBuffer == nil {
515                 return C.RMR_ERR_INITFAILED, ""
516         }
517
518         txBuffer.state = 0
519
520         m.contextMux.Lock()
521         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
522         m.contextMux.Unlock()
523
524         if currBuffer == nil {
525                 m.UpdateStatCounter("TransmitError")
526                 return m.LogMBufError("SendBuf failed", txBuffer), ""
527         }
528
529         if currBuffer.state != C.RMR_OK {
530                 counterName = "TransmitError"
531                 m.LogMBufError("SendBuf failed", currBuffer)
532         }
533
534         m.UpdateStatCounter(counterName)
535         defer m.Free(currBuffer)
536
537         cptr := unsafe.Pointer(currBuffer.payload)
538         payload := C.GoBytes(cptr, C.int(currBuffer.len))
539
540         return int(currBuffer.state), string(payload)
541 }
542
543 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
544         return m.Wh_open(target)
545 }
546
547 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
548         m.contextMux.Lock()
549         defer m.contextMux.Unlock()
550         endpoint := C.CString(target)
551         return C.rmr_wh_open(m.context, endpoint)
552 }
553
554 func (m *RMRClient) Closewh(whid int) {
555         m.Wh_close(C.rmr_whid_t(whid))
556 }
557
558 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
559         m.contextMux.Lock()
560         defer m.contextMux.Unlock()
561         C.rmr_wh_close(m.context, whid)
562 }
563
564 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
565         if params.status == int(C.RMR_ERR_RETRY) {
566                 return true
567         }
568         return false
569 }
570
571 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
572         if params.status == int(C.RMR_ERR_NOENDPT) {
573                 return true
574         }
575         return false
576 }
577
578 func (m *RMRClient) UpdateStatCounter(name string) {
579         m.mux.Lock()
580         m.statc[name].Inc()
581         m.mux.Unlock()
582 }
583
584 func (m *RMRClient) RegisterMetrics() {
585         m.statc = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
586         m.statg = Metric.RegisterGaugeGroup(RMRGaugeOpts, "RMR")
587 }
588
589 func (m *RMRClient) Wait() {
590         m.wg.Wait()
591 }
592
593 func (m *RMRClient) IsReady() bool {
594         return m.ready != 0
595 }
596
597 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
598         m.readyCb = cb
599         m.readyCbParams = params
600 }
601
602 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
603         id, ok := RICMessageTypes[name]
604         return id, ok
605 }
606
607 func (m *RMRClient) GetRicMessageName(id int) (s string) {
608         for k, v := range RICMessageTypes {
609                 if id == v {
610                         return k
611                 }
612         }
613         return
614 }
615
616 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
617         if mbuf != nil {
618                 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
619                 return int(mbuf.state)
620         }
621         Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
622         return 0
623 }