Added support for si95 and updated RMR version
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_si
36 */
37 import "C"
38
39 import (
40         "fmt"
41         "github.com/spf13/viper"
42         "strconv"
43         "strings"
44         "time"
45         "unsafe"
46 )
47
48 var RMRCounterOpts = []CounterOpts{
49         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50         {Name: "Received", Help: "The total number of received RMR messages"},
51         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
53 }
54
55 var RMRErrors = map[int]string{
56         C.RMR_OK:             "state is good",
57         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
58         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
59         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
60         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
61         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62         C.RMR_ERR_CALLFAILED: "unable to send call() message",
63         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
64         C.RMR_ERR_WHID:       "wormhole id was invalid",
65         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
66         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
68         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
69         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
70         C.RMR_ERR_TRUNC:      "received message likely truncated",
71         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
73 }
74
75 type RMRParams struct {
76         Mtype      int
77         Payload    []byte
78         PayloadLen int
79         Meid       *RMRMeid
80         Xid        string
81         SubId      int
82         Src        string
83         Mbuf       *C.rmr_mbuf_t
84         Whid       int
85         status     int
86 }
87
88 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, threadType int, statDesc string) *RMRClient {
89         p := C.CString(protPort)
90         m := C.int(maxSize)
91         c := C.int(threadType)
92         defer C.free(unsafe.Pointer(p))
93
94         //ctx := C.rmr_init(p, m, C.int(0))
95         //ctx := C.rmr_init(p, m, C.RMRFL_NOTHREAD)
96         ctx := C.rmr_init(p, m, c)
97         if ctx == nil {
98                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
99         }
100
101         return &RMRClient{
102                 protPort:   protPort,
103                 numWorkers: numWorkers,
104                 context:    ctx,
105                 consumers:  make([]MessageConsumer, 0),
106                 stat:       Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
107         }
108 }
109
110 func NewRMRClient() *RMRClient {
111         return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), viper.GetInt("rmr.threadType"), "RMR")
112 }
113
114 func (m *RMRClient) Start(c MessageConsumer) {
115         if c != nil {
116                 m.consumers = append(m.consumers, c)
117         }
118
119         var counter int = 0
120         for {
121                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
122                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
123                         break
124                 }
125                 if counter%10 == 0 {
126                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
127                 }
128                 time.Sleep(1 * time.Second)
129                 counter++
130         }
131         m.wg.Add(m.numWorkers)
132
133         if m.readyCb != nil {
134                 go m.readyCb(m.readyCbParams)
135         }
136
137         for w := 0; w < m.numWorkers; w++ {
138                 go m.Worker("worker-"+strconv.Itoa(w), 0)
139         }
140         m.Wait()
141 }
142
143 func (m *RMRClient) Worker(taskName string, msgSize int) {
144         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
145
146         defer m.wg.Done()
147         for {
148                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
149                 if rxBuffer == nil {
150                         m.LogMBufError("RecvMsg failed", rxBuffer)
151                         m.UpdateStatCounter("ReceiveError")
152                         continue
153                 }
154                 m.UpdateStatCounter("Received")
155
156                 go m.parseMessage(rxBuffer)
157         }
158 }
159
160 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
161         if len(m.consumers) == 0 {
162                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
163                 return
164         }
165
166         params := &RMRParams{}
167         params.Mbuf = rxBuffer
168         params.Mtype = int(rxBuffer.mtype)
169         params.SubId = int(rxBuffer.sub_id)
170         params.Meid = &RMRMeid{}
171
172         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
173         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
174                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
175         }
176
177         xidBuf := make([]byte, int(C.RMR_MAX_XID))
178         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
179                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
180         }
181
182         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
183         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
184                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
185         }
186
187         // Default case: a single consumer
188         if len(m.consumers) == 1 && m.consumers[0] != nil {
189                 params.PayloadLen = int(rxBuffer.len)
190                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
191                 err := m.consumers[0].Consume(params)
192                 if err != nil {
193                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
194                 }
195                 return
196         }
197
198         // Special case for multiple consumers
199         for _, c := range m.consumers {
200                 cptr := unsafe.Pointer(rxBuffer.payload)
201                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
202                 params.PayloadLen = int(rxBuffer.len)
203                 params.Mtype = int(rxBuffer.mtype)
204                 params.SubId = int(rxBuffer.sub_id)
205
206                 err := c.Consume(params)
207                 if err != nil {
208                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
209                 }
210         }
211 }
212
213 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
214         buf := C.rmr_alloc_msg(m.context, 0)
215         if buf == nil {
216                 Logger.Error("rmrClient: Allocating message buffer failed!")
217         }
218         return buf
219 }
220
221 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
222         if mbuf == nil {
223                 return
224         }
225         C.rmr_free_msg(mbuf)
226 }
227
228 func (m *RMRClient) SendMsg(params *RMRParams) bool {
229         return m.Send(params, false)
230 }
231
232 func (m *RMRClient) SendRts(params *RMRParams) bool {
233         return m.Send(params, true)
234 }
235
236 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
237         txBuffer := params.Mbuf
238         if txBuffer == nil {
239                 txBuffer = m.Allocate()
240                 if txBuffer == nil {
241                         return false
242                 }
243         }
244
245         txBuffer.mtype = C.int(params.Mtype)
246         txBuffer.sub_id = C.int(params.SubId)
247         txBuffer.len = C.int(len(params.Payload))
248         if params.PayloadLen != 0 {
249                 txBuffer.len = C.int(params.PayloadLen)
250         }
251         datap := C.CBytes(params.Payload)
252         defer C.free(datap)
253
254         if params != nil {
255                 if params.Meid != nil {
256                         b := make([]byte, int(C.RMR_MAX_MEID))
257                         copy(b, []byte(params.Meid.RanName))
258                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
259                 }
260                 xidLen := len(params.Xid)
261                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
262                         b := make([]byte, int(C.RMR_MAX_XID))
263                         copy(b, []byte(params.Xid))
264                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
265                 }
266         }
267         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
268
269         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
270         if params.status == int(C.RMR_OK) {
271                 return true
272         }
273         return false
274 }
275
276 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
277         var (
278                 currBuffer  *C.rmr_mbuf_t
279                 counterName string = "Transmitted"
280         )
281
282         txBuffer.state = 0
283         if whid != 0 {
284                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
285         } else {
286                 if isRts {
287                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
288                 } else {
289                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
290                 }
291         }
292
293         if currBuffer == nil {
294                 m.UpdateStatCounter("TransmitError")
295                 return m.LogMBufError("SendBuf failed", txBuffer)
296         }
297
298         // Just quick retry seems to help for K8s issue
299         maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
300         if maxRetryOnFailure == 0 {
301                 maxRetryOnFailure = 5
302         }
303
304         for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
305                 if whid != 0 {
306                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
307                 } else {
308                         if isRts {
309                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
310                         } else {
311                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
312                         }
313                 }
314         }
315
316         if currBuffer.state != C.RMR_OK {
317                 counterName = "TransmitError"
318                 m.LogMBufError("SendBuf failed", currBuffer)
319         }
320
321         m.UpdateStatCounter(counterName)
322         defer m.Free(currBuffer)
323
324         return int(currBuffer.state)
325 }
326
327 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
328         return m.Wh_open(target)
329 }
330
331 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
332         endpoint := C.CString(target)
333         return C.rmr_wh_open(m.context, endpoint)
334 }
335
336 func (m *RMRClient) Closewh(whid int) {
337         m.Wh_close(C.rmr_whid_t(whid))
338 }
339
340 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
341         C.rmr_wh_close(m.context, whid)
342 }
343
344 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
345         if params.status == int(C.RMR_ERR_RETRY) {
346                 return true
347         }
348         return false
349 }
350
351 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
352         if params.status == int(C.RMR_ERR_NOENDPT) {
353                 return true
354         }
355         return false
356 }
357
358 func (m *RMRClient) UpdateStatCounter(name string) {
359         m.mux.Lock()
360         m.stat[name].Inc()
361         m.mux.Unlock()
362 }
363
364 func (m *RMRClient) RegisterMetrics() {
365         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
366 }
367
368 func (m *RMRClient) Wait() {
369         m.wg.Wait()
370 }
371
372 func (m *RMRClient) IsReady() bool {
373         return m.ready != 0
374 }
375
376 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
377         m.readyCb = cb
378         m.readyCbParams = params
379 }
380
381 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
382         id, ok := RICMessageTypes[name]
383         return id, ok
384 }
385
386 func (m *RMRClient) GetRicMessageName(id int) (s string) {
387         for k, v := range RICMessageTypes {
388                 if id == v {
389                         return k
390                 }
391         }
392         return
393 }
394
395 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
396         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
397         return int(mbuf.state)
398 }
399
400 // To be removed ...
401 func (m *RMRClient) GetStat() (r RMRStatistics) {
402         return
403 }