baseimage with go 1.18 and go.mod updates
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32
33 void write_bytes_array(unsigned char *dst, void *data, int len) {
34     memcpy((void *)dst, (void *)data, len);
35 }
36
37 int init_epoll(int rcv_fd) {
38         struct  epoll_event epe;
39         int epoll_fd = epoll_create1( 0 );
40         epe.events = EPOLLIN;
41         epe.data.fd = rcv_fd;
42         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
43         return epoll_fd;
44 }
45
46 void close_epoll(int epoll_fd) {
47         if(epoll_fd >= 0) {
48                 close(epoll_fd);
49         }
50 }
51
52 int wait_epoll(int epoll_fd,int rcv_fd) {
53         struct  epoll_event events[1];
54         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
55                 if( events[0].data.fd == rcv_fd ) {
56                         return 1;
57                 }
58         }
59         return 0;
60 }
61
62 #cgo CFLAGS: -I../
63 #cgo LDFLAGS: -lrmr_si
64 */
65 import "C"
66
67 import (
68         "bytes"
69         "crypto/md5"
70         "fmt"
71         "strings"
72         "time"
73         "unsafe"
74
75         "github.com/spf13/viper"
76 )
77
78 var RMRCounterOpts = []CounterOpts{
79         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
80         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
81         {Name: "TransmitRetry", Help: "The total number of transmit retries on failure"},
82         {Name: "Received", Help: "The total number of received RMR messages"},
83         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
84         {Name: "SendWithRetryRetry", Help: "SendWithRetry service retries"},
85 }
86
87 var RMRGaugeOpts = []CounterOpts{
88         {Name: "Enqueued", Help: "The total number of enqueued in RMR library"},
89         {Name: "Dropped", Help: "The total number of dropped in RMR library"},
90 }
91
92 var RMRErrors = map[int]string{
93         C.RMR_OK:             "state is good",
94         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
95         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
96         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
97         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
98         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
99         C.RMR_ERR_CALLFAILED: "unable to send call() message",
100         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
101         C.RMR_ERR_WHID:       "wormhole id was invalid",
102         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
103         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
104         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
105         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
106         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
107         C.RMR_ERR_TRUNC:      "received message likely truncated",
108         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
109         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
110 }
111
112 // -----------------------------------------------------------------------------
113 //
114 // -----------------------------------------------------------------------------
115 type RMRParams struct {
116         Mtype      int
117         Payload    []byte
118         PayloadLen int
119         Meid       *RMRMeid
120         Xid        string
121         SubId      int
122         Src        string
123         Mbuf       *C.rmr_mbuf_t
124         Whid       int
125         Callid     int
126         Timeout    int
127         status     int
128 }
129
130 func (params *RMRParams) String() string {
131         var b bytes.Buffer
132         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
133         return b.String()
134 }
135
136 // -----------------------------------------------------------------------------
137 //
138 // -----------------------------------------------------------------------------
139 type RMRClientParams struct {
140         StatDesc string
141         RmrData  PortData
142 }
143
144 func (params *RMRClientParams) String() string {
145         return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
146                 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
147                 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
148 }
149
150 // -----------------------------------------------------------------------------
151 //
152 // -----------------------------------------------------------------------------
153 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
154         p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
155         m := C.int(params.RmrData.MaxSize)
156         c := C.int(params.RmrData.ThreadType)
157         defer C.free(unsafe.Pointer(p))
158         ctx := C.rmr_init(p, m, c)
159         if ctx == nil {
160                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
161         }
162
163         Logger.Info("new rmrClient with parameters: %s", params.String())
164
165         if params.RmrData.LowLatency {
166                 C.rmr_set_low_latency(ctx)
167         }
168         if params.RmrData.FastAck {
169                 C.rmr_set_fack(ctx)
170         }
171
172         return &RMRClient{
173                 context:           ctx,
174                 consumers:         make([]MessageConsumer, 0),
175                 statc:             Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
176                 statg:             Metric.RegisterGaugeGroup(RMRGaugeOpts, params.StatDesc),
177                 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
178         }
179 }
180
181 func NewRMRClient() *RMRClient {
182         p := GetPortData("rmrdata")
183         if p.Port == 0 || viper.IsSet("rmr.protPort") {
184                 // Old xApp descriptor used, fallback to rmr section
185                 fmt.Sscanf(viper.GetString("rmr.protPort"), "tcp:%d", &p.Port)
186                 p.MaxSize = viper.GetInt("rmr.maxSize")
187                 p.ThreadType = viper.GetInt("rmr.threadType")
188                 p.LowLatency = viper.GetBool("rmr.lowLatency")
189                 p.FastAck = viper.GetBool("rmr.fastAck")
190                 p.MaxRetryOnFailure = viper.GetInt("rmr.maxRetryOnFailure")
191         }
192
193         return NewRMRClientWithParams(
194                 &RMRClientParams{
195                         RmrData:  p,
196                         StatDesc: "RMR",
197                 })
198 }
199
200 func (m *RMRClient) Start(c MessageConsumer) {
201         if c != nil {
202                 m.consumers = append(m.consumers, c)
203         }
204
205         var counter int = 0
206         for {
207                 m.contextMux.Lock()
208                 m.ready = int(C.rmr_ready(m.context))
209                 m.contextMux.Unlock()
210                 if m.ready == 1 {
211                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
212                         break
213                 }
214                 if counter%10 == 0 {
215                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
216                 }
217                 time.Sleep(1 * time.Second)
218                 counter++
219         }
220
221         if m.readyCb != nil {
222                 go m.readyCb(m.readyCbParams)
223         }
224
225         m.wg.Add(1)
226         go func() {
227                 m.contextMux.Lock()
228                 rfd := C.rmr_get_rcvfd(m.context)
229                 m.contextMux.Unlock()
230                 efd := C.init_epoll(rfd)
231
232                 defer m.wg.Done()
233                 for {
234
235                         if int(C.wait_epoll(efd, rfd)) == 0 {
236                                 continue
237                         }
238                         m.contextMux.Lock()
239                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
240                         m.contextMux.Unlock()
241
242                         if rxBuffer == nil {
243                                 m.LogMBufError("RecvMsg failed", rxBuffer)
244                                 m.UpdateStatCounter("ReceiveError")
245                                 continue
246                         }
247                         m.UpdateStatCounter("Received")
248                         m.parseMessage(rxBuffer)
249                 }
250         }()
251
252         m.wg.Add(1)
253         go func() {
254                 defer m.wg.Done()
255                 for {
256                         m.UpdateRmrStats()
257                         time.Sleep(1 * time.Second)
258                 }
259         }()
260
261         m.wg.Wait()
262 }
263
264 func (m *RMRClient) UpdateRmrStats() {
265         param := (*C.rmr_rx_debug_t)(C.malloc(C.size_t(unsafe.Sizeof(C.rmr_rx_debug_t{}))))
266         m.contextMux.Lock()
267         C.rmr_get_rx_debug_info(m.context, param)
268         m.contextMux.Unlock()
269         m.mux.Lock()
270         m.statg["Enqueued"].Set(float64(param.enqueue))
271         m.statg["Dropped"].Set(float64(param.drop))
272         m.mux.Unlock()
273         C.free(unsafe.Pointer(param))
274 }
275
276 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
277         if len(m.consumers) == 0 {
278                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
279                 return
280         }
281
282         params := &RMRParams{}
283         params.Mbuf = rxBuffer
284         params.Mtype = int(rxBuffer.mtype)
285         params.SubId = int(rxBuffer.sub_id)
286         params.Meid = &RMRMeid{}
287
288         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
289         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
290                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
291         }
292
293         xidBuf := make([]byte, int(C.RMR_MAX_XID))
294         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
295                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
296         }
297
298         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
299         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
300                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
301         }
302
303         // Default case: a single consumer
304         if len(m.consumers) == 1 && m.consumers[0] != nil {
305                 params.PayloadLen = int(rxBuffer.len)
306                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
307                 err := m.consumers[0].Consume(params)
308                 if err != nil {
309                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
310                 }
311                 return
312         }
313
314         /*
315                 // Special case for multiple consumers
316                 for _, c := range m.consumers {
317                         cptr := unsafe.Pointer(rxBuffer.payload)
318                         params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
319                         params.PayloadLen = int(rxBuffer.len)
320                         params.Mtype = int(rxBuffer.mtype)
321                         params.SubId = int(rxBuffer.sub_id)
322
323                         err := c.Consume(params)
324                         if err != nil {
325                                 Logger.Warn("rmrClient: Consumer returned error: %v", err)
326                         }
327                 }
328         */
329 }
330
331 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
332         m.contextMux.Lock()
333         defer m.contextMux.Unlock()
334         outbuf := C.rmr_alloc_msg(m.context, C.int(size))
335         if outbuf == nil {
336                 Logger.Error("rmrClient: Allocating message buffer failed!")
337         }
338         return outbuf
339 }
340
341 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
342         m.contextMux.Lock()
343         defer m.contextMux.Unlock()
344         outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
345         if outbuf == nil {
346                 Logger.Error("rmrClient: Allocating message buffer failed!")
347         }
348         return outbuf
349 }
350
351 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
352         if mbuf == nil {
353                 return
354         }
355         m.contextMux.Lock()
356         defer m.contextMux.Unlock()
357         C.rmr_free_msg(mbuf)
358 }
359
360 func (m *RMRClient) SendMsg(params *RMRParams) bool {
361         return m.Send(params, false)
362 }
363
364 func (m *RMRClient) SendRts(params *RMRParams) bool {
365         return m.Send(params, true)
366 }
367
368 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
369         status := m.Send(params, isRts)
370         i := 0
371         for ; i < int(to)*2 && status == false; i++ {
372                 status = m.Send(params, isRts)
373                 if status == false {
374                         m.UpdateStatCounter("SendWithRetryRetry")
375                         time.Sleep(500 * time.Millisecond)
376                 }
377         }
378         if status == false {
379                 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
380                 if params.Mbuf != nil {
381                         m.Free(params.Mbuf)
382                         params.Mbuf = nil
383                 }
384         }
385         return
386 }
387
388 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
389
390         if params == nil {
391                 return nil
392         }
393
394         payLen := len(params.Payload)
395         if params.PayloadLen != 0 {
396                 payLen = params.PayloadLen
397         }
398
399         txBuffer := params.Mbuf
400         params.Mbuf = nil
401
402         if txBuffer != nil {
403                 txBuffer = m.ReAllocate(txBuffer, payLen)
404         } else {
405                 txBuffer = m.Allocate(payLen)
406         }
407
408         if txBuffer == nil {
409                 return nil
410         }
411         txBuffer.mtype = C.int(params.Mtype)
412         txBuffer.sub_id = C.int(params.SubId)
413         txBuffer.len = C.int(payLen)
414
415         datap := C.CBytes(params.Payload)
416         defer C.free(datap)
417
418         if params.Meid != nil {
419                 b := make([]byte, int(C.RMR_MAX_MEID))
420                 copy(b, []byte(params.Meid.RanName))
421                 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
422         }
423
424         xidLen := len(params.Xid)
425         if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
426                 b := make([]byte, int(C.RMR_MAX_XID))
427                 copy(b, []byte(params.Xid))
428                 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
429         }
430
431         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
432
433         return txBuffer
434 }
435
436 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
437
438         txBuffer := m.CopyBuffer(params)
439         if txBuffer == nil {
440                 return false
441         }
442         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
443         if params.status == int(C.RMR_OK) {
444                 return true
445         }
446         return false
447 }
448
449 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
450         var (
451                 currBuffer *C.rmr_mbuf_t
452         )
453
454         m.contextMux.Lock()
455         txBuffer.state = 0
456         if whid != 0 {
457                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
458         } else {
459                 if isRts {
460                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
461                 } else {
462                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
463                 }
464         }
465         m.contextMux.Unlock()
466
467         if currBuffer == nil {
468                 m.UpdateStatCounter("TransmitError")
469                 return m.LogMBufError("SendBuf failed", txBuffer)
470         }
471
472         // Just quick retry seems to help for K8s issue
473         if m.maxRetryOnFailure == 0 {
474                 m.maxRetryOnFailure = 5
475         }
476
477         for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
478                 m.contextMux.Lock()
479                 if whid != 0 {
480                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
481                 } else {
482                         if isRts {
483                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
484                         } else {
485                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
486                         }
487                 }
488                 m.contextMux.Unlock()
489                 m.UpdateStatCounter("TransmitRetry")
490         }
491
492         if currBuffer == nil {
493                 m.UpdateStatCounter("TransmitError")
494                 m.LogMBufError("SendBuf failed", currBuffer)
495                 return int(C.RMR_ERR_INITFAILED)
496         }
497
498         if currBuffer.state != C.RMR_OK {
499                 m.UpdateStatCounter("TransmitError")
500                 m.LogMBufError("SendBuf failed", currBuffer)
501         } else {
502                 m.UpdateStatCounter("Transmitted")
503         }
504         defer m.Free(currBuffer)
505         return int(currBuffer.state)
506
507 }
508
509 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
510         var (
511                 currBuffer  *C.rmr_mbuf_t
512                 counterName string = "Transmitted"
513         )
514         txBuffer := m.CopyBuffer(params)
515         if txBuffer == nil {
516                 return C.RMR_ERR_INITFAILED, ""
517         }
518
519         txBuffer.state = 0
520
521         m.contextMux.Lock()
522         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
523         m.contextMux.Unlock()
524
525         if currBuffer == nil {
526                 m.UpdateStatCounter("TransmitError")
527                 return m.LogMBufError("SendBuf failed", txBuffer), ""
528         }
529
530         if currBuffer.state != C.RMR_OK {
531                 counterName = "TransmitError"
532                 m.LogMBufError("SendBuf failed", currBuffer)
533         }
534
535         m.UpdateStatCounter(counterName)
536         defer m.Free(currBuffer)
537
538         cptr := unsafe.Pointer(currBuffer.payload)
539         payload := C.GoBytes(cptr, C.int(currBuffer.len))
540
541         return int(currBuffer.state), string(payload)
542 }
543
544 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
545         return m.Wh_open(target)
546 }
547
548 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
549         m.contextMux.Lock()
550         defer m.contextMux.Unlock()
551         endpoint := C.CString(target)
552         return C.rmr_wh_open(m.context, endpoint)
553 }
554
555 func (m *RMRClient) Closewh(whid int) {
556         m.Wh_close(C.rmr_whid_t(whid))
557 }
558
559 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
560         m.contextMux.Lock()
561         defer m.contextMux.Unlock()
562         C.rmr_wh_close(m.context, whid)
563 }
564
565 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
566         if params.status == int(C.RMR_ERR_RETRY) {
567                 return true
568         }
569         return false
570 }
571
572 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
573         if params.status == int(C.RMR_ERR_NOENDPT) {
574                 return true
575         }
576         return false
577 }
578
579 func (m *RMRClient) UpdateStatCounter(name string) {
580         m.mux.Lock()
581         m.statc[name].Inc()
582         m.mux.Unlock()
583 }
584
585 func (m *RMRClient) RegisterMetrics() {
586         m.statc = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
587         m.statg = Metric.RegisterGaugeGroup(RMRGaugeOpts, "RMR")
588 }
589
590 func (m *RMRClient) Wait() {
591         m.wg.Wait()
592 }
593
594 func (m *RMRClient) IsReady() bool {
595         return m.ready != 0
596 }
597
598 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
599         m.readyCb = cb
600         m.readyCbParams = params
601 }
602
603 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
604         id, ok := RICMessageTypes[name]
605         return id, ok
606 }
607
608 func (m *RMRClient) GetRicMessageName(id int) (s string) {
609         for k, v := range RICMessageTypes {
610                 if id == v {
611                         return k
612                 }
613         }
614         return
615 }
616
617 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
618         if mbuf != nil {
619                 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
620                 return int(mbuf.state)
621         }
622         Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
623         return 0
624 }