e27d46e9bd4433cfe65278845ef7f59f7efbf49a
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33     memcpy((void *)dst, (void *)data, len);
34 }
35
36 int init_epoll(int rcv_fd) {
37         struct  epoll_event epe;
38         int epoll_fd = epoll_create1( 0 );
39         epe.events = EPOLLIN;
40         epe.data.fd = rcv_fd;
41         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
42         return epoll_fd;
43 }
44
45 void close_epoll(int epoll_fd) {
46         if(epoll_fd >= 0) {
47                 close(epoll_fd);
48         }
49 }
50
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52         struct  epoll_event events[1];
53         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54                 if( events[0].data.fd == rcv_fd ) {
55                         return 1;
56                 }
57         }
58         return 0;
59 }
60
61 #cgo CFLAGS: -I../
62 #cgo LDFLAGS: -lrmr_si
63 */
64 import "C"
65
66 import (
67         "bytes"
68         "crypto/md5"
69         "fmt"
70         "github.com/spf13/viper"
71         "strings"
72         "time"
73         "unsafe"
74 )
75
76 var RMRCounterOpts = []CounterOpts{
77         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
78         {Name: "Received", Help: "The total number of received RMR messages"},
79         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
81 }
82
83 var RMRErrors = map[int]string{
84         C.RMR_OK:             "state is good",
85         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
86         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
87         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
88         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
89         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
90         C.RMR_ERR_CALLFAILED: "unable to send call() message",
91         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
92         C.RMR_ERR_WHID:       "wormhole id was invalid",
93         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
94         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
95         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
96         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
97         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
98         C.RMR_ERR_TRUNC:      "received message likely truncated",
99         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
100         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
101 }
102
103 //-----------------------------------------------------------------------------
104 //
105 //-----------------------------------------------------------------------------
106 type RMRParams struct {
107         Mtype      int
108         Payload    []byte
109         PayloadLen int
110         Meid       *RMRMeid
111         Xid        string
112         SubId      int
113         Src        string
114         Mbuf       *C.rmr_mbuf_t
115         Whid       int
116         Callid     int
117         Timeout    int
118         status     int
119 }
120
121 func (params *RMRParams) String() string {
122         var b bytes.Buffer
123         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
124         return b.String()
125 }
126
127 //-----------------------------------------------------------------------------
128 //
129 //-----------------------------------------------------------------------------
130 func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
131         p := C.CString(protPort)
132         m := C.int(maxSize)
133         c := C.int(threadType)
134         defer C.free(unsafe.Pointer(p))
135         ctx := C.rmr_init(p, m, c)
136         if ctx == nil {
137                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
138         }
139         return &RMRClient{
140                 protPort:  protPort,
141                 context:   ctx,
142                 consumers: make([]MessageConsumer, 0),
143                 stat:      Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
144         }
145 }
146
147 func NewRMRClient() *RMRClient {
148         return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
149 }
150
151 func (m *RMRClient) Start(c MessageConsumer) {
152         if c != nil {
153                 m.consumers = append(m.consumers, c)
154         }
155
156         var counter int = 0
157         for {
158                 m.contextMux.Lock()
159                 m.ready = int(C.rmr_ready(m.context))
160                 m.contextMux.Unlock()
161                 if m.ready == 1 {
162                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
163                         break
164                 }
165                 if counter%10 == 0 {
166                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
167                 }
168                 time.Sleep(1 * time.Second)
169                 counter++
170         }
171         m.wg.Add(1)
172
173         if m.readyCb != nil {
174                 go m.readyCb(m.readyCbParams)
175         }
176
177         go func() {
178                 m.contextMux.Lock()
179                 rfd := C.rmr_get_rcvfd(m.context)
180                 m.contextMux.Unlock()
181                 efd := C.init_epoll(rfd)
182
183                 defer m.wg.Done()
184                 for {
185                         if int(C.wait_epoll(efd, rfd)) == 0 {
186                                 continue
187                         }
188                         m.contextMux.Lock()
189                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
190                         m.contextMux.Unlock()
191
192                         if rxBuffer == nil {
193                                 m.LogMBufError("RecvMsg failed", rxBuffer)
194                                 m.UpdateStatCounter("ReceiveError")
195                                 continue
196                         }
197                         m.UpdateStatCounter("Received")
198                         m.parseMessage(rxBuffer)
199                 }
200         }()
201
202         m.wg.Wait()
203 }
204
205 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
206         if len(m.consumers) == 0 {
207                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
208                 return
209         }
210
211         params := &RMRParams{}
212         params.Mbuf = rxBuffer
213         params.Mtype = int(rxBuffer.mtype)
214         params.SubId = int(rxBuffer.sub_id)
215         params.Meid = &RMRMeid{}
216
217         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
218         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
219                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
220         }
221
222         xidBuf := make([]byte, int(C.RMR_MAX_XID))
223         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
224                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
225         }
226
227         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
228         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
229                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
230         }
231
232         // Default case: a single consumer
233         if len(m.consumers) == 1 && m.consumers[0] != nil {
234                 params.PayloadLen = int(rxBuffer.len)
235                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
236                 err := m.consumers[0].Consume(params)
237                 if err != nil {
238                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
239                 }
240                 return
241         }
242
243         // Special case for multiple consumers
244         for _, c := range m.consumers {
245                 cptr := unsafe.Pointer(rxBuffer.payload)
246                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
247                 params.PayloadLen = int(rxBuffer.len)
248                 params.Mtype = int(rxBuffer.mtype)
249                 params.SubId = int(rxBuffer.sub_id)
250
251                 err := c.Consume(params)
252                 if err != nil {
253                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
254                 }
255         }
256 }
257
258 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
259         m.contextMux.Lock()
260         defer m.contextMux.Unlock()
261         outbuf := C.rmr_alloc_msg(m.context, C.int(size))
262         if outbuf == nil {
263                 Logger.Error("rmrClient: Allocating message buffer failed!")
264         }
265         return outbuf
266 }
267
268 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
269         m.contextMux.Lock()
270         defer m.contextMux.Unlock()
271         outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
272         if outbuf == nil {
273                 Logger.Error("rmrClient: Allocating message buffer failed!")
274         }
275         return outbuf
276 }
277
278 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
279         if mbuf == nil {
280                 return
281         }
282         m.contextMux.Lock()
283         defer m.contextMux.Unlock()
284         C.rmr_free_msg(mbuf)
285 }
286
287 func (m *RMRClient) SendMsg(params *RMRParams) bool {
288         return m.Send(params, false)
289 }
290
291 func (m *RMRClient) SendRts(params *RMRParams) bool {
292         return m.Send(params, true)
293 }
294
295 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
296         status := m.Send(params, isRts)
297         i := 0
298         for ; i < int(to)*2 && status == false; i++ {
299                 status = m.Send(params, isRts)
300                 if status == false {
301                         time.Sleep(500 * time.Millisecond)
302                 }
303         }
304         if status == false {
305                 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
306                 if params.Mbuf != nil {
307                         m.Free(params.Mbuf)
308                         params.Mbuf = nil
309                 }
310         }
311         return
312 }
313
314 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
315
316         if params == nil {
317                 return nil
318         }
319
320         payLen := len(params.Payload)
321         if params.PayloadLen != 0 {
322                 payLen = params.PayloadLen
323         }
324
325         txBuffer := params.Mbuf
326         params.Mbuf = nil
327
328         if txBuffer != nil {
329                 txBuffer = m.ReAllocate(txBuffer, payLen)
330         } else {
331                 txBuffer = m.Allocate(payLen)
332         }
333
334         if txBuffer == nil {
335                 return nil
336         }
337         txBuffer.mtype = C.int(params.Mtype)
338         txBuffer.sub_id = C.int(params.SubId)
339         txBuffer.len = C.int(payLen)
340
341         datap := C.CBytes(params.Payload)
342         defer C.free(datap)
343
344         if params.Meid != nil {
345                 b := make([]byte, int(C.RMR_MAX_MEID))
346                 copy(b, []byte(params.Meid.RanName))
347                 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
348         }
349
350         xidLen := len(params.Xid)
351         if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
352                 b := make([]byte, int(C.RMR_MAX_XID))
353                 copy(b, []byte(params.Xid))
354                 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
355         }
356
357         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
358
359         return txBuffer
360 }
361
362 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
363
364         txBuffer := m.CopyBuffer(params)
365         if txBuffer == nil {
366                 return false
367         }
368         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
369         if params.status == int(C.RMR_OK) {
370                 return true
371         }
372         return false
373 }
374
375 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
376         var (
377                 currBuffer  *C.rmr_mbuf_t
378                 counterName string = "Transmitted"
379         )
380
381         m.contextMux.Lock()
382         txBuffer.state = 0
383         if whid != 0 {
384                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
385         } else {
386                 if isRts {
387                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
388                 } else {
389                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
390                 }
391         }
392         m.contextMux.Unlock()
393
394         if currBuffer == nil {
395                 m.UpdateStatCounter("TransmitError")
396                 return m.LogMBufError("SendBuf failed", txBuffer)
397         }
398
399         // Just quick retry seems to help for K8s issue
400         maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
401         if maxRetryOnFailure == 0 {
402                 maxRetryOnFailure = 5
403         }
404
405         for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
406                 m.contextMux.Lock()
407                 if whid != 0 {
408                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
409                 } else {
410                         if isRts {
411                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
412                         } else {
413                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
414                         }
415                 }
416                 m.contextMux.Unlock()
417         }
418
419         if currBuffer.state != C.RMR_OK {
420                 counterName = "TransmitError"
421                 m.LogMBufError("SendBuf failed", currBuffer)
422         }
423
424         m.UpdateStatCounter(counterName)
425         defer m.Free(currBuffer)
426
427         return int(currBuffer.state)
428 }
429
430 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
431         var (
432                 currBuffer  *C.rmr_mbuf_t
433                 counterName string = "Transmitted"
434         )
435         txBuffer := m.CopyBuffer(params)
436         if txBuffer == nil {
437                 return C.RMR_ERR_INITFAILED, ""
438         }
439
440         txBuffer.state = 0
441
442         m.contextMux.Lock()
443         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
444         m.contextMux.Unlock()
445
446         if currBuffer == nil {
447                 m.UpdateStatCounter("TransmitError")
448                 return m.LogMBufError("SendBuf failed", txBuffer), ""
449         }
450
451         if currBuffer.state != C.RMR_OK {
452                 counterName = "TransmitError"
453                 m.LogMBufError("SendBuf failed", currBuffer)
454         }
455
456         m.UpdateStatCounter(counterName)
457         defer m.Free(currBuffer)
458
459         cptr := unsafe.Pointer(currBuffer.payload)
460         payload := C.GoBytes(cptr, C.int(currBuffer.len))
461
462         return int(currBuffer.state), string(payload)
463 }
464
465 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
466         return m.Wh_open(target)
467 }
468
469 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
470         m.contextMux.Lock()
471         defer m.contextMux.Unlock()
472         endpoint := C.CString(target)
473         return C.rmr_wh_open(m.context, endpoint)
474 }
475
476 func (m *RMRClient) Closewh(whid int) {
477         m.Wh_close(C.rmr_whid_t(whid))
478 }
479
480 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
481         m.contextMux.Lock()
482         defer m.contextMux.Unlock()
483         C.rmr_wh_close(m.context, whid)
484 }
485
486 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
487         if params.status == int(C.RMR_ERR_RETRY) {
488                 return true
489         }
490         return false
491 }
492
493 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
494         if params.status == int(C.RMR_ERR_NOENDPT) {
495                 return true
496         }
497         return false
498 }
499
500 func (m *RMRClient) UpdateStatCounter(name string) {
501         m.mux.Lock()
502         m.stat[name].Inc()
503         m.mux.Unlock()
504 }
505
506 func (m *RMRClient) RegisterMetrics() {
507         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
508 }
509
510 func (m *RMRClient) Wait() {
511         m.wg.Wait()
512 }
513
514 func (m *RMRClient) IsReady() bool {
515         return m.ready != 0
516 }
517
518 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
519         m.readyCb = cb
520         m.readyCbParams = params
521 }
522
523 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
524         id, ok := RICMessageTypes[name]
525         return id, ok
526 }
527
528 func (m *RMRClient) GetRicMessageName(id int) (s string) {
529         for k, v := range RICMessageTypes {
530                 if id == v {
531                         return k
532                 }
533         }
534         return
535 }
536
537 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
538         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
539         return int(mbuf.state)
540 }
541
542 // To be removed ...
543 func (m *RMRClient) GetStat() (r RMRStatistics) {
544         return
545 }