94070242c02ab2ab52604035f630ed1cf70e1a68
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33     memcpy((void *)dst, (void *)data, len);
34 }
35
36 int init_epoll(int rcv_fd) {
37         struct  epoll_event epe;
38         int epoll_fd = epoll_create1( 0 );
39         epe.events = EPOLLIN;
40         epe.data.fd = rcv_fd;
41         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
42         return epoll_fd;
43 }
44
45 void close_epoll(int epoll_fd) {
46         if(epoll_fd >= 0) {
47                 close(epoll_fd);
48         }
49 }
50
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52         struct  epoll_event events[1];
53         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54                 if( events[0].data.fd == rcv_fd ) {
55                         return 1;
56                 }
57         }
58         return 0;
59 }
60
61 #cgo CFLAGS: -I../
62 #cgo LDFLAGS: -lrmr_si
63 */
64 import "C"
65
66 import (
67         "bytes"
68         "crypto/md5"
69         "fmt"
70         "strings"
71         "time"
72         "unsafe"
73 )
74
75 var RMRCounterOpts = []CounterOpts{
76         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
77         {Name: "Received", Help: "The total number of received RMR messages"},
78         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
79         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
80 }
81
82 var RMRErrors = map[int]string{
83         C.RMR_OK:             "state is good",
84         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
85         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
86         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
87         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
88         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
89         C.RMR_ERR_CALLFAILED: "unable to send call() message",
90         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
91         C.RMR_ERR_WHID:       "wormhole id was invalid",
92         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
93         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
94         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
95         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
96         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
97         C.RMR_ERR_TRUNC:      "received message likely truncated",
98         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
99         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
100 }
101
102 //-----------------------------------------------------------------------------
103 //
104 //-----------------------------------------------------------------------------
105 type RMRParams struct {
106         Mtype      int
107         Payload    []byte
108         PayloadLen int
109         Meid       *RMRMeid
110         Xid        string
111         SubId      int
112         Src        string
113         Mbuf       *C.rmr_mbuf_t
114         Whid       int
115         Callid     int
116         Timeout    int
117         status     int
118 }
119
120 func (params *RMRParams) String() string {
121         var b bytes.Buffer
122         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
123         return b.String()
124 }
125
126 //-----------------------------------------------------------------------------
127 //
128 //-----------------------------------------------------------------------------
129 type RMRClientParams struct {
130         StatDesc string
131         RmrData  PortData
132 }
133
134 func (params *RMRClientParams) String() string {
135         return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t Policies=%v",
136                 params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc,
137                 params.RmrData.LowLatency, params.RmrData.FastAck, params.RmrData.Policies)
138 }
139
140 //-----------------------------------------------------------------------------
141 //
142 //-----------------------------------------------------------------------------
143 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
144         p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
145         m := C.int(params.RmrData.MaxSize)
146         c := C.int(params.RmrData.ThreadType)
147         defer C.free(unsafe.Pointer(p))
148         ctx := C.rmr_init(p, m, c)
149         if ctx == nil {
150                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
151         }
152
153         Logger.Info("new rmrClient with parameters: %s", params.String())
154
155         if params.RmrData.LowLatency {
156                 C.rmr_set_low_latency(ctx)
157         }
158         if params.RmrData.FastAck {
159                 C.rmr_set_fack(ctx)
160         }
161
162         return &RMRClient{
163                 context:           ctx,
164                 consumers:         make([]MessageConsumer, 0),
165                 stat:              Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
166                 maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
167         }
168 }
169
170 func NewRMRClient() *RMRClient {
171         p := GetPortData("rmr-data")
172         return NewRMRClientWithParams(
173                 &RMRClientParams{
174                         RmrData:  p,
175                         StatDesc: "RMR",
176                 })
177 }
178
179 func (m *RMRClient) Start(c MessageConsumer) {
180         if c != nil {
181                 m.consumers = append(m.consumers, c)
182         }
183
184         var counter int = 0
185         for {
186                 m.contextMux.Lock()
187                 m.ready = int(C.rmr_ready(m.context))
188                 m.contextMux.Unlock()
189                 if m.ready == 1 {
190                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
191                         break
192                 }
193                 if counter%10 == 0 {
194                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
195                 }
196                 time.Sleep(1 * time.Second)
197                 counter++
198         }
199         m.wg.Add(1)
200
201         if m.readyCb != nil {
202                 go m.readyCb(m.readyCbParams)
203         }
204
205         go func() {
206                 m.contextMux.Lock()
207                 rfd := C.rmr_get_rcvfd(m.context)
208                 m.contextMux.Unlock()
209                 efd := C.init_epoll(rfd)
210
211                 defer m.wg.Done()
212                 for {
213                         if int(C.wait_epoll(efd, rfd)) == 0 {
214                                 continue
215                         }
216                         m.contextMux.Lock()
217                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
218                         m.contextMux.Unlock()
219
220                         if rxBuffer == nil {
221                                 m.LogMBufError("RecvMsg failed", rxBuffer)
222                                 m.UpdateStatCounter("ReceiveError")
223                                 continue
224                         }
225                         m.UpdateStatCounter("Received")
226                         m.parseMessage(rxBuffer)
227                 }
228         }()
229
230         m.wg.Wait()
231 }
232
233 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
234         if len(m.consumers) == 0 {
235                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
236                 return
237         }
238
239         params := &RMRParams{}
240         params.Mbuf = rxBuffer
241         params.Mtype = int(rxBuffer.mtype)
242         params.SubId = int(rxBuffer.sub_id)
243         params.Meid = &RMRMeid{}
244
245         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
246         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
247                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
248         }
249
250         xidBuf := make([]byte, int(C.RMR_MAX_XID))
251         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
252                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
253         }
254
255         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
256         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
257                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
258         }
259
260         // Default case: a single consumer
261         if len(m.consumers) == 1 && m.consumers[0] != nil {
262                 params.PayloadLen = int(rxBuffer.len)
263                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
264                 err := m.consumers[0].Consume(params)
265                 if err != nil {
266                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
267                 }
268                 return
269         }
270
271         // Special case for multiple consumers
272         for _, c := range m.consumers {
273                 cptr := unsafe.Pointer(rxBuffer.payload)
274                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
275                 params.PayloadLen = int(rxBuffer.len)
276                 params.Mtype = int(rxBuffer.mtype)
277                 params.SubId = int(rxBuffer.sub_id)
278
279                 err := c.Consume(params)
280                 if err != nil {
281                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
282                 }
283         }
284 }
285
286 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
287         m.contextMux.Lock()
288         defer m.contextMux.Unlock()
289         outbuf := C.rmr_alloc_msg(m.context, C.int(size))
290         if outbuf == nil {
291                 Logger.Error("rmrClient: Allocating message buffer failed!")
292         }
293         return outbuf
294 }
295
296 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
297         m.contextMux.Lock()
298         defer m.contextMux.Unlock()
299         outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
300         if outbuf == nil {
301                 Logger.Error("rmrClient: Allocating message buffer failed!")
302         }
303         return outbuf
304 }
305
306 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
307         if mbuf == nil {
308                 return
309         }
310         m.contextMux.Lock()
311         defer m.contextMux.Unlock()
312         C.rmr_free_msg(mbuf)
313 }
314
315 func (m *RMRClient) SendMsg(params *RMRParams) bool {
316         return m.Send(params, false)
317 }
318
319 func (m *RMRClient) SendRts(params *RMRParams) bool {
320         return m.Send(params, true)
321 }
322
323 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
324         status := m.Send(params, isRts)
325         i := 0
326         for ; i < int(to)*2 && status == false; i++ {
327                 status = m.Send(params, isRts)
328                 if status == false {
329                         time.Sleep(500 * time.Millisecond)
330                 }
331         }
332         if status == false {
333                 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
334                 if params.Mbuf != nil {
335                         m.Free(params.Mbuf)
336                         params.Mbuf = nil
337                 }
338         }
339         return
340 }
341
342 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
343
344         if params == nil {
345                 return nil
346         }
347
348         payLen := len(params.Payload)
349         if params.PayloadLen != 0 {
350                 payLen = params.PayloadLen
351         }
352
353         txBuffer := params.Mbuf
354         params.Mbuf = nil
355
356         if txBuffer != nil {
357                 txBuffer = m.ReAllocate(txBuffer, payLen)
358         } else {
359                 txBuffer = m.Allocate(payLen)
360         }
361
362         if txBuffer == nil {
363                 return nil
364         }
365         txBuffer.mtype = C.int(params.Mtype)
366         txBuffer.sub_id = C.int(params.SubId)
367         txBuffer.len = C.int(payLen)
368
369         datap := C.CBytes(params.Payload)
370         defer C.free(datap)
371
372         if params.Meid != nil {
373                 b := make([]byte, int(C.RMR_MAX_MEID))
374                 copy(b, []byte(params.Meid.RanName))
375                 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
376         }
377
378         xidLen := len(params.Xid)
379         if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
380                 b := make([]byte, int(C.RMR_MAX_XID))
381                 copy(b, []byte(params.Xid))
382                 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
383         }
384
385         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
386
387         return txBuffer
388 }
389
390 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
391
392         txBuffer := m.CopyBuffer(params)
393         if txBuffer == nil {
394                 return false
395         }
396         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
397         if params.status == int(C.RMR_OK) {
398                 return true
399         }
400         return false
401 }
402
403 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
404         var (
405                 currBuffer *C.rmr_mbuf_t
406         )
407
408         m.contextMux.Lock()
409         txBuffer.state = 0
410         if whid != 0 {
411                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
412         } else {
413                 if isRts {
414                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
415                 } else {
416                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
417                 }
418         }
419         m.contextMux.Unlock()
420
421         if currBuffer == nil {
422                 m.UpdateStatCounter("TransmitError")
423                 return m.LogMBufError("SendBuf failed", txBuffer)
424         }
425
426         // Just quick retry seems to help for K8s issue
427         if m.maxRetryOnFailure == 0 {
428                 m.maxRetryOnFailure = 5
429         }
430
431         for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
432                 m.contextMux.Lock()
433                 if whid != 0 {
434                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
435                 } else {
436                         if isRts {
437                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
438                         } else {
439                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
440                         }
441                 }
442                 m.contextMux.Unlock()
443         }
444
445         if currBuffer == nil {
446                 m.UpdateStatCounter("TransmitError")
447                 m.LogMBufError("SendBuf failed", currBuffer)
448                 return int(C.RMR_ERR_INITFAILED)
449         }
450
451         if currBuffer.state != C.RMR_OK {
452                 m.UpdateStatCounter("TransmitError")
453                 m.LogMBufError("SendBuf failed", currBuffer)
454         } else {
455                 m.UpdateStatCounter("Transmitted")
456         }
457         defer m.Free(currBuffer)
458         return int(currBuffer.state)
459
460 }
461
462 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
463         var (
464                 currBuffer  *C.rmr_mbuf_t
465                 counterName string = "Transmitted"
466         )
467         txBuffer := m.CopyBuffer(params)
468         if txBuffer == nil {
469                 return C.RMR_ERR_INITFAILED, ""
470         }
471
472         txBuffer.state = 0
473
474         m.contextMux.Lock()
475         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
476         m.contextMux.Unlock()
477
478         if currBuffer == nil {
479                 m.UpdateStatCounter("TransmitError")
480                 return m.LogMBufError("SendBuf failed", txBuffer), ""
481         }
482
483         if currBuffer.state != C.RMR_OK {
484                 counterName = "TransmitError"
485                 m.LogMBufError("SendBuf failed", currBuffer)
486         }
487
488         m.UpdateStatCounter(counterName)
489         defer m.Free(currBuffer)
490
491         cptr := unsafe.Pointer(currBuffer.payload)
492         payload := C.GoBytes(cptr, C.int(currBuffer.len))
493
494         return int(currBuffer.state), string(payload)
495 }
496
497 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
498         return m.Wh_open(target)
499 }
500
501 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
502         m.contextMux.Lock()
503         defer m.contextMux.Unlock()
504         endpoint := C.CString(target)
505         return C.rmr_wh_open(m.context, endpoint)
506 }
507
508 func (m *RMRClient) Closewh(whid int) {
509         m.Wh_close(C.rmr_whid_t(whid))
510 }
511
512 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
513         m.contextMux.Lock()
514         defer m.contextMux.Unlock()
515         C.rmr_wh_close(m.context, whid)
516 }
517
518 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
519         if params.status == int(C.RMR_ERR_RETRY) {
520                 return true
521         }
522         return false
523 }
524
525 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
526         if params.status == int(C.RMR_ERR_NOENDPT) {
527                 return true
528         }
529         return false
530 }
531
532 func (m *RMRClient) UpdateStatCounter(name string) {
533         m.mux.Lock()
534         m.stat[name].Inc()
535         m.mux.Unlock()
536 }
537
538 func (m *RMRClient) RegisterMetrics() {
539         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
540 }
541
542 func (m *RMRClient) Wait() {
543         m.wg.Wait()
544 }
545
546 func (m *RMRClient) IsReady() bool {
547         return m.ready != 0
548 }
549
550 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
551         m.readyCb = cb
552         m.readyCbParams = params
553 }
554
555 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
556         id, ok := RICMessageTypes[name]
557         return id, ok
558 }
559
560 func (m *RMRClient) GetRicMessageName(id int) (s string) {
561         for k, v := range RICMessageTypes {
562                 if id == v {
563                         return k
564                 }
565         }
566         return
567 }
568
569 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
570         if mbuf != nil {
571                 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
572                 return int(mbuf.state)
573         }
574         Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
575         return 0
576 }