ef8aea5d8c50d32156c735b2126e8393bf3bdbea
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33     memcpy((void *)dst, (void *)data, len);
34 }
35
36 int init_epoll(int rcv_fd) {
37         struct  epoll_event epe;
38         int epoll_fd = epoll_create1( 0 );
39         epe.events = EPOLLIN;
40         epe.data.fd = rcv_fd;
41         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
42         return epoll_fd;
43 }
44
45 void close_epoll(int epoll_fd) {
46         if(epoll_fd >= 0) {
47                 close(epoll_fd);
48         }
49 }
50
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52         struct  epoll_event events[1];
53         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54                 if( events[0].data.fd == rcv_fd ) {
55                         return 1;
56                 }
57         }
58         return 0;
59 }
60
61 #cgo CFLAGS: -I../
62 #cgo LDFLAGS: -lrmr_si
63 */
64 import "C"
65
66 import (
67         "bytes"
68         "crypto/md5"
69         "fmt"
70         "github.com/spf13/viper"
71         "strings"
72         "time"
73         "unsafe"
74 )
75
76 var RMRCounterOpts = []CounterOpts{
77         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
78         {Name: "Received", Help: "The total number of received RMR messages"},
79         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
81 }
82
83 var RMRErrors = map[int]string{
84         C.RMR_OK:             "state is good",
85         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
86         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
87         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
88         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
89         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
90         C.RMR_ERR_CALLFAILED: "unable to send call() message",
91         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
92         C.RMR_ERR_WHID:       "wormhole id was invalid",
93         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
94         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
95         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
96         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
97         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
98         C.RMR_ERR_TRUNC:      "received message likely truncated",
99         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
100         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
101 }
102
103 //-----------------------------------------------------------------------------
104 //
105 //-----------------------------------------------------------------------------
106 type RMRParams struct {
107         Mtype      int
108         Payload    []byte
109         PayloadLen int
110         Meid       *RMRMeid
111         Xid        string
112         SubId      int
113         Src        string
114         Mbuf       *C.rmr_mbuf_t
115         Whid       int
116         Callid     int
117         Timeout    int
118         status     int
119 }
120
121 func (params *RMRParams) String() string {
122         var b bytes.Buffer
123         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
124         return b.String()
125 }
126
127 //-----------------------------------------------------------------------------
128 //
129 //-----------------------------------------------------------------------------
130 type RMRClientParams struct {
131         StatDesc string
132         RmrData  PortData
133 }
134
135 func (params *RMRClientParams) String() string {
136         return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
137                 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
138                 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
139 }
140
141 //-----------------------------------------------------------------------------
142 //
143 //-----------------------------------------------------------------------------
144 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
145         p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
146         m := C.int(params.RmrData.MaxSize)
147         c := C.int(params.RmrData.ThreadType)
148         defer C.free(unsafe.Pointer(p))
149         ctx := C.rmr_init(p, m, c)
150         if ctx == nil {
151                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
152         }
153
154         Logger.Info("new rmrClient with parameters: %s", params.String())
155
156         if params.RmrData.LowLatency {
157                 C.rmr_set_low_latency(ctx)
158         }
159         if params.RmrData.FastAck {
160                 C.rmr_set_fack(ctx)
161         }
162
163         return &RMRClient{
164                 context:           ctx,
165                 consumers:         make([]MessageConsumer, 0),
166                 stat:              Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
167                 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
168         }
169 }
170
171 func NewRMRClient() *RMRClient {
172         p := GetPortData("rmr-data")
173         if p.Port == 0 || viper.IsSet("rmr.protPort") {
174                 // Old xApp descriptor used, fallback to rmr section
175                 fmt.Sscanf(viper.GetString("rmr.protPort"), "tcp:%d", &p.Port)
176                 p.MaxSize = viper.GetInt("rmr.maxSize")
177                 p.ThreadType = viper.GetInt("rmr.threadType")
178                 p.LowLatency = viper.GetBool("rmr.lowLatency")
179                 p.FastAck = viper.GetBool("rmr.fastAck")
180                 p.MaxRetryOnFailure = viper.GetInt("rmr.maxRetryOnFailure")
181         }
182
183         return NewRMRClientWithParams(
184                 &RMRClientParams{
185                         RmrData:  p,
186                         StatDesc: "RMR",
187                 })
188 }
189
190 func (m *RMRClient) Start(c MessageConsumer) {
191         if c != nil {
192                 m.consumers = append(m.consumers, c)
193         }
194
195         var counter int = 0
196         for {
197                 m.contextMux.Lock()
198                 m.ready = int(C.rmr_ready(m.context))
199                 m.contextMux.Unlock()
200                 if m.ready == 1 {
201                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
202                         break
203                 }
204                 if counter%10 == 0 {
205                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
206                 }
207                 time.Sleep(1 * time.Second)
208                 counter++
209         }
210         m.wg.Add(1)
211
212         if m.readyCb != nil {
213                 go m.readyCb(m.readyCbParams)
214         }
215
216         go func() {
217                 m.contextMux.Lock()
218                 rfd := C.rmr_get_rcvfd(m.context)
219                 m.contextMux.Unlock()
220                 efd := C.init_epoll(rfd)
221
222                 defer m.wg.Done()
223                 for {
224                         if int(C.wait_epoll(efd, rfd)) == 0 {
225                                 continue
226                         }
227                         m.contextMux.Lock()
228                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
229                         m.contextMux.Unlock()
230
231                         if rxBuffer == nil {
232                                 m.LogMBufError("RecvMsg failed", rxBuffer)
233                                 m.UpdateStatCounter("ReceiveError")
234                                 continue
235                         }
236                         m.UpdateStatCounter("Received")
237                         m.parseMessage(rxBuffer)
238                 }
239         }()
240
241         m.wg.Wait()
242 }
243
244 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
245         if len(m.consumers) == 0 {
246                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
247                 return
248         }
249
250         params := &RMRParams{}
251         params.Mbuf = rxBuffer
252         params.Mtype = int(rxBuffer.mtype)
253         params.SubId = int(rxBuffer.sub_id)
254         params.Meid = &RMRMeid{}
255
256         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
257         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
258                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
259         }
260
261         xidBuf := make([]byte, int(C.RMR_MAX_XID))
262         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
263                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
264         }
265
266         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
267         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
268                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
269         }
270
271         // Default case: a single consumer
272         if len(m.consumers) == 1 && m.consumers[0] != nil {
273                 params.PayloadLen = int(rxBuffer.len)
274                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
275                 err := m.consumers[0].Consume(params)
276                 if err != nil {
277                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
278                 }
279                 return
280         }
281
282         // Special case for multiple consumers
283         for _, c := range m.consumers {
284                 cptr := unsafe.Pointer(rxBuffer.payload)
285                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
286                 params.PayloadLen = int(rxBuffer.len)
287                 params.Mtype = int(rxBuffer.mtype)
288                 params.SubId = int(rxBuffer.sub_id)
289
290                 err := c.Consume(params)
291                 if err != nil {
292                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
293                 }
294         }
295 }
296
297 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
298         m.contextMux.Lock()
299         defer m.contextMux.Unlock()
300         outbuf := C.rmr_alloc_msg(m.context, C.int(size))
301         if outbuf == nil {
302                 Logger.Error("rmrClient: Allocating message buffer failed!")
303         }
304         return outbuf
305 }
306
307 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
308         m.contextMux.Lock()
309         defer m.contextMux.Unlock()
310         outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
311         if outbuf == nil {
312                 Logger.Error("rmrClient: Allocating message buffer failed!")
313         }
314         return outbuf
315 }
316
317 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
318         if mbuf == nil {
319                 return
320         }
321         m.contextMux.Lock()
322         defer m.contextMux.Unlock()
323         C.rmr_free_msg(mbuf)
324 }
325
326 func (m *RMRClient) SendMsg(params *RMRParams) bool {
327         return m.Send(params, false)
328 }
329
330 func (m *RMRClient) SendRts(params *RMRParams) bool {
331         return m.Send(params, true)
332 }
333
334 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
335         status := m.Send(params, isRts)
336         i := 0
337         for ; i < int(to)*2 && status == false; i++ {
338                 status = m.Send(params, isRts)
339                 if status == false {
340                         time.Sleep(500 * time.Millisecond)
341                 }
342         }
343         if status == false {
344                 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
345                 if params.Mbuf != nil {
346                         m.Free(params.Mbuf)
347                         params.Mbuf = nil
348                 }
349         }
350         return
351 }
352
353 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
354
355         if params == nil {
356                 return nil
357         }
358
359         payLen := len(params.Payload)
360         if params.PayloadLen != 0 {
361                 payLen = params.PayloadLen
362         }
363
364         txBuffer := params.Mbuf
365         params.Mbuf = nil
366
367         if txBuffer != nil {
368                 txBuffer = m.ReAllocate(txBuffer, payLen)
369         } else {
370                 txBuffer = m.Allocate(payLen)
371         }
372
373         if txBuffer == nil {
374                 return nil
375         }
376         txBuffer.mtype = C.int(params.Mtype)
377         txBuffer.sub_id = C.int(params.SubId)
378         txBuffer.len = C.int(payLen)
379
380         datap := C.CBytes(params.Payload)
381         defer C.free(datap)
382
383         if params.Meid != nil {
384                 b := make([]byte, int(C.RMR_MAX_MEID))
385                 copy(b, []byte(params.Meid.RanName))
386                 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
387         }
388
389         xidLen := len(params.Xid)
390         if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
391                 b := make([]byte, int(C.RMR_MAX_XID))
392                 copy(b, []byte(params.Xid))
393                 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
394         }
395
396         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
397
398         return txBuffer
399 }
400
401 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
402
403         txBuffer := m.CopyBuffer(params)
404         if txBuffer == nil {
405                 return false
406         }
407         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
408         if params.status == int(C.RMR_OK) {
409                 return true
410         }
411         return false
412 }
413
414 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
415         var (
416                 currBuffer *C.rmr_mbuf_t
417         )
418
419         m.contextMux.Lock()
420         txBuffer.state = 0
421         if whid != 0 {
422                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
423         } else {
424                 if isRts {
425                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
426                 } else {
427                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
428                 }
429         }
430         m.contextMux.Unlock()
431
432         if currBuffer == nil {
433                 m.UpdateStatCounter("TransmitError")
434                 return m.LogMBufError("SendBuf failed", txBuffer)
435         }
436
437         // Just quick retry seems to help for K8s issue
438         if m.maxRetryOnFailure == 0 {
439                 m.maxRetryOnFailure = 5
440         }
441
442         for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
443                 m.contextMux.Lock()
444                 if whid != 0 {
445                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
446                 } else {
447                         if isRts {
448                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
449                         } else {
450                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
451                         }
452                 }
453                 m.contextMux.Unlock()
454         }
455
456         if currBuffer == nil {
457                 m.UpdateStatCounter("TransmitError")
458                 m.LogMBufError("SendBuf failed", currBuffer)
459                 return int(C.RMR_ERR_INITFAILED)
460         }
461
462         if currBuffer.state != C.RMR_OK {
463                 m.UpdateStatCounter("TransmitError")
464                 m.LogMBufError("SendBuf failed", currBuffer)
465         } else {
466                 m.UpdateStatCounter("Transmitted")
467         }
468         defer m.Free(currBuffer)
469         return int(currBuffer.state)
470
471 }
472
473 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
474         var (
475                 currBuffer  *C.rmr_mbuf_t
476                 counterName string = "Transmitted"
477         )
478         txBuffer := m.CopyBuffer(params)
479         if txBuffer == nil {
480                 return C.RMR_ERR_INITFAILED, ""
481         }
482
483         txBuffer.state = 0
484
485         m.contextMux.Lock()
486         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
487         m.contextMux.Unlock()
488
489         if currBuffer == nil {
490                 m.UpdateStatCounter("TransmitError")
491                 return m.LogMBufError("SendBuf failed", txBuffer), ""
492         }
493
494         if currBuffer.state != C.RMR_OK {
495                 counterName = "TransmitError"
496                 m.LogMBufError("SendBuf failed", currBuffer)
497         }
498
499         m.UpdateStatCounter(counterName)
500         defer m.Free(currBuffer)
501
502         cptr := unsafe.Pointer(currBuffer.payload)
503         payload := C.GoBytes(cptr, C.int(currBuffer.len))
504
505         return int(currBuffer.state), string(payload)
506 }
507
508 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
509         return m.Wh_open(target)
510 }
511
512 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
513         m.contextMux.Lock()
514         defer m.contextMux.Unlock()
515         endpoint := C.CString(target)
516         return C.rmr_wh_open(m.context, endpoint)
517 }
518
519 func (m *RMRClient) Closewh(whid int) {
520         m.Wh_close(C.rmr_whid_t(whid))
521 }
522
523 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
524         m.contextMux.Lock()
525         defer m.contextMux.Unlock()
526         C.rmr_wh_close(m.context, whid)
527 }
528
529 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
530         if params.status == int(C.RMR_ERR_RETRY) {
531                 return true
532         }
533         return false
534 }
535
536 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
537         if params.status == int(C.RMR_ERR_NOENDPT) {
538                 return true
539         }
540         return false
541 }
542
543 func (m *RMRClient) UpdateStatCounter(name string) {
544         m.mux.Lock()
545         m.stat[name].Inc()
546         m.mux.Unlock()
547 }
548
549 func (m *RMRClient) RegisterMetrics() {
550         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
551 }
552
553 func (m *RMRClient) Wait() {
554         m.wg.Wait()
555 }
556
557 func (m *RMRClient) IsReady() bool {
558         return m.ready != 0
559 }
560
561 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
562         m.readyCb = cb
563         m.readyCbParams = params
564 }
565
566 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
567         id, ok := RICMessageTypes[name]
568         return id, ok
569 }
570
571 func (m *RMRClient) GetRicMessageName(id int) (s string) {
572         for k, v := range RICMessageTypes {
573                 if id == v {
574                         return k
575                 }
576         }
577         return
578 }
579
580 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
581         if mbuf != nil {
582                 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
583                 return int(mbuf.state)
584         }
585         Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
586         return 0
587 }