f2d09dacc63be5606904e2d58296be0ca4c3ec90
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33     memcpy((void *)dst, (void *)data, len);
34 }
35
36 int init_epoll(int rcv_fd) {
37         struct  epoll_event epe;
38         int epoll_fd = epoll_create1( 0 );
39         epe.events = EPOLLIN;
40         epe.data.fd = rcv_fd;
41         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
42         return epoll_fd;
43 }
44
45 void close_epoll(int epoll_fd) {
46         if(epoll_fd >= 0) {
47                 close(epoll_fd);
48         }
49 }
50
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52         struct  epoll_event events[1];
53         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54                 if( events[0].data.fd == rcv_fd ) {
55                         return 1;
56                 }
57         }
58         return 0;
59 }
60
61 #cgo CFLAGS: -I../
62 #cgo LDFLAGS: -lrmr_si
63 */
64 import "C"
65
66 import (
67         "bytes"
68         "crypto/md5"
69         "fmt"
70         "strings"
71         "time"
72         "unsafe"
73 )
74
75 var RMRCounterOpts = []CounterOpts{
76         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
77         {Name: "Received", Help: "The total number of received RMR messages"},
78         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
79         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
80 }
81
82 var RMRErrors = map[int]string{
83         C.RMR_OK:             "state is good",
84         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
85         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
86         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
87         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
88         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
89         C.RMR_ERR_CALLFAILED: "unable to send call() message",
90         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
91         C.RMR_ERR_WHID:       "wormhole id was invalid",
92         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
93         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
94         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
95         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
96         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
97         C.RMR_ERR_TRUNC:      "received message likely truncated",
98         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
99         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
100 }
101
102 //-----------------------------------------------------------------------------
103 //
104 //-----------------------------------------------------------------------------
105 type RMRParams struct {
106         Mtype      int
107         Payload    []byte
108         PayloadLen int
109         Meid       *RMRMeid
110         Xid        string
111         SubId      int
112         Src        string
113         Mbuf       *C.rmr_mbuf_t
114         Whid       int
115         Callid     int
116         Timeout    int
117         status     int
118 }
119
120 func (params *RMRParams) String() string {
121         var b bytes.Buffer
122         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
123         return b.String()
124 }
125
126 //-----------------------------------------------------------------------------
127 //
128 //-----------------------------------------------------------------------------
129 type RMRClientParams struct {
130         StatDesc string
131         RmrData  PortData
132 }
133
134 func (params *RMRClientParams) String() string {
135         return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t",
136                 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc, params.RmrData.LowLatency, params.RmrData.FastAck)
137 }
138
139 //-----------------------------------------------------------------------------
140 //
141 //-----------------------------------------------------------------------------
142 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
143         p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
144         m := C.int(params.RmrData.MaxSize)
145         c := C.int(params.RmrData.ThreadType)
146         defer C.free(unsafe.Pointer(p))
147         ctx := C.rmr_init(p, m, c)
148         if ctx == nil {
149                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
150         }
151
152         Logger.Info("new rmrClient with parameters: %s", params.String())
153
154         if params.RmrData.LowLatency {
155                 C.rmr_set_low_latency(ctx)
156         }
157         if params.RmrData.FastAck {
158                 C.rmr_set_fack(ctx)
159         }
160
161         return &RMRClient{
162                 context:           ctx,
163                 consumers:         make([]MessageConsumer, 0),
164                 stat:              Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
165                 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
166         }
167 }
168
169 func NewRMRClient() *RMRClient {
170         p := GetPortData("rmr-data")
171         return NewRMRClientWithParams(
172                 &RMRClientParams{
173                         RmrData:  p,
174                         StatDesc: "RMR",
175                 })
176 }
177
178 func (m *RMRClient) Start(c MessageConsumer) {
179         if c != nil {
180                 m.consumers = append(m.consumers, c)
181         }
182
183         var counter int = 0
184         for {
185                 m.contextMux.Lock()
186                 m.ready = int(C.rmr_ready(m.context))
187                 m.contextMux.Unlock()
188                 if m.ready == 1 {
189                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
190                         break
191                 }
192                 if counter%10 == 0 {
193                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
194                 }
195                 time.Sleep(1 * time.Second)
196                 counter++
197         }
198         m.wg.Add(1)
199
200         if m.readyCb != nil {
201                 go m.readyCb(m.readyCbParams)
202         }
203
204         go func() {
205                 m.contextMux.Lock()
206                 rfd := C.rmr_get_rcvfd(m.context)
207                 m.contextMux.Unlock()
208                 efd := C.init_epoll(rfd)
209
210                 defer m.wg.Done()
211                 for {
212                         if int(C.wait_epoll(efd, rfd)) == 0 {
213                                 continue
214                         }
215                         m.contextMux.Lock()
216                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
217                         m.contextMux.Unlock()
218
219                         if rxBuffer == nil {
220                                 m.LogMBufError("RecvMsg failed", rxBuffer)
221                                 m.UpdateStatCounter("ReceiveError")
222                                 continue
223                         }
224                         m.UpdateStatCounter("Received")
225                         m.parseMessage(rxBuffer)
226                 }
227         }()
228
229         m.wg.Wait()
230 }
231
232 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
233         if len(m.consumers) == 0 {
234                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
235                 return
236         }
237
238         params := &RMRParams{}
239         params.Mbuf = rxBuffer
240         params.Mtype = int(rxBuffer.mtype)
241         params.SubId = int(rxBuffer.sub_id)
242         params.Meid = &RMRMeid{}
243
244         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
245         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
246                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
247         }
248
249         xidBuf := make([]byte, int(C.RMR_MAX_XID))
250         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
251                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
252         }
253
254         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
255         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
256                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
257         }
258
259         // Default case: a single consumer
260         if len(m.consumers) == 1 && m.consumers[0] != nil {
261                 params.PayloadLen = int(rxBuffer.len)
262                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
263                 err := m.consumers[0].Consume(params)
264                 if err != nil {
265                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
266                 }
267                 return
268         }
269
270         // Special case for multiple consumers
271         for _, c := range m.consumers {
272                 cptr := unsafe.Pointer(rxBuffer.payload)
273                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
274                 params.PayloadLen = int(rxBuffer.len)
275                 params.Mtype = int(rxBuffer.mtype)
276                 params.SubId = int(rxBuffer.sub_id)
277
278                 err := c.Consume(params)
279                 if err != nil {
280                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
281                 }
282         }
283 }
284
285 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
286         m.contextMux.Lock()
287         defer m.contextMux.Unlock()
288         outbuf := C.rmr_alloc_msg(m.context, C.int(size))
289         if outbuf == nil {
290                 Logger.Error("rmrClient: Allocating message buffer failed!")
291         }
292         return outbuf
293 }
294
295 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
296         m.contextMux.Lock()
297         defer m.contextMux.Unlock()
298         outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
299         if outbuf == nil {
300                 Logger.Error("rmrClient: Allocating message buffer failed!")
301         }
302         return outbuf
303 }
304
305 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
306         if mbuf == nil {
307                 return
308         }
309         m.contextMux.Lock()
310         defer m.contextMux.Unlock()
311         C.rmr_free_msg(mbuf)
312 }
313
314 func (m *RMRClient) SendMsg(params *RMRParams) bool {
315         return m.Send(params, false)
316 }
317
318 func (m *RMRClient) SendRts(params *RMRParams) bool {
319         return m.Send(params, true)
320 }
321
322 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
323         status := m.Send(params, isRts)
324         i := 0
325         for ; i < int(to)*2 && status == false; i++ {
326                 status = m.Send(params, isRts)
327                 if status == false {
328                         time.Sleep(500 * time.Millisecond)
329                 }
330         }
331         if status == false {
332                 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
333                 if params.Mbuf != nil {
334                         m.Free(params.Mbuf)
335                         params.Mbuf = nil
336                 }
337         }
338         return
339 }
340
341 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
342
343         if params == nil {
344                 return nil
345         }
346
347         payLen := len(params.Payload)
348         if params.PayloadLen != 0 {
349                 payLen = params.PayloadLen
350         }
351
352         txBuffer := params.Mbuf
353         params.Mbuf = nil
354
355         if txBuffer != nil {
356                 txBuffer = m.ReAllocate(txBuffer, payLen)
357         } else {
358                 txBuffer = m.Allocate(payLen)
359         }
360
361         if txBuffer == nil {
362                 return nil
363         }
364         txBuffer.mtype = C.int(params.Mtype)
365         txBuffer.sub_id = C.int(params.SubId)
366         txBuffer.len = C.int(payLen)
367
368         datap := C.CBytes(params.Payload)
369         defer C.free(datap)
370
371         if params.Meid != nil {
372                 b := make([]byte, int(C.RMR_MAX_MEID))
373                 copy(b, []byte(params.Meid.RanName))
374                 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
375         }
376
377         xidLen := len(params.Xid)
378         if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
379                 b := make([]byte, int(C.RMR_MAX_XID))
380                 copy(b, []byte(params.Xid))
381                 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
382         }
383
384         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
385
386         return txBuffer
387 }
388
389 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
390
391         txBuffer := m.CopyBuffer(params)
392         if txBuffer == nil {
393                 return false
394         }
395         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
396         if params.status == int(C.RMR_OK) {
397                 return true
398         }
399         return false
400 }
401
402 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
403         var (
404                 currBuffer *C.rmr_mbuf_t
405         )
406
407         m.contextMux.Lock()
408         txBuffer.state = 0
409         if whid != 0 {
410                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
411         } else {
412                 if isRts {
413                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
414                 } else {
415                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
416                 }
417         }
418         m.contextMux.Unlock()
419
420         if currBuffer == nil {
421                 m.UpdateStatCounter("TransmitError")
422                 return m.LogMBufError("SendBuf failed", txBuffer)
423         }
424
425         // Just quick retry seems to help for K8s issue
426         if m.maxRetryOnFailure == 0 {
427                 m.maxRetryOnFailure = 5
428         }
429
430         for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
431                 m.contextMux.Lock()
432                 if whid != 0 {
433                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
434                 } else {
435                         if isRts {
436                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
437                         } else {
438                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
439                         }
440                 }
441                 m.contextMux.Unlock()
442         }
443
444         if currBuffer == nil {
445                 m.UpdateStatCounter("TransmitError")
446                 m.LogMBufError("SendBuf failed", currBuffer)
447                 return int(C.RMR_ERR_INITFAILED)
448         }
449
450         if currBuffer.state != C.RMR_OK {
451                 m.UpdateStatCounter("TransmitError")
452                 m.LogMBufError("SendBuf failed", currBuffer)
453         } else {
454                 m.UpdateStatCounter("Transmitted")
455         }
456         defer m.Free(currBuffer)
457         return int(currBuffer.state)
458
459 }
460
461 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
462         var (
463                 currBuffer  *C.rmr_mbuf_t
464                 counterName string = "Transmitted"
465         )
466         txBuffer := m.CopyBuffer(params)
467         if txBuffer == nil {
468                 return C.RMR_ERR_INITFAILED, ""
469         }
470
471         txBuffer.state = 0
472
473         m.contextMux.Lock()
474         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
475         m.contextMux.Unlock()
476
477         if currBuffer == nil {
478                 m.UpdateStatCounter("TransmitError")
479                 return m.LogMBufError("SendBuf failed", txBuffer), ""
480         }
481
482         if currBuffer.state != C.RMR_OK {
483                 counterName = "TransmitError"
484                 m.LogMBufError("SendBuf failed", currBuffer)
485         }
486
487         m.UpdateStatCounter(counterName)
488         defer m.Free(currBuffer)
489
490         cptr := unsafe.Pointer(currBuffer.payload)
491         payload := C.GoBytes(cptr, C.int(currBuffer.len))
492
493         return int(currBuffer.state), string(payload)
494 }
495
496 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
497         return m.Wh_open(target)
498 }
499
500 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
501         m.contextMux.Lock()
502         defer m.contextMux.Unlock()
503         endpoint := C.CString(target)
504         return C.rmr_wh_open(m.context, endpoint)
505 }
506
507 func (m *RMRClient) Closewh(whid int) {
508         m.Wh_close(C.rmr_whid_t(whid))
509 }
510
511 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
512         m.contextMux.Lock()
513         defer m.contextMux.Unlock()
514         C.rmr_wh_close(m.context, whid)
515 }
516
517 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
518         if params.status == int(C.RMR_ERR_RETRY) {
519                 return true
520         }
521         return false
522 }
523
524 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
525         if params.status == int(C.RMR_ERR_NOENDPT) {
526                 return true
527         }
528         return false
529 }
530
531 func (m *RMRClient) UpdateStatCounter(name string) {
532         m.mux.Lock()
533         m.stat[name].Inc()
534         m.mux.Unlock()
535 }
536
537 func (m *RMRClient) RegisterMetrics() {
538         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
539 }
540
541 func (m *RMRClient) Wait() {
542         m.wg.Wait()
543 }
544
545 func (m *RMRClient) IsReady() bool {
546         return m.ready != 0
547 }
548
549 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
550         m.readyCb = cb
551         m.readyCbParams = params
552 }
553
554 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
555         id, ok := RICMessageTypes[name]
556         return id, ok
557 }
558
559 func (m *RMRClient) GetRicMessageName(id int) (s string) {
560         for k, v := range RICMessageTypes {
561                 if id == v {
562                         return k
563                 }
564         }
565         return
566 }
567
568 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
569         if mbuf != nil {
570                 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
571                 return int(mbuf.state)
572         }
573         Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
574         return 0
575 }