Initiate rmr_mbuf_t with payload len when sending message.
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_si
36 */
37 import "C"
38
39 import (
40         "fmt"
41         "github.com/spf13/viper"
42         "strconv"
43         "strings"
44         "time"
45         "unsafe"
46 )
47
48 var RMRCounterOpts = []CounterOpts{
49         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50         {Name: "Received", Help: "The total number of received RMR messages"},
51         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
53 }
54
55 var RMRErrors = map[int]string{
56         C.RMR_OK:             "state is good",
57         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
58         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
59         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
60         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
61         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62         C.RMR_ERR_CALLFAILED: "unable to send call() message",
63         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
64         C.RMR_ERR_WHID:       "wormhole id was invalid",
65         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
66         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
68         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
69         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
70         C.RMR_ERR_TRUNC:      "received message likely truncated",
71         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
73 }
74
75 type RMRParams struct {
76         Mtype      int
77         Payload    []byte
78         PayloadLen int
79         Meid       *RMRMeid
80         Xid        string
81         SubId      int
82         Src        string
83         Mbuf       *C.rmr_mbuf_t
84         Whid       int
85         Callid     int
86         Timeout    int
87         status     int
88 }
89
90 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, threadType int, statDesc string) *RMRClient {
91         p := C.CString(protPort)
92         m := C.int(maxSize)
93         c := C.int(threadType)
94         defer C.free(unsafe.Pointer(p))
95
96         //ctx := C.rmr_init(p, m, C.int(0))
97         //ctx := C.rmr_init(p, m, C.RMRFL_NOTHREAD)
98         ctx := C.rmr_init(p, m, c)
99         if ctx == nil {
100                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
101         }
102
103         return &RMRClient{
104                 protPort:   protPort,
105                 numWorkers: numWorkers,
106                 context:    ctx,
107                 consumers:  make([]MessageConsumer, 0),
108                 stat:       Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
109         }
110 }
111
112 func NewRMRClient() *RMRClient {
113         return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), viper.GetInt("rmr.threadType"), "RMR")
114 }
115
116 func (m *RMRClient) Start(c MessageConsumer) {
117         if c != nil {
118                 m.consumers = append(m.consumers, c)
119         }
120
121         var counter int = 0
122         for {
123                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
124                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
125                         break
126                 }
127                 if counter%10 == 0 {
128                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
129                 }
130                 time.Sleep(1 * time.Second)
131                 counter++
132         }
133         m.wg.Add(m.numWorkers)
134
135         if m.readyCb != nil {
136                 go m.readyCb(m.readyCbParams)
137         }
138
139         for w := 0; w < m.numWorkers; w++ {
140                 go m.Worker("worker-"+strconv.Itoa(w), 0)
141         }
142         m.Wait()
143 }
144
145 func (m *RMRClient) Worker(taskName string, msgSize int) {
146         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
147
148         defer m.wg.Done()
149         for {
150                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
151                 if rxBuffer == nil {
152                         m.LogMBufError("RecvMsg failed", rxBuffer)
153                         m.UpdateStatCounter("ReceiveError")
154                         continue
155                 }
156                 m.UpdateStatCounter("Received")
157
158                 go m.parseMessage(rxBuffer)
159         }
160 }
161
162 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
163         if len(m.consumers) == 0 {
164                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
165                 return
166         }
167
168         params := &RMRParams{}
169         params.Mbuf = rxBuffer
170         params.Mtype = int(rxBuffer.mtype)
171         params.SubId = int(rxBuffer.sub_id)
172         params.Meid = &RMRMeid{}
173
174         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
175         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
176                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
177         }
178
179         xidBuf := make([]byte, int(C.RMR_MAX_XID))
180         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
181                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
182         }
183
184         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
185         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
186                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
187         }
188
189         // Default case: a single consumer
190         if len(m.consumers) == 1 && m.consumers[0] != nil {
191                 params.PayloadLen = int(rxBuffer.len)
192                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
193                 err := m.consumers[0].Consume(params)
194                 if err != nil {
195                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
196                 }
197                 return
198         }
199
200         // Special case for multiple consumers
201         for _, c := range m.consumers {
202                 cptr := unsafe.Pointer(rxBuffer.payload)
203                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
204                 params.PayloadLen = int(rxBuffer.len)
205                 params.Mtype = int(rxBuffer.mtype)
206                 params.SubId = int(rxBuffer.sub_id)
207
208                 err := c.Consume(params)
209                 if err != nil {
210                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
211                 }
212         }
213 }
214
215 func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
216         buf := C.rmr_alloc_msg(m.context, C.int(size))
217         if buf == nil {
218                 Logger.Error("rmrClient: Allocating message buffer failed!")
219         }
220         return buf
221 }
222
223 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
224         if mbuf == nil {
225                 return
226         }
227         C.rmr_free_msg(mbuf)
228 }
229
230 func (m *RMRClient) SendMsg(params *RMRParams) bool {
231         return m.Send(params, false)
232 }
233
234 func (m *RMRClient) SendRts(params *RMRParams) bool {
235         return m.Send(params, true)
236 }
237
238 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
239         if params.Mbuf != nil {
240                 m.Free(params.Mbuf)
241                 params.Mbuf = nil
242         }
243
244         payLen := len(params.Payload)
245         if params.PayloadLen != 0 {
246                 payLen = params.PayloadLen
247         }
248
249         txBuffer := m.Allocate(payLen)
250         if txBuffer == nil {
251                 return nil
252         }
253         txBuffer.mtype = C.int(params.Mtype)
254         txBuffer.sub_id = C.int(params.SubId)
255         txBuffer.len = C.int(payLen)
256
257         datap := C.CBytes(params.Payload)
258         defer C.free(datap)
259
260         if params != nil {
261                 if params.Meid != nil {
262                         b := make([]byte, int(C.RMR_MAX_MEID))
263                         copy(b, []byte(params.Meid.RanName))
264                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
265                 }
266                 xidLen := len(params.Xid)
267                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
268                         b := make([]byte, int(C.RMR_MAX_XID))
269                         copy(b, []byte(params.Xid))
270                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
271                 }
272         }
273         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
274         return txBuffer
275 }
276
277 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
278
279         txBuffer := m.CopyBuffer(params)
280         if txBuffer == nil {
281                 return false
282         }
283         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
284         if params.status == int(C.RMR_OK) {
285                 return true
286         }
287         return false
288 }
289
290 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
291         var (
292                 currBuffer  *C.rmr_mbuf_t
293                 counterName string = "Transmitted"
294         )
295
296         txBuffer.state = 0
297         if whid != 0 {
298                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
299         } else {
300                 if isRts {
301                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
302                 } else {
303                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
304                 }
305         }
306
307         if currBuffer == nil {
308                 m.UpdateStatCounter("TransmitError")
309                 return m.LogMBufError("SendBuf failed", txBuffer)
310         }
311
312         // Just quick retry seems to help for K8s issue
313         maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
314         if maxRetryOnFailure == 0 {
315                 maxRetryOnFailure = 5
316         }
317
318         for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
319                 if whid != 0 {
320                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
321                 } else {
322                         if isRts {
323                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
324                         } else {
325                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
326                         }
327                 }
328         }
329
330         if currBuffer.state != C.RMR_OK {
331                 counterName = "TransmitError"
332                 m.LogMBufError("SendBuf failed", currBuffer)
333         }
334
335         m.UpdateStatCounter(counterName)
336         defer m.Free(currBuffer)
337
338         return int(currBuffer.state)
339 }
340
341 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
342         var (
343                 currBuffer  *C.rmr_mbuf_t
344                 counterName string = "Transmitted"
345         )
346         txBuffer := m.CopyBuffer(params)
347         if txBuffer == nil {
348                 return C.RMR_ERR_INITFAILED, ""
349         }
350
351         txBuffer.state = 0
352
353         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
354
355         if currBuffer == nil {
356                 m.UpdateStatCounter("TransmitError")
357                 return m.LogMBufError("SendBuf failed", txBuffer), ""
358         }
359
360         if currBuffer.state != C.RMR_OK {
361                 counterName = "TransmitError"
362                 m.LogMBufError("SendBuf failed", currBuffer)
363         }
364
365         m.UpdateStatCounter(counterName)
366         defer m.Free(currBuffer)
367
368         cptr := unsafe.Pointer(currBuffer.payload)
369         payload := C.GoBytes(cptr, C.int(currBuffer.len))
370
371         return int(currBuffer.state), string(payload)
372 }
373
374 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
375         return m.Wh_open(target)
376 }
377
378 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
379         endpoint := C.CString(target)
380         return C.rmr_wh_open(m.context, endpoint)
381 }
382
383 func (m *RMRClient) Closewh(whid int) {
384         m.Wh_close(C.rmr_whid_t(whid))
385 }
386
387 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
388         C.rmr_wh_close(m.context, whid)
389 }
390
391 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
392         if params.status == int(C.RMR_ERR_RETRY) {
393                 return true
394         }
395         return false
396 }
397
398 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
399         if params.status == int(C.RMR_ERR_NOENDPT) {
400                 return true
401         }
402         return false
403 }
404
405 func (m *RMRClient) UpdateStatCounter(name string) {
406         m.mux.Lock()
407         m.stat[name].Inc()
408         m.mux.Unlock()
409 }
410
411 func (m *RMRClient) RegisterMetrics() {
412         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
413 }
414
415 func (m *RMRClient) Wait() {
416         m.wg.Wait()
417 }
418
419 func (m *RMRClient) IsReady() bool {
420         return m.ready != 0
421 }
422
423 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
424         m.readyCb = cb
425         m.readyCbParams = params
426 }
427
428 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
429         id, ok := RICMessageTypes[name]
430         return id, ok
431 }
432
433 func (m *RMRClient) GetRicMessageName(id int) (s string) {
434         for k, v := range RICMessageTypes {
435                 if id == v {
436                         return k
437                 }
438         }
439         return
440 }
441
442 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
443         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
444         return int(mbuf.state)
445 }
446
447 // To be removed ...
448 func (m *RMRClient) GetStat() (r RMRStatistics) {
449         return
450 }