Fixed RmrClientParam field visibility
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <unistd.h>
29 #include <rmr/rmr.h>
30 #include <rmr/RIC_message_types.h>
31
32 void write_bytes_array(unsigned char *dst, void *data, int len) {
33     memcpy((void *)dst, (void *)data, len);
34 }
35
36 int init_epoll(int rcv_fd) {
37         struct  epoll_event epe;
38         int epoll_fd = epoll_create1( 0 );
39         epe.events = EPOLLIN;
40         epe.data.fd = rcv_fd;
41         epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
42         return epoll_fd;
43 }
44
45 void close_epoll(int epoll_fd) {
46         if(epoll_fd >= 0) {
47                 close(epoll_fd);
48         }
49 }
50
51 int wait_epoll(int epoll_fd,int rcv_fd) {
52         struct  epoll_event events[1];
53         if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54                 if( events[0].data.fd == rcv_fd ) {
55                         return 1;
56                 }
57         }
58         return 0;
59 }
60
61 #cgo CFLAGS: -I../
62 #cgo LDFLAGS: -lrmr_si
63 */
64 import "C"
65
66 import (
67         "bytes"
68         "crypto/md5"
69         "fmt"
70         "github.com/spf13/viper"
71         "strings"
72         "time"
73         "unsafe"
74 )
75
76 var RMRCounterOpts = []CounterOpts{
77         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
78         {Name: "Received", Help: "The total number of received RMR messages"},
79         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
80         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
81 }
82
83 var RMRErrors = map[int]string{
84         C.RMR_OK:             "state is good",
85         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
86         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
87         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
88         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
89         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
90         C.RMR_ERR_CALLFAILED: "unable to send call() message",
91         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
92         C.RMR_ERR_WHID:       "wormhole id was invalid",
93         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
94         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
95         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
96         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
97         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
98         C.RMR_ERR_TRUNC:      "received message likely truncated",
99         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
100         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
101 }
102
103 //-----------------------------------------------------------------------------
104 //
105 //-----------------------------------------------------------------------------
106 type RMRParams struct {
107         Mtype      int
108         Payload    []byte
109         PayloadLen int
110         Meid       *RMRMeid
111         Xid        string
112         SubId      int
113         Src        string
114         Mbuf       *C.rmr_mbuf_t
115         Whid       int
116         Callid     int
117         Timeout    int
118         status     int
119 }
120
121 func (params *RMRParams) String() string {
122         var b bytes.Buffer
123         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
124         return b.String()
125 }
126
127 //-----------------------------------------------------------------------------
128 //
129 //-----------------------------------------------------------------------------
130 type RMRClientParams struct {
131         ProtPort   string
132         MaxSize    int
133         ThreadType int
134         StatDesc   string
135         LowLatency bool
136         FastAck    bool
137 }
138
139 func (params *RMRClientParams) String() string {
140         return fmt.Sprintf("ProtPort=%s MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t",
141                 params.ProtPort, params.MaxSize, params.ThreadType, params.StatDesc, params.LowLatency, params.FastAck)
142 }
143
144 //-----------------------------------------------------------------------------
145 //
146 //-----------------------------------------------------------------------------
147 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
148         p := C.CString(params.ProtPort)
149         m := C.int(params.MaxSize)
150         c := C.int(params.ThreadType)
151         defer C.free(unsafe.Pointer(p))
152         ctx := C.rmr_init(p, m, c)
153         if ctx == nil {
154                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
155         }
156
157         Logger.Info("new rmrClient with parameters: %s", params.String())
158
159         if params.LowLatency {
160                 C.rmr_set_low_latency(ctx)
161         }
162         if params.FastAck {
163                 C.rmr_set_fack(ctx)
164         }
165
166         return &RMRClient{
167                 protPort:  params.ProtPort,
168                 context:   ctx,
169                 consumers: make([]MessageConsumer, 0),
170                 stat:      Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
171         }
172 }
173
174 func NewRMRClient() *RMRClient {
175         return NewRMRClientWithParams(
176                 &RMRClientParams{
177                         ProtPort:   viper.GetString("rmr.protPort"),
178                         MaxSize:    viper.GetInt("rmr.maxSize"),
179                         ThreadType: viper.GetInt("rmr.threadType"),
180                         StatDesc:   "RMR",
181                         LowLatency: viper.GetBool("rmr.lowLatency"),
182                         FastAck:    viper.GetBool("rmr.fastAck"),
183                 })
184 }
185
186 func (m *RMRClient) Start(c MessageConsumer) {
187         if c != nil {
188                 m.consumers = append(m.consumers, c)
189         }
190
191         var counter int = 0
192         for {
193                 m.contextMux.Lock()
194                 m.ready = int(C.rmr_ready(m.context))
195                 m.contextMux.Unlock()
196                 if m.ready == 1 {
197                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
198                         break
199                 }
200                 if counter%10 == 0 {
201                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
202                 }
203                 time.Sleep(1 * time.Second)
204                 counter++
205         }
206         m.wg.Add(1)
207
208         if m.readyCb != nil {
209                 go m.readyCb(m.readyCbParams)
210         }
211
212         go func() {
213                 m.contextMux.Lock()
214                 rfd := C.rmr_get_rcvfd(m.context)
215                 m.contextMux.Unlock()
216                 efd := C.init_epoll(rfd)
217
218                 defer m.wg.Done()
219                 for {
220                         if int(C.wait_epoll(efd, rfd)) == 0 {
221                                 continue
222                         }
223                         m.contextMux.Lock()
224                         rxBuffer := C.rmr_rcv_msg(m.context, nil)
225                         m.contextMux.Unlock()
226
227                         if rxBuffer == nil {
228                                 m.LogMBufError("RecvMsg failed", rxBuffer)
229                                 m.UpdateStatCounter("ReceiveError")
230                                 continue
231                         }
232                         m.UpdateStatCounter("Received")
233                         m.parseMessage(rxBuffer)
234                 }
235         }()
236
237         m.wg.Wait()
238 }
239
240 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
241         if len(m.consumers) == 0 {
242                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
243                 return
244         }
245
246         params := &RMRParams{}
247         params.Mbuf = rxBuffer
248         params.Mtype = int(rxBuffer.mtype)
249         params.SubId = int(rxBuffer.sub_id)
250         params.Meid = &RMRMeid{}
251
252         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
253         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
254                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
255         }
256
257         xidBuf := make([]byte, int(C.RMR_MAX_XID))
258         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
259                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
260         }
261
262         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
263         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
264                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
265         }
266
267         // Default case: a single consumer
268         if len(m.consumers) == 1 && m.consumers[0] != nil {
269                 params.PayloadLen = int(rxBuffer.len)
270                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
271                 err := m.consumers[0].Consume(params)
272                 if err != nil {
273                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
274                 }
275                 return
276         }
277
278         // Special case for multiple consumers
279         for _, c := range m.consumers {
280                 cptr := unsafe.Pointer(rxBuffer.payload)
281                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
282                 params.PayloadLen = int(rxBuffer.len)
283                 params.Mtype = int(rxBuffer.mtype)
284                 params.SubId = int(rxBuffer.sub_id)
285
286                 err := c.Consume(params)
287                 if err != nil {
288                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
289                 }
290         }
291 }
292
293 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
294         m.contextMux.Lock()
295         defer m.contextMux.Unlock()
296         outbuf := C.rmr_alloc_msg(m.context, C.int(size))
297         if outbuf == nil {
298                 Logger.Error("rmrClient: Allocating message buffer failed!")
299         }
300         return outbuf
301 }
302
303 func (m *RMRClient) ReAllocate(inbuf *C.rmr_mbuf_t, size int) *C.rmr_mbuf_t {
304         m.contextMux.Lock()
305         defer m.contextMux.Unlock()
306         outbuf := C.rmr_realloc_msg(inbuf, C.int(size))
307         if outbuf == nil {
308                 Logger.Error("rmrClient: Allocating message buffer failed!")
309         }
310         return outbuf
311 }
312
313 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
314         if mbuf == nil {
315                 return
316         }
317         m.contextMux.Lock()
318         defer m.contextMux.Unlock()
319         C.rmr_free_msg(mbuf)
320 }
321
322 func (m *RMRClient) SendMsg(params *RMRParams) bool {
323         return m.Send(params, false)
324 }
325
326 func (m *RMRClient) SendRts(params *RMRParams) bool {
327         return m.Send(params, true)
328 }
329
330 func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
331         status := m.Send(params, isRts)
332         i := 0
333         for ; i < int(to)*2 && status == false; i++ {
334                 status = m.Send(params, isRts)
335                 if status == false {
336                         time.Sleep(500 * time.Millisecond)
337                 }
338         }
339         if status == false {
340                 err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
341                 if params.Mbuf != nil {
342                         m.Free(params.Mbuf)
343                         params.Mbuf = nil
344                 }
345         }
346         return
347 }
348
349 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
350
351         if params == nil {
352                 return nil
353         }
354
355         payLen := len(params.Payload)
356         if params.PayloadLen != 0 {
357                 payLen = params.PayloadLen
358         }
359
360         txBuffer := params.Mbuf
361         params.Mbuf = nil
362
363         if txBuffer != nil {
364                 txBuffer = m.ReAllocate(txBuffer, payLen)
365         } else {
366                 txBuffer = m.Allocate(payLen)
367         }
368
369         if txBuffer == nil {
370                 return nil
371         }
372         txBuffer.mtype = C.int(params.Mtype)
373         txBuffer.sub_id = C.int(params.SubId)
374         txBuffer.len = C.int(payLen)
375
376         datap := C.CBytes(params.Payload)
377         defer C.free(datap)
378
379         if params.Meid != nil {
380                 b := make([]byte, int(C.RMR_MAX_MEID))
381                 copy(b, []byte(params.Meid.RanName))
382                 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
383         }
384
385         xidLen := len(params.Xid)
386         if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
387                 b := make([]byte, int(C.RMR_MAX_XID))
388                 copy(b, []byte(params.Xid))
389                 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
390         }
391
392         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
393
394         return txBuffer
395 }
396
397 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
398
399         txBuffer := m.CopyBuffer(params)
400         if txBuffer == nil {
401                 return false
402         }
403         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
404         if params.status == int(C.RMR_OK) {
405                 return true
406         }
407         return false
408 }
409
410 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
411         var (
412                 currBuffer  *C.rmr_mbuf_t
413                 counterName string = "Transmitted"
414         )
415
416         m.contextMux.Lock()
417         txBuffer.state = 0
418         if whid != 0 {
419                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
420         } else {
421                 if isRts {
422                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
423                 } else {
424                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
425                 }
426         }
427         m.contextMux.Unlock()
428
429         if currBuffer == nil {
430                 m.UpdateStatCounter("TransmitError")
431                 return m.LogMBufError("SendBuf failed", txBuffer)
432         }
433
434         // Just quick retry seems to help for K8s issue
435         maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
436         if maxRetryOnFailure == 0 {
437                 maxRetryOnFailure = 5
438         }
439
440         for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
441                 m.contextMux.Lock()
442                 if whid != 0 {
443                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
444                 } else {
445                         if isRts {
446                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
447                         } else {
448                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
449                         }
450                 }
451                 m.contextMux.Unlock()
452         }
453
454         if currBuffer.state != C.RMR_OK {
455                 counterName = "TransmitError"
456                 m.LogMBufError("SendBuf failed", currBuffer)
457         }
458
459         m.UpdateStatCounter(counterName)
460         defer m.Free(currBuffer)
461
462         return int(currBuffer.state)
463 }
464
465 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
466         var (
467                 currBuffer  *C.rmr_mbuf_t
468                 counterName string = "Transmitted"
469         )
470         txBuffer := m.CopyBuffer(params)
471         if txBuffer == nil {
472                 return C.RMR_ERR_INITFAILED, ""
473         }
474
475         txBuffer.state = 0
476
477         m.contextMux.Lock()
478         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
479         m.contextMux.Unlock()
480
481         if currBuffer == nil {
482                 m.UpdateStatCounter("TransmitError")
483                 return m.LogMBufError("SendBuf failed", txBuffer), ""
484         }
485
486         if currBuffer.state != C.RMR_OK {
487                 counterName = "TransmitError"
488                 m.LogMBufError("SendBuf failed", currBuffer)
489         }
490
491         m.UpdateStatCounter(counterName)
492         defer m.Free(currBuffer)
493
494         cptr := unsafe.Pointer(currBuffer.payload)
495         payload := C.GoBytes(cptr, C.int(currBuffer.len))
496
497         return int(currBuffer.state), string(payload)
498 }
499
500 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
501         return m.Wh_open(target)
502 }
503
504 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
505         m.contextMux.Lock()
506         defer m.contextMux.Unlock()
507         endpoint := C.CString(target)
508         return C.rmr_wh_open(m.context, endpoint)
509 }
510
511 func (m *RMRClient) Closewh(whid int) {
512         m.Wh_close(C.rmr_whid_t(whid))
513 }
514
515 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
516         m.contextMux.Lock()
517         defer m.contextMux.Unlock()
518         C.rmr_wh_close(m.context, whid)
519 }
520
521 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
522         if params.status == int(C.RMR_ERR_RETRY) {
523                 return true
524         }
525         return false
526 }
527
528 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
529         if params.status == int(C.RMR_ERR_NOENDPT) {
530                 return true
531         }
532         return false
533 }
534
535 func (m *RMRClient) UpdateStatCounter(name string) {
536         m.mux.Lock()
537         m.stat[name].Inc()
538         m.mux.Unlock()
539 }
540
541 func (m *RMRClient) RegisterMetrics() {
542         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
543 }
544
545 func (m *RMRClient) Wait() {
546         m.wg.Wait()
547 }
548
549 func (m *RMRClient) IsReady() bool {
550         return m.ready != 0
551 }
552
553 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
554         m.readyCb = cb
555         m.readyCbParams = params
556 }
557
558 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
559         id, ok := RICMessageTypes[name]
560         return id, ok
561 }
562
563 func (m *RMRClient) GetRicMessageName(id int) (s string) {
564         for k, v := range RICMessageTypes {
565                 if id == v {
566                         return k
567                 }
568         }
569         return
570 }
571
572 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
573         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
574         return int(mbuf.state)
575 }
576
577 // To be removed ...
578 func (m *RMRClient) GetStat() (r RMRStatistics) {
579         return
580 }