9f988dcf1d396acc714a7d13890dd2c6729f0aa7
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33     memcpy((void *)dst, (void *)data, len);
34 }
35
36 int init_epoll(int rcv_fd) {
37         struct  epoll_event epe;
38         int epoll_fd = epoll_create1( 0 );
39         epe.events = EPOLLIN;
40         epe.data.fd = rcv_fd;
41         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
42         return epoll_fd;
43 }
44
45 void close_epoll(int epoll_fd) {
46         if(epoll_fd >= 0) {
47                 close(epoll_fd);
48         }
49 }
50
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52         struct  epoll_event events[1];
53         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54                 if( events[0].data.fd == rcv_fd ) {
55                         return 1;
56                 }
57         }
58         return 0;
59 }
60
61 #cgo CFLAGS: -I../
62 #cgo LDFLAGS: -lrmr_si
63 */
64 import "C"
65
66 import (
67         "bytes"
68         "crypto/md5"
69         "fmt"
70         "github.com/spf13/viper"
71         "strings"
72         "time"
73         "unsafe"
74 )
75
76 var RMRCounterOpts = []CounterOpts{
77         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
78         {Name: "Received", Help: "The total number of received RMR messages"},
79         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
81 }
82
83 var RMRErrors = map[int]string{
84         C.RMR_OK:             "state is good",
85         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
86         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
87         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
88         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
89         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
90         C.RMR_ERR_CALLFAILED: "unable to send call() message",
91         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
92         C.RMR_ERR_WHID:       "wormhole id was invalid",
93         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
94         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
95         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
96         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
97         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
98         C.RMR_ERR_TRUNC:      "received message likely truncated",
99         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
100         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
101 }
102
103 //-----------------------------------------------------------------------------
104 //
105 //-----------------------------------------------------------------------------
106 type RMRParams struct {
107         Mtype      int
108         Payload    []byte
109         PayloadLen int
110         Meid       *RMRMeid
111         Xid        string
112         SubId      int
113         Src        string
114         Mbuf       *C.rmr_mbuf_t
115         Whid       int
116         Callid     int
117         Timeout    int
118         status     int
119 }
120
121 func (params *RMRParams) String() string {
122         var b bytes.Buffer
123         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
124         return b.String()
125 }
126
127 //-----------------------------------------------------------------------------
128 //
129 //-----------------------------------------------------------------------------
130 func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
131         p := C.CString(protPort)
132         m := C.int(maxSize)
133         c := C.int(threadType)
134         defer C.free(unsafe.Pointer(p))
135         ctx := C.rmr_init(p, m, c)
136         if ctx == nil {
137                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
138         }
139         return &RMRClient{
140                 protPort:  protPort,
141                 context:   ctx,
142                 consumers: make([]MessageConsumer, 0),
143                 stat:      Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
144         }
145 }
146
147 func NewRMRClient() *RMRClient {
148         return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
149 }
150
151 func (m *RMRClient) Start(c MessageConsumer) {
152         if c != nil {
153                 m.consumers = append(m.consumers, c)
154         }
155
156         var counter int = 0
157         for {
158                 m.contextMux.Lock()
159                 m.ready = int(C.rmr_ready(m.context))
160                 m.contextMux.Unlock()
161                 if m.ready == 1 {
162                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
163                         break
164                 }
165                 if counter%10 == 0 {
166                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
167                 }
168                 time.Sleep(1 * time.Second)
169                 counter++
170         }
171         m.wg.Add(1)
172
173         if m.readyCb != nil {
174                 go m.readyCb(m.readyCbParams)
175         }
176
177         go func() {
178                 m.contextMux.Lock()
179                 rfd := C.rmr_get_rcvfd(m.context)
180                 m.contextMux.Unlock()
181                 efd := C.init_epoll(rfd)
182
183                 defer m.wg.Done()
184                 for {
185                         if int(C.wait_epoll(efd, rfd)) == 0 {
186                                 continue
187                         }
188                         m.contextMux.Lock()
189                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
190                         m.contextMux.Unlock()
191
192                         if rxBuffer == nil {
193                                 m.LogMBufError("RecvMsg failed", rxBuffer)
194                                 m.UpdateStatCounter("ReceiveError")
195                                 continue
196                         }
197                         m.UpdateStatCounter("Received")
198                         m.parseMessage(rxBuffer)
199                 }
200         }()
201
202         m.wg.Wait()
203 }
204
205 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
206         if len(m.consumers) == 0 {
207                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
208                 return
209         }
210
211         params := &RMRParams{}
212         params.Mbuf = rxBuffer
213         params.Mtype = int(rxBuffer.mtype)
214         params.SubId = int(rxBuffer.sub_id)
215         params.Meid = &RMRMeid{}
216
217         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
218         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
219                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
220         }
221
222         xidBuf := make([]byte, int(C.RMR_MAX_XID))
223         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
224                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
225         }
226
227         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
228         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
229                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
230         }
231
232         // Default case: a single consumer
233         if len(m.consumers) == 1 && m.consumers[0] != nil {
234                 params.PayloadLen = int(rxBuffer.len)
235                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
236                 err := m.consumers[0].Consume(params)
237                 if err != nil {
238                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
239                 }
240                 return
241         }
242
243         // Special case for multiple consumers
244         for _, c := range m.consumers {
245                 cptr := unsafe.Pointer(rxBuffer.payload)
246                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
247                 params.PayloadLen = int(rxBuffer.len)
248                 params.Mtype = int(rxBuffer.mtype)
249                 params.SubId = int(rxBuffer.sub_id)
250
251                 err := c.Consume(params)
252                 if err != nil {
253                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
254                 }
255         }
256 }
257
258 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
259         m.contextMux.Lock()
260         defer m.contextMux.Unlock()
261         outbuf := C.rmr_alloc_msg(m.context, C.int(size))
262         if outbuf == nil {
263                 Logger.Error("rmrClient: Allocating message buffer failed!")
264         }
265         return outbuf
266 }
267
268 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
269         m.contextMux.Lock()
270         defer m.contextMux.Unlock()
271         outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
272         if outbuf == nil {
273                 Logger.Error("rmrClient: Allocating message buffer failed!")
274         }
275         return outbuf
276 }
277
278 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
279         if mbuf == nil {
280                 return
281         }
282         m.contextMux.Lock()
283         defer m.contextMux.Unlock()
284         C.rmr_free_msg(mbuf)
285 }
286
287 func (m *RMRClient) SendMsg(params *RMRParams) bool {
288         return m.Send(params, false)
289 }
290
291 func (m *RMRClient) SendRts(params *RMRParams) bool {
292         return m.Send(params, true)
293 }
294
295 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
296         status := m.Send(params, isRts)
297         i := 0
298         for ; i < int(to)*2 && status == false; i++ {
299                 status = m.Send(params, isRts)
300                 if status == false {
301                         time.Sleep(500 * time.Millisecond)
302                 }
303         }
304         if status == false {
305                 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
306                 if params.Mbuf != nil {
307                         m.Free(params.Mbuf)
308                         params.Mbuf = nil
309                 }
310         }
311         return
312 }
313
314 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
315
316         payLen := len(params.Payload)
317         if params.PayloadLen != 0 {
318                 payLen = params.PayloadLen
319         }
320
321         txBuffer := params.Mbuf
322         params.Mbuf = nil
323
324         if txBuffer != nil {
325                 txBuffer = m.ReAllocate(txBuffer, payLen)
326         } else {
327                 txBuffer = m.Allocate(payLen)
328         }
329
330         if txBuffer == nil {
331                 return nil
332         }
333         txBuffer.mtype = C.int(params.Mtype)
334         txBuffer.sub_id = C.int(params.SubId)
335         txBuffer.len = C.int(payLen)
336
337         datap := C.CBytes(params.Payload)
338         defer C.free(datap)
339
340         if params != nil {
341                 if params.Meid != nil {
342                         b := make([]byte, int(C.RMR_MAX_MEID))
343                         copy(b, []byte(params.Meid.RanName))
344                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
345                 }
346                 xidLen := len(params.Xid)
347                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
348                         b := make([]byte, int(C.RMR_MAX_XID))
349                         copy(b, []byte(params.Xid))
350                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
351                 }
352         }
353         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
354         return txBuffer
355 }
356
357 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
358
359         txBuffer := m.CopyBuffer(params)
360         if txBuffer == nil {
361                 return false
362         }
363         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
364         if params.status == int(C.RMR_OK) {
365                 return true
366         }
367         return false
368 }
369
370 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
371         var (
372                 currBuffer  *C.rmr_mbuf_t
373                 counterName string = "Transmitted"
374         )
375
376         m.contextMux.Lock()
377         txBuffer.state = 0
378         if whid != 0 {
379                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
380         } else {
381                 if isRts {
382                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
383                 } else {
384                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
385                 }
386         }
387         m.contextMux.Unlock()
388
389         if currBuffer == nil {
390                 m.UpdateStatCounter("TransmitError")
391                 return m.LogMBufError("SendBuf failed", txBuffer)
392         }
393
394         // Just quick retry seems to help for K8s issue
395         maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
396         if maxRetryOnFailure == 0 {
397                 maxRetryOnFailure = 5
398         }
399
400         for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
401                 m.contextMux.Lock()
402                 if whid != 0 {
403                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
404                 } else {
405                         if isRts {
406                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
407                         } else {
408                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
409                         }
410                 }
411                 m.contextMux.Unlock()
412         }
413
414         if currBuffer.state != C.RMR_OK {
415                 counterName = "TransmitError"
416                 m.LogMBufError("SendBuf failed", currBuffer)
417         }
418
419         m.UpdateStatCounter(counterName)
420         defer m.Free(currBuffer)
421
422         return int(currBuffer.state)
423 }
424
425 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
426         var (
427                 currBuffer  *C.rmr_mbuf_t
428                 counterName string = "Transmitted"
429         )
430         txBuffer := m.CopyBuffer(params)
431         if txBuffer == nil {
432                 return C.RMR_ERR_INITFAILED, ""
433         }
434
435         txBuffer.state = 0
436
437         m.contextMux.Lock()
438         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
439         m.contextMux.Unlock()
440
441         if currBuffer == nil {
442                 m.UpdateStatCounter("TransmitError")
443                 return m.LogMBufError("SendBuf failed", txBuffer), ""
444         }
445
446         if currBuffer.state != C.RMR_OK {
447                 counterName = "TransmitError"
448                 m.LogMBufError("SendBuf failed", currBuffer)
449         }
450
451         m.UpdateStatCounter(counterName)
452         defer m.Free(currBuffer)
453
454         cptr := unsafe.Pointer(currBuffer.payload)
455         payload := C.GoBytes(cptr, C.int(currBuffer.len))
456
457         return int(currBuffer.state), string(payload)
458 }
459
460 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
461         return m.Wh_open(target)
462 }
463
464 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
465         m.contextMux.Lock()
466         defer m.contextMux.Unlock()
467         endpoint := C.CString(target)
468         return C.rmr_wh_open(m.context, endpoint)
469 }
470
471 func (m *RMRClient) Closewh(whid int) {
472         m.Wh_close(C.rmr_whid_t(whid))
473 }
474
475 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
476         m.contextMux.Lock()
477         defer m.contextMux.Unlock()
478         C.rmr_wh_close(m.context, whid)
479 }
480
481 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
482         if params.status == int(C.RMR_ERR_RETRY) {
483                 return true
484         }
485         return false
486 }
487
488 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
489         if params.status == int(C.RMR_ERR_NOENDPT) {
490                 return true
491         }
492         return false
493 }
494
495 func (m *RMRClient) UpdateStatCounter(name string) {
496         m.mux.Lock()
497         m.stat[name].Inc()
498         m.mux.Unlock()
499 }
500
501 func (m *RMRClient) RegisterMetrics() {
502         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
503 }
504
505 func (m *RMRClient) Wait() {
506         m.wg.Wait()
507 }
508
509 func (m *RMRClient) IsReady() bool {
510         return m.ready != 0
511 }
512
513 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
514         m.readyCb = cb
515         m.readyCbParams = params
516 }
517
518 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
519         id, ok := RICMessageTypes[name]
520         return id, ok
521 }
522
523 func (m *RMRClient) GetRicMessageName(id int) (s string) {
524         for k, v := range RICMessageTypes {
525                 if id == v {
526                         return k
527                 }
528         }
529         return
530 }
531
532 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
533         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
534         return int(mbuf.state)
535 }
536
537 // To be removed ...
538 func (m *RMRClient) GetStat() (r RMRStatistics) {
539         return
540 }