9342828c8482ef2174399bfb4c090d302fc178fa
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33     memcpy((void *)dst, (void *)data, len);
34 }
35
36 int init_epoll(int rcv_fd) {
37         struct  epoll_event epe;
38         int epoll_fd = epoll_create1( 0 );
39         epe.events = EPOLLIN;
40         epe.data.fd = rcv_fd;
41         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
42         return epoll_fd;
43 }
44
45 void close_epoll(int epoll_fd) {
46         if(epoll_fd >= 0) {
47                 close(epoll_fd);
48         }
49 }
50
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52         struct  epoll_event events[1];
53         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54                 if( events[0].data.fd == rcv_fd ) {
55                         return 1;
56                 }
57         }
58         return 0;
59 }
60
61 #cgo CFLAGS: -I../
62 #cgo LDFLAGS: -lrmr_si
63 */
64 import "C"
65
66 import (
67         "bytes"
68         "crypto/md5"
69         "fmt"
70         "strings"
71         "time"
72         "unsafe"
73
74         "github.com/spf13/viper"
75 )
76
77 var RMRCounterOpts = []CounterOpts{
78         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
79         {Name: "Received", Help: "The total number of received RMR messages"},
80         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
81         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
82 }
83
84 var RMRErrors = map[int]string{
85         C.RMR_OK:             "state is good",
86         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
87         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
88         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
89         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
90         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
91         C.RMR_ERR_CALLFAILED: "unable to send call() message",
92         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
93         C.RMR_ERR_WHID:       "wormhole id was invalid",
94         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
95         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
96         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
97         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
98         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
99         C.RMR_ERR_TRUNC:      "received message likely truncated",
100         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
101         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
102 }
103
104 //-----------------------------------------------------------------------------
105 //
106 //-----------------------------------------------------------------------------
107 type RMRParams struct {
108         Mtype      int
109         Payload    []byte
110         PayloadLen int
111         Meid       *RMRMeid
112         Xid        string
113         SubId      int
114         Src        string
115         Mbuf       *C.rmr_mbuf_t
116         Whid       int
117         Callid     int
118         Timeout    int
119         status     int
120 }
121
122 func (params *RMRParams) String() string {
123         var b bytes.Buffer
124         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
125         return b.String()
126 }
127
128 //-----------------------------------------------------------------------------
129 //
130 //-----------------------------------------------------------------------------
131 type RMRClientParams struct {
132         StatDesc string
133         RmrData  PortData
134 }
135
136 func (params *RMRClientParams) String() string {
137         return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
138                 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
139                 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
140 }
141
142 //-----------------------------------------------------------------------------
143 //
144 //-----------------------------------------------------------------------------
145 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
146         p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
147         m := C.int(params.RmrData.MaxSize)
148         c := C.int(params.RmrData.ThreadType)
149         defer C.free(unsafe.Pointer(p))
150         ctx := C.rmr_init(p, m, c)
151         if ctx == nil {
152                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
153         }
154
155         Logger.Info("new rmrClient with parameters: %s", params.String())
156
157         if params.RmrData.LowLatency {
158                 C.rmr_set_low_latency(ctx)
159         }
160         if params.RmrData.FastAck {
161                 C.rmr_set_fack(ctx)
162         }
163
164         return &RMRClient{
165                 context:           ctx,
166                 consumers:         make([]MessageConsumer, 0),
167                 stat:              Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
168                 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
169         }
170 }
171
172 func NewRMRClient() *RMRClient {
173         p := GetPortData("rmrdata")
174         if p.Port == 0 || viper.IsSet("rmr.protPort") {
175                 // Old xApp descriptor used, fallback to rmr section
176                 fmt.Sscanf(viper.GetString("rmr.protPort"), "tcp:%d", &p.Port)
177                 p.MaxSize = viper.GetInt("rmr.maxSize")
178                 p.ThreadType = viper.GetInt("rmr.threadType")
179                 p.LowLatency = viper.GetBool("rmr.lowLatency")
180                 p.FastAck = viper.GetBool("rmr.fastAck")
181                 p.MaxRetryOnFailure = viper.GetInt("rmr.maxRetryOnFailure")
182         }
183
184         return NewRMRClientWithParams(
185                 &RMRClientParams{
186                         RmrData:  p,
187                         StatDesc: "RMR",
188                 })
189 }
190
191 func (m *RMRClient) Start(c MessageConsumer) {
192         if c != nil {
193                 m.consumers = append(m.consumers, c)
194         }
195
196         var counter int = 0
197         for {
198                 m.contextMux.Lock()
199                 m.ready = int(C.rmr_ready(m.context))
200                 m.contextMux.Unlock()
201                 if m.ready == 1 {
202                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
203                         break
204                 }
205                 if counter%10 == 0 {
206                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
207                 }
208                 time.Sleep(1 * time.Second)
209                 counter++
210         }
211         m.wg.Add(1)
212
213         if m.readyCb != nil {
214                 go m.readyCb(m.readyCbParams)
215         }
216
217         go func() {
218                 m.contextMux.Lock()
219                 rfd := C.rmr_get_rcvfd(m.context)
220                 m.contextMux.Unlock()
221                 efd := C.init_epoll(rfd)
222
223                 defer m.wg.Done()
224                 for {
225                         if int(C.wait_epoll(efd, rfd)) == 0 {
226                                 continue
227                         }
228                         m.contextMux.Lock()
229                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
230                         m.contextMux.Unlock()
231
232                         if rxBuffer == nil {
233                                 m.LogMBufError("RecvMsg failed", rxBuffer)
234                                 m.UpdateStatCounter("ReceiveError")
235                                 continue
236                         }
237                         m.UpdateStatCounter("Received")
238                         m.parseMessage(rxBuffer)
239                 }
240         }()
241
242         m.wg.Wait()
243 }
244
245 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
246         if len(m.consumers) == 0 {
247                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
248                 return
249         }
250
251         params := &RMRParams{}
252         params.Mbuf = rxBuffer
253         params.Mtype = int(rxBuffer.mtype)
254         params.SubId = int(rxBuffer.sub_id)
255         params.Meid = &RMRMeid{}
256
257         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
258         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
259                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
260         }
261
262         xidBuf := make([]byte, int(C.RMR_MAX_XID))
263         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
264                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
265         }
266
267         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
268         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
269                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
270         }
271
272         // Default case: a single consumer
273         if len(m.consumers) == 1 && m.consumers[0] != nil {
274                 params.PayloadLen = int(rxBuffer.len)
275                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
276                 err := m.consumers[0].Consume(params)
277                 if err != nil {
278                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
279                 }
280                 return
281         }
282
283         /*
284                 // Special case for multiple consumers
285                 for _, c := range m.consumers {
286                         cptr := unsafe.Pointer(rxBuffer.payload)
287                         params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
288                         params.PayloadLen = int(rxBuffer.len)
289                         params.Mtype = int(rxBuffer.mtype)
290                         params.SubId = int(rxBuffer.sub_id)
291
292                         err := c.Consume(params)
293                         if err != nil {
294                                 Logger.Warn("rmrClient: Consumer returned error: %v", err)
295                         }
296                 }
297         */
298 }
299
300 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
301         m.contextMux.Lock()
302         defer m.contextMux.Unlock()
303         outbuf := C.rmr_alloc_msg(m.context, C.int(size))
304         if outbuf == nil {
305                 Logger.Error("rmrClient: Allocating message buffer failed!")
306         }
307         return outbuf
308 }
309
310 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
311         m.contextMux.Lock()
312         defer m.contextMux.Unlock()
313         outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
314         if outbuf == nil {
315                 Logger.Error("rmrClient: Allocating message buffer failed!")
316         }
317         return outbuf
318 }
319
320 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
321         if mbuf == nil {
322                 return
323         }
324         m.contextMux.Lock()
325         defer m.contextMux.Unlock()
326         C.rmr_free_msg(mbuf)
327 }
328
329 func (m *RMRClient) SendMsg(params *RMRParams) bool {
330         return m.Send(params, false)
331 }
332
333 func (m *RMRClient) SendRts(params *RMRParams) bool {
334         return m.Send(params, true)
335 }
336
337 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
338         status := m.Send(params, isRts)
339         i := 0
340         for ; i < int(to)*2 && status == false; i++ {
341                 status = m.Send(params, isRts)
342                 if status == false {
343                         time.Sleep(500 * time.Millisecond)
344                 }
345         }
346         if status == false {
347                 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
348                 if params.Mbuf != nil {
349                         m.Free(params.Mbuf)
350                         params.Mbuf = nil
351                 }
352         }
353         return
354 }
355
356 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
357
358         if params == nil {
359                 return nil
360         }
361
362         payLen := len(params.Payload)
363         if params.PayloadLen != 0 {
364                 payLen = params.PayloadLen
365         }
366
367         txBuffer := params.Mbuf
368         params.Mbuf = nil
369
370         if txBuffer != nil {
371                 txBuffer = m.ReAllocate(txBuffer, payLen)
372         } else {
373                 txBuffer = m.Allocate(payLen)
374         }
375
376         if txBuffer == nil {
377                 return nil
378         }
379         txBuffer.mtype = C.int(params.Mtype)
380         txBuffer.sub_id = C.int(params.SubId)
381         txBuffer.len = C.int(payLen)
382
383         datap := C.CBytes(params.Payload)
384         defer C.free(datap)
385
386         if params.Meid != nil {
387                 b := make([]byte, int(C.RMR_MAX_MEID))
388                 copy(b, []byte(params.Meid.RanName))
389                 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
390         }
391
392         xidLen := len(params.Xid)
393         if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
394                 b := make([]byte, int(C.RMR_MAX_XID))
395                 copy(b, []byte(params.Xid))
396                 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
397         }
398
399         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
400
401         return txBuffer
402 }
403
404 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
405
406         txBuffer := m.CopyBuffer(params)
407         if txBuffer == nil {
408                 return false
409         }
410         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
411         if params.status == int(C.RMR_OK) {
412                 return true
413         }
414         return false
415 }
416
417 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
418         var (
419                 currBuffer *C.rmr_mbuf_t
420         )
421
422         m.contextMux.Lock()
423         txBuffer.state = 0
424         if whid != 0 {
425                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
426         } else {
427                 if isRts {
428                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
429                 } else {
430                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
431                 }
432         }
433         m.contextMux.Unlock()
434
435         if currBuffer == nil {
436                 m.UpdateStatCounter("TransmitError")
437                 return m.LogMBufError("SendBuf failed", txBuffer)
438         }
439
440         // Just quick retry seems to help for K8s issue
441         if m.maxRetryOnFailure == 0 {
442                 m.maxRetryOnFailure = 5
443         }
444
445         for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
446                 m.contextMux.Lock()
447                 if whid != 0 {
448                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
449                 } else {
450                         if isRts {
451                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
452                         } else {
453                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
454                         }
455                 }
456                 m.contextMux.Unlock()
457         }
458
459         if currBuffer == nil {
460                 m.UpdateStatCounter("TransmitError")
461                 m.LogMBufError("SendBuf failed", currBuffer)
462                 return int(C.RMR_ERR_INITFAILED)
463         }
464
465         if currBuffer.state != C.RMR_OK {
466                 m.UpdateStatCounter("TransmitError")
467                 m.LogMBufError("SendBuf failed", currBuffer)
468         } else {
469                 m.UpdateStatCounter("Transmitted")
470         }
471         defer m.Free(currBuffer)
472         return int(currBuffer.state)
473
474 }
475
476 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
477         var (
478                 currBuffer  *C.rmr_mbuf_t
479                 counterName string = "Transmitted"
480         )
481         txBuffer := m.CopyBuffer(params)
482         if txBuffer == nil {
483                 return C.RMR_ERR_INITFAILED, ""
484         }
485
486         txBuffer.state = 0
487
488         m.contextMux.Lock()
489         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
490         m.contextMux.Unlock()
491
492         if currBuffer == nil {
493                 m.UpdateStatCounter("TransmitError")
494                 return m.LogMBufError("SendBuf failed", txBuffer), ""
495         }
496
497         if currBuffer.state != C.RMR_OK {
498                 counterName = "TransmitError"
499                 m.LogMBufError("SendBuf failed", currBuffer)
500         }
501
502         m.UpdateStatCounter(counterName)
503         defer m.Free(currBuffer)
504
505         cptr := unsafe.Pointer(currBuffer.payload)
506         payload := C.GoBytes(cptr, C.int(currBuffer.len))
507
508         return int(currBuffer.state), string(payload)
509 }
510
511 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
512         return m.Wh_open(target)
513 }
514
515 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
516         m.contextMux.Lock()
517         defer m.contextMux.Unlock()
518         endpoint := C.CString(target)
519         return C.rmr_wh_open(m.context, endpoint)
520 }
521
522 func (m *RMRClient) Closewh(whid int) {
523         m.Wh_close(C.rmr_whid_t(whid))
524 }
525
526 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
527         m.contextMux.Lock()
528         defer m.contextMux.Unlock()
529         C.rmr_wh_close(m.context, whid)
530 }
531
532 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
533         if params.status == int(C.RMR_ERR_RETRY) {
534                 return true
535         }
536         return false
537 }
538
539 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
540         if params.status == int(C.RMR_ERR_NOENDPT) {
541                 return true
542         }
543         return false
544 }
545
546 func (m *RMRClient) UpdateStatCounter(name string) {
547         m.mux.Lock()
548         m.stat[name].Inc()
549         m.mux.Unlock()
550 }
551
552 func (m *RMRClient) RegisterMetrics() {
553         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
554 }
555
556 func (m *RMRClient) Wait() {
557         m.wg.Wait()
558 }
559
560 func (m *RMRClient) IsReady() bool {
561         return m.ready != 0
562 }
563
564 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
565         m.readyCb = cb
566         m.readyCbParams = params
567 }
568
569 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
570         id, ok := RICMessageTypes[name]
571         return id, ok
572 }
573
574 func (m *RMRClient) GetRicMessageName(id int) (s string) {
575         for k, v := range RICMessageTypes {
576                 if id == v {
577                         return k
578                 }
579         }
580         return
581 }
582
583 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
584         if mbuf != nil {
585                 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
586                 return int(mbuf.state)
587         }
588         Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
589         return 0
590 }