59df7436cf6c16623a3de18cbfbb56fc6695d1d5
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32
33 void write_bytes_array(unsigned char *dst, void *data, int len) {
34     memcpy((void *)dst, (void *)data, len);
35 }
36
37 int init_epoll(int rcv_fd) {
38         struct  epoll_event epe;
39         int epoll_fd = epoll_create1( 0 );
40         epe.events = EPOLLIN;
41         epe.data.fd = rcv_fd;
42         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
43         return epoll_fd;
44 }
45
46 void close_epoll(int epoll_fd) {
47         if(epoll_fd >= 0) {
48                 close(epoll_fd);
49         }
50 }
51
52 int wait_epoll(int epoll_fd,int rcv_fd) {
53         struct  epoll_event events[1];
54         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
55                 if( events[0].data.fd == rcv_fd ) {
56                         return 1;
57                 }
58         }
59         return 0;
60 }
61
62 #cgo CFLAGS: -I../
63 #cgo LDFLAGS: -lrmr_si
64 */
65 import "C"
66
67 import (
68         "bytes"
69         "crypto/md5"
70         "fmt"
71         "strings"
72         "time"
73         "unsafe"
74
75         "github.com/spf13/viper"
76 )
77
78 var RMRCounterOpts = []CounterOpts{
79         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
80         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
81         {Name: "TransmitRetry", Help: "The total number of transmit retries on failure"},
82         {Name: "Received", Help: "The total number of received RMR messages"},
83         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
84         {Name: "SendWithRetryRetry", Help: "SendWithRetry service retries"},
85 }
86
87 var RMRGaugeOpts = []CounterOpts{
88         {Name: "Enqueued", Help: "The total number of enqueued in RMR library"},
89         {Name: "Dropped", Help: "The total number of dropped in RMR library"},
90 }
91
92 var RMRErrors = map[int]string{
93         C.RMR_OK:             "state is good",
94         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
95         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
96         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
97         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
98         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
99         C.RMR_ERR_CALLFAILED: "unable to send call() message",
100         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
101         C.RMR_ERR_WHID:       "wormhole id was invalid",
102         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
103         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
104         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
105         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
106         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
107         C.RMR_ERR_TRUNC:      "received message likely truncated",
108         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
109         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
110 }
111
112 // -----------------------------------------------------------------------------
113 //
114 // -----------------------------------------------------------------------------
115 type RMRParams struct {
116         Mtype      int
117         Payload    []byte
118         PayloadLen int
119         Meid       *RMRMeid
120         Xid        string
121         SubId      int
122         Src        string
123         Mbuf       *C.rmr_mbuf_t
124         Whid       int
125         Callid     int
126         Timeout    int
127         status     int
128 }
129
130 func (params *RMRParams) String() string {
131         var b bytes.Buffer
132         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
133         return b.String()
134 }
135
136 // -----------------------------------------------------------------------------
137 //
138 // -----------------------------------------------------------------------------
139 type RMRClientParams struct {
140         StatDesc string
141         RmrData  PortData
142 }
143
144 func (params *RMRClientParams) String() string {
145         return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
146                 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
147                 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
148 }
149
150 // -----------------------------------------------------------------------------
151 //
152 // -----------------------------------------------------------------------------
153 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
154         p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
155         m := C.int(params.RmrData.MaxSize)
156         c := C.int(params.RmrData.ThreadType)
157         defer C.free(unsafe.Pointer(p))
158         ctx := C.rmr_init(p, m, c)
159         if ctx == nil {
160                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
161         }
162
163         Logger.Info("new rmrClient with parameters: %s", params.String())
164
165         if params.RmrData.LowLatency {
166                 C.rmr_set_low_latency(ctx)
167         }
168         if params.RmrData.FastAck {
169                 C.rmr_set_fack(ctx)
170         }
171
172         return &RMRClient{
173                 context:           ctx,
174                 consumers:         make([]MessageConsumer, 0),
175                 statc:             Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
176                 statg:             Metric.RegisterGaugeGroup(RMRGaugeOpts, params.StatDesc),
177                 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
178         }
179 }
180
181 func NewRMRClient() *RMRClient {
182         p := GetPortData("rmrdata")
183         if p.Port == 0 || viper.IsSet("rmr.protPort") {
184                 // Old xApp descriptor used, fallback to rmr section
185                 fmt.Sscanf(viper.GetString("rmr.protPort"), "tcp:%d", &p.Port)
186                 p.MaxSize = viper.GetInt("rmr.maxSize")
187                 p.ThreadType = viper.GetInt("rmr.threadType")
188                 p.LowLatency = viper.GetBool("rmr.lowLatency")
189                 p.FastAck = viper.GetBool("rmr.fastAck")
190                 p.MaxRetryOnFailure = viper.GetInt("rmr.maxRetryOnFailure")
191         }
192
193         return NewRMRClientWithParams(
194                 &RMRClientParams{
195                         RmrData:  p,
196                         StatDesc: "RMR",
197                 })
198 }
199
200 func (m *RMRClient) Start(c MessageConsumer) {
201         if c != nil {
202                 m.consumers = append(m.consumers, c)
203         }
204
205         var counter int = 0
206         for {
207                 m.contextMux.Lock()
208                 m.ready = int(C.rmr_ready(m.context))
209                 m.contextMux.Unlock()
210                 if m.ready == 1 {
211                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
212                         break
213                 }
214                 if counter%10 == 0 {
215                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
216                 }
217                 time.Sleep(1 * time.Second)
218                 counter++
219         }
220
221         if m.readyCb != nil {
222                 go m.readyCb(m.readyCbParams)
223         }
224
225         m.wg.Add(1)
226         go func() {
227                 m.contextMux.Lock()
228                 rfd := C.rmr_get_rcvfd(m.context)
229                 m.contextMux.Unlock()
230                 efd := C.init_epoll(rfd)
231
232                 defer m.wg.Done()
233                 for {
234
235                         if int(C.wait_epoll(efd, rfd)) == 0 {
236                                 continue
237                         }
238                         m.contextMux.Lock()
239                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
240                         m.contextMux.Unlock()
241
242                         if rxBuffer == nil {
243                                 m.LogMBufError("RecvMsg failed", rxBuffer)
244                                 m.UpdateStatCounter("ReceiveError")
245                                 continue
246                         }
247                         m.UpdateStatCounter("Received")
248                         m.parseMessage(rxBuffer)
249                 }
250         }()
251
252         m.wg.Add(1)
253         go func() {
254                 defer m.wg.Done()
255                 for {
256                         m.UpdateRmrStats()
257                         time.Sleep(1 * time.Second)
258                 }
259         }()
260
261         m.wg.Wait()
262 }
263
264 func (m *RMRClient) UpdateRmrStats() {
265         param := (*C.rmr_rx_debug_t)(C.malloc(C.size_t(unsafe.Sizeof(C.rmr_rx_debug_t{}))))
266         m.contextMux.Lock()
267         C.rmr_get_rx_debug_info(m.context, param)
268         m.contextMux.Unlock()
269         m.mux.Lock()
270         m.statg["Enqueued"].Set(float64(param.enqueue))
271         m.statg["Dropped"].Set(float64(param.drop))
272         m.mux.Unlock()
273         C.free(unsafe.Pointer(param))
274 }
275
276 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
277         if len(m.consumers) == 0 {
278                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
279                 return
280         }
281
282         params := &RMRParams{}
283         params.Mbuf = rxBuffer
284         params.Mtype = int(rxBuffer.mtype)
285         params.SubId = int(rxBuffer.sub_id)
286         params.Meid = &RMRMeid{}
287
288         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
289         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
290                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
291         }
292
293         xidBuf := make([]byte, int(C.RMR_MAX_XID))
294         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
295                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
296         }
297
298         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
299         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
300                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
301         }
302
303         // Default case: a single consumer
304         if len(m.consumers) == 1 && m.consumers[0] != nil {
305                 params.PayloadLen = int(rxBuffer.len)
306                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
307                 err := m.consumers[0].Consume(params)
308                 if err != nil {
309                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
310                 }
311                 return
312         }
313
314         /*
315                 // Special case for multiple consumers
316                 for _, c := range m.consumers {
317                         cptr := unsafe.Pointer(rxBuffer.payload)
318                         params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
319                         params.PayloadLen = int(rxBuffer.len)
320                         params.Mtype = int(rxBuffer.mtype)
321                         params.SubId = int(rxBuffer.sub_id)
322
323                         err := c.Consume(params)
324                         if err != nil {
325                                 Logger.Warn("rmrClient: Consumer returned error: %v", err)
326                         }
327                 }
328         */
329 }
330
331 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
332         m.contextMux.Lock()
333         defer m.contextMux.Unlock()
334         outbuf := C.rmr_alloc_msg(m.context, C.int(size))
335         if outbuf == nil {
336                 Logger.Error("rmrClient: Allocating message buffer failed!")
337         }
338         return outbuf
339 }
340
341 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
342         m.contextMux.Lock()
343         defer m.contextMux.Unlock()
344         outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
345         if outbuf == nil {
346                 Logger.Error("rmrClient: Allocating message buffer failed!")
347         }
348         return outbuf
349 }
350
351 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
352         if mbuf == nil {
353                 return
354         }
355         m.contextMux.Lock()
356         defer m.contextMux.Unlock()
357         C.rmr_free_msg(mbuf)
358 }
359
360 func (m *RMRClient) SendMsg(params *RMRParams) bool {
361         return m.Send(params, false)
362 }
363
364 func (m *RMRClient) SendRts(params *RMRParams) bool {
365         return m.Send(params, true)
366 }
367
368 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
369         status := m.Send(params, isRts)
370         i := 0
371         for ; i < int(to)*2 && status == false; i++ {
372                 status = m.Send(params, isRts)
373                 if status == false {
374                         m.UpdateStatCounter("SendWithRetryRetry")
375                         time.Sleep(500 * time.Millisecond)
376                 }
377         }
378         if status == false {
379                 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
380                 if params.Mbuf != nil {
381                         m.Free(params.Mbuf)
382                         params.Mbuf = nil
383                 }
384         }
385         return
386 }
387
388 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
389
390         if params == nil {
391                 return nil
392         }
393
394         payLen := len(params.Payload)
395         if params.PayloadLen != 0 {
396                 payLen = params.PayloadLen
397         }
398
399         txBuffer := params.Mbuf
400         params.Mbuf = nil
401
402         if txBuffer != nil {
403                 txBuffer = m.ReAllocate(txBuffer, payLen)
404         } else {
405                 txBuffer = m.Allocate(payLen)
406         }
407
408         if txBuffer == nil {
409                 return nil
410         }
411         txBuffer.mtype = C.int(params.Mtype)
412         txBuffer.sub_id = C.int(params.SubId)
413         txBuffer.len = C.int(payLen)
414
415         datap := C.CBytes(params.Payload)
416         defer C.free(datap)
417
418         if params.Meid != nil {
419                 b := make([]byte, int(C.RMR_MAX_MEID))
420                 copy(b, []byte(params.Meid.RanName))
421                 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
422         }
423
424         xidLen := len(params.Xid)
425         if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
426                 b := make([]byte, int(C.RMR_MAX_XID))
427                 copy(b, []byte(params.Xid))
428                 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
429         }
430
431         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
432
433         return txBuffer
434 }
435
436 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
437
438         txBuffer := m.CopyBuffer(params)
439         if txBuffer == nil {
440                 return false
441         }
442         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
443         if params.status == int(C.RMR_OK) {
444                 return true
445         }
446         return false
447 }
448
449 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
450         txBuffer.state = 0
451
452         // Just quick retry seems to help for K8s issue
453         if m.maxRetryOnFailure == 0 {
454                 m.maxRetryOnFailure = 5
455         }
456
457         for j := 0; j <= m.maxRetryOnFailure; j++ {
458                 m.contextMux.Lock()
459                 if whid != 0 {
460                         txBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
461                 } else {
462                         if isRts {
463                                 txBuffer = C.rmr_rts_msg(m.context, txBuffer)
464                         } else {
465                                 txBuffer = C.rmr_send_msg(m.context, txBuffer)
466                         }
467                 }
468                 m.contextMux.Unlock()
469                 if j+1 <= m.maxRetryOnFailure && txBuffer != nil && txBuffer.state == C.RMR_ERR_RETRY {
470                         m.UpdateStatCounter("TransmitRetry")
471                         continue
472                 }
473                 break
474         }
475
476         if txBuffer == nil {
477                 m.UpdateStatCounter("TransmitError")
478                 m.LogMBufError("SendBuf failed", txBuffer)
479                 return int(C.RMR_ERR_INITFAILED)
480         }
481
482         if txBuffer.state != C.RMR_OK {
483                 m.UpdateStatCounter("TransmitError")
484                 m.LogMBufError("SendBuf failed", txBuffer)
485         } else {
486                 m.UpdateStatCounter("Transmitted")
487         }
488         defer m.Free(txBuffer)
489         return int(txBuffer.state)
490
491 }
492
493 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
494         var (
495                 currBuffer  *C.rmr_mbuf_t
496                 counterName string = "Transmitted"
497         )
498         txBuffer := m.CopyBuffer(params)
499         if txBuffer == nil {
500                 return C.RMR_ERR_INITFAILED, ""
501         }
502
503         txBuffer.state = 0
504
505         m.contextMux.Lock()
506         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
507         m.contextMux.Unlock()
508
509         if currBuffer == nil {
510                 m.UpdateStatCounter("TransmitError")
511                 return m.LogMBufError("SendBuf failed", txBuffer), ""
512         }
513
514         if currBuffer.state != C.RMR_OK {
515                 counterName = "TransmitError"
516                 m.LogMBufError("SendBuf failed", currBuffer)
517         }
518
519         m.UpdateStatCounter(counterName)
520         defer m.Free(currBuffer)
521
522         cptr := unsafe.Pointer(currBuffer.payload)
523         payload := C.GoBytes(cptr, C.int(currBuffer.len))
524
525         return int(currBuffer.state), string(payload)
526 }
527
528 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
529         return m.Wh_open(target)
530 }
531
532 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
533         m.contextMux.Lock()
534         defer m.contextMux.Unlock()
535         endpoint := C.CString(target)
536         return C.rmr_wh_open(m.context, endpoint)
537 }
538
539 func (m *RMRClient) Closewh(whid int) {
540         m.Wh_close(C.rmr_whid_t(whid))
541 }
542
543 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
544         m.contextMux.Lock()
545         defer m.contextMux.Unlock()
546         C.rmr_wh_close(m.context, whid)
547 }
548
549 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
550         if params.status == int(C.RMR_ERR_RETRY) {
551                 return true
552         }
553         return false
554 }
555
556 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
557         if params.status == int(C.RMR_ERR_NOENDPT) {
558                 return true
559         }
560         return false
561 }
562
563 func (m *RMRClient) UpdateStatCounter(name string) {
564         m.mux.Lock()
565         m.statc[name].Inc()
566         m.mux.Unlock()
567 }
568
569 func (m *RMRClient) RegisterMetrics() {
570         m.statc = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
571         m.statg = Metric.RegisterGaugeGroup(RMRGaugeOpts, "RMR")
572 }
573
574 func (m *RMRClient) Wait() {
575         m.wg.Wait()
576 }
577
578 func (m *RMRClient) IsReady() bool {
579         return m.ready != 0
580 }
581
582 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
583         m.readyCb = cb
584         m.readyCbParams = params
585 }
586
587 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
588         id, ok := RICMessageTypes[name]
589         return id, ok
590 }
591
592 func (m *RMRClient) GetRicMessageName(id int) (s string) {
593         for k, v := range RICMessageTypes {
594                 if id == v {
595                         return k
596                 }
597         }
598         return
599 }
600
601 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
602         if mbuf != nil {
603                 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
604                 return int(mbuf.state)
605         }
606         Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
607         return 0
608 }