d310b3a0d0d3016a7ab95b4fc89b049b768b8eed
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_si
36 */
37 import "C"
38
39 import (
40         "fmt"
41         "github.com/spf13/viper"
42         "strconv"
43         "strings"
44         "time"
45         "unsafe"
46 )
47
48 var RMRCounterOpts = []CounterOpts{
49         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50         {Name: "Received", Help: "The total number of received RMR messages"},
51         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
53 }
54
55 var RMRErrors = map[int]string{
56         C.RMR_OK:             "state is good",
57         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
58         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
59         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
60         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
61         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62         C.RMR_ERR_CALLFAILED: "unable to send call() message",
63         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
64         C.RMR_ERR_WHID:       "wormhole id was invalid",
65         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
66         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
68         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
69         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
70         C.RMR_ERR_TRUNC:      "received message likely truncated",
71         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
73 }
74
75 type RMRParams struct {
76         Mtype      int
77         Payload    []byte
78         PayloadLen int
79         Meid       *RMRMeid
80         Xid        string
81         SubId      int
82         Src        string
83         Mbuf       *C.rmr_mbuf_t
84         Whid       int
85         Callid     int
86         Timeout    int
87         status     int
88 }
89
90 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, threadType int, statDesc string) *RMRClient {
91         p := C.CString(protPort)
92         m := C.int(maxSize)
93         c := C.int(threadType)
94         defer C.free(unsafe.Pointer(p))
95
96         //ctx := C.rmr_init(p, m, C.int(0))
97         //ctx := C.rmr_init(p, m, C.RMRFL_NOTHREAD)
98         ctx := C.rmr_init(p, m, c)
99         if ctx == nil {
100                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
101         }
102
103         return &RMRClient{
104                 protPort:   protPort,
105                 numWorkers: numWorkers,
106                 context:    ctx,
107                 consumers:  make([]MessageConsumer, 0),
108                 stat:       Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
109         }
110 }
111
112 func NewRMRClient() *RMRClient {
113         return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), viper.GetInt("rmr.threadType"), "RMR")
114 }
115
116 func (m *RMRClient) Start(c MessageConsumer) {
117         if c != nil {
118                 m.consumers = append(m.consumers, c)
119         }
120
121         var counter int = 0
122         for {
123                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
124                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
125                         break
126                 }
127                 if counter%10 == 0 {
128                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
129                 }
130                 time.Sleep(1 * time.Second)
131                 counter++
132         }
133         m.wg.Add(m.numWorkers)
134
135         if m.readyCb != nil {
136                 go m.readyCb(m.readyCbParams)
137         }
138
139         for w := 0; w < m.numWorkers; w++ {
140                 go m.Worker("worker-"+strconv.Itoa(w), 0)
141         }
142         m.Wait()
143 }
144
145 func (m *RMRClient) Worker(taskName string, msgSize int) {
146         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
147
148         defer m.wg.Done()
149         for {
150                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
151                 if rxBuffer == nil {
152                         m.LogMBufError("RecvMsg failed", rxBuffer)
153                         m.UpdateStatCounter("ReceiveError")
154                         continue
155                 }
156                 m.UpdateStatCounter("Received")
157
158                 m.msgWg.Add(1)
159                 go m.parseMessage(rxBuffer)
160                 m.msgWg.Wait()
161         }
162 }
163
164 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
165         defer m.msgWg.Done()
166         if len(m.consumers) == 0 {
167                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
168                 return
169         }
170
171         params := &RMRParams{}
172         params.Mbuf = rxBuffer
173         params.Mtype = int(rxBuffer.mtype)
174         params.SubId = int(rxBuffer.sub_id)
175         params.Meid = &RMRMeid{}
176
177         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
178         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
179                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
180         }
181
182         xidBuf := make([]byte, int(C.RMR_MAX_XID))
183         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
184                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
185         }
186
187         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
188         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
189                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
190         }
191
192         // Default case: a single consumer
193         if len(m.consumers) == 1 && m.consumers[0] != nil {
194                 params.PayloadLen = int(rxBuffer.len)
195                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
196                 err := m.consumers[0].Consume(params)
197                 if err != nil {
198                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
199                 }
200                 return
201         }
202
203         // Special case for multiple consumers
204         for _, c := range m.consumers {
205                 cptr := unsafe.Pointer(rxBuffer.payload)
206                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
207                 params.PayloadLen = int(rxBuffer.len)
208                 params.Mtype = int(rxBuffer.mtype)
209                 params.SubId = int(rxBuffer.sub_id)
210
211                 err := c.Consume(params)
212                 if err != nil {
213                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
214                 }
215         }
216 }
217
218 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
219         buf := C.rmr_alloc_msg(m.context, C.int(size))
220         if buf == nil {
221                 Logger.Error("rmrClient: Allocating message buffer failed!")
222         }
223         return buf
224 }
225
226 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
227         if mbuf == nil {
228                 return
229         }
230         C.rmr_free_msg(mbuf)
231 }
232
233 func (m *RMRClient) SendMsg(params *RMRParams) bool {
234         return m.Send(params, false)
235 }
236
237 func (m *RMRClient) SendRts(params *RMRParams) bool {
238         return m.Send(params, true)
239 }
240
241 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
242         if params.Mbuf != nil {
243                 m.Free(params.Mbuf)
244                 params.Mbuf = nil
245         }
246
247         payLen := len(params.Payload)
248         if params.PayloadLen != 0 {
249                 payLen = params.PayloadLen
250         }
251
252         txBuffer := m.Allocate(payLen)
253         if txBuffer == nil {
254                 return nil
255         }
256         txBuffer.mtype = C.int(params.Mtype)
257         txBuffer.sub_id = C.int(params.SubId)
258         txBuffer.len = C.int(payLen)
259
260         datap := C.CBytes(params.Payload)
261         defer C.free(datap)
262
263         if params != nil {
264                 if params.Meid != nil {
265                         b := make([]byte, int(C.RMR_MAX_MEID))
266                         copy(b, []byte(params.Meid.RanName))
267                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
268                 }
269                 xidLen := len(params.Xid)
270                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
271                         b := make([]byte, int(C.RMR_MAX_XID))
272                         copy(b, []byte(params.Xid))
273                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
274                 }
275         }
276         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
277         return txBuffer
278 }
279
280 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
281
282         txBuffer := m.CopyBuffer(params)
283         if txBuffer == nil {
284                 return false
285         }
286         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
287         if params.status == int(C.RMR_OK) {
288                 return true
289         }
290         return false
291 }
292
293 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
294         var (
295                 currBuffer  *C.rmr_mbuf_t
296                 counterName string = "Transmitted"
297         )
298
299         txBuffer.state = 0
300         if whid != 0 {
301                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
302         } else {
303                 if isRts {
304                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
305                 } else {
306                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
307                 }
308         }
309
310         if currBuffer == nil {
311                 m.UpdateStatCounter("TransmitError")
312                 return m.LogMBufError("SendBuf failed", txBuffer)
313         }
314
315         // Just quick retry seems to help for K8s issue
316         maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
317         if maxRetryOnFailure == 0 {
318                 maxRetryOnFailure = 5
319         }
320
321         for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
322                 if whid != 0 {
323                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
324                 } else {
325                         if isRts {
326                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
327                         } else {
328                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
329                         }
330                 }
331         }
332
333         if currBuffer.state != C.RMR_OK {
334                 counterName = "TransmitError"
335                 m.LogMBufError("SendBuf failed", currBuffer)
336         }
337
338         m.UpdateStatCounter(counterName)
339         defer m.Free(currBuffer)
340
341         return int(currBuffer.state)
342 }
343
344 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
345         var (
346                 currBuffer  *C.rmr_mbuf_t
347                 counterName string = "Transmitted"
348         )
349         txBuffer := m.CopyBuffer(params)
350         if txBuffer == nil {
351                 return C.RMR_ERR_INITFAILED, ""
352         }
353
354         txBuffer.state = 0
355
356         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
357
358         if currBuffer == nil {
359                 m.UpdateStatCounter("TransmitError")
360                 return m.LogMBufError("SendBuf failed", txBuffer), ""
361         }
362
363         if currBuffer.state != C.RMR_OK {
364                 counterName = "TransmitError"
365                 m.LogMBufError("SendBuf failed", currBuffer)
366         }
367
368         m.UpdateStatCounter(counterName)
369         defer m.Free(currBuffer)
370
371         cptr := unsafe.Pointer(currBuffer.payload)
372         payload := C.GoBytes(cptr, C.int(currBuffer.len))
373
374         return int(currBuffer.state), string(payload)
375 }
376
377 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
378         return m.Wh_open(target)
379 }
380
381 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
382         endpoint := C.CString(target)
383         return C.rmr_wh_open(m.context, endpoint)
384 }
385
386 func (m *RMRClient) Closewh(whid int) {
387         m.Wh_close(C.rmr_whid_t(whid))
388 }
389
390 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
391         C.rmr_wh_close(m.context, whid)
392 }
393
394 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
395         if params.status == int(C.RMR_ERR_RETRY) {
396                 return true
397         }
398         return false
399 }
400
401 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
402         if params.status == int(C.RMR_ERR_NOENDPT) {
403                 return true
404         }
405         return false
406 }
407
408 func (m *RMRClient) UpdateStatCounter(name string) {
409         m.mux.Lock()
410         m.stat[name].Inc()
411         m.mux.Unlock()
412 }
413
414 func (m *RMRClient) RegisterMetrics() {
415         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
416 }
417
418 func (m *RMRClient) Wait() {
419         m.wg.Wait()
420 }
421
422 func (m *RMRClient) IsReady() bool {
423         return m.ready != 0
424 }
425
426 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
427         m.readyCb = cb
428         m.readyCbParams = params
429 }
430
431 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
432         id, ok := RICMessageTypes[name]
433         return id, ok
434 }
435
436 func (m *RMRClient) GetRicMessageName(id int) (s string) {
437         for k, v := range RICMessageTypes {
438                 if id == v {
439                         return k
440                 }
441         }
442         return
443 }
444
445 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
446         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
447         return int(mbuf.state)
448 }
449
450 // To be removed ...
451 func (m *RMRClient) GetStat() (r RMRStatistics) {
452         return
453 }