0aeced219009dd3d54e3aa8d2477bd94c0ef6f12
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_si
36 */
37 import "C"
38
39 import (
40         "fmt"
41         "github.com/spf13/viper"
42         "strconv"
43         "strings"
44         "time"
45         "unsafe"
46 )
47
48 var RMRCounterOpts = []CounterOpts{
49         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50         {Name: "Received", Help: "The total number of received RMR messages"},
51         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
53 }
54
55 var RMRErrors = map[int]string{
56         C.RMR_OK:             "state is good",
57         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
58         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
59         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
60         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
61         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62         C.RMR_ERR_CALLFAILED: "unable to send call() message",
63         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
64         C.RMR_ERR_WHID:       "wormhole id was invalid",
65         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
66         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
68         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
69         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
70         C.RMR_ERR_TRUNC:      "received message likely truncated",
71         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
73 }
74
75 type RMRParams struct {
76         Mtype      int
77         Payload    []byte
78         PayloadLen int
79         Meid       *RMRMeid
80         Xid        string
81         SubId      int
82         Src        string
83         Mbuf       *C.rmr_mbuf_t
84         Whid       int
85         Callid     int
86         Timeout    int
87         status     int
88 }
89
90 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, threadType int, statDesc string) *RMRClient {
91         p := C.CString(protPort)
92         m := C.int(maxSize)
93         c := C.int(threadType)
94         defer C.free(unsafe.Pointer(p))
95
96         //ctx := C.rmr_init(p, m, C.int(0))
97         //ctx := C.rmr_init(p, m, C.RMRFL_NOTHREAD)
98         ctx := C.rmr_init(p, m, c)
99         if ctx == nil {
100                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
101         }
102
103         return &RMRClient{
104                 protPort:   protPort,
105                 numWorkers: numWorkers,
106                 context:    ctx,
107                 consumers:  make([]MessageConsumer, 0),
108                 stat:       Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
109         }
110 }
111
112 func NewRMRClient() *RMRClient {
113         return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), viper.GetInt("rmr.threadType"), "RMR")
114 }
115
116 func (m *RMRClient) Start(c MessageConsumer) {
117         if c != nil {
118                 m.consumers = append(m.consumers, c)
119         }
120
121         var counter int = 0
122         for {
123                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
124                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
125                         break
126                 }
127                 if counter%10 == 0 {
128                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
129                 }
130                 time.Sleep(1 * time.Second)
131                 counter++
132         }
133         m.wg.Add(m.numWorkers)
134
135         if m.readyCb != nil {
136                 go m.readyCb(m.readyCbParams)
137         }
138
139         for w := 0; w < m.numWorkers; w++ {
140                 go m.Worker("worker-"+strconv.Itoa(w), 0)
141         }
142         m.Wait()
143 }
144
145 func (m *RMRClient) Worker(taskName string, msgSize int) {
146         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
147
148         defer m.wg.Done()
149         for {
150                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
151                 if rxBuffer == nil {
152                         m.LogMBufError("RecvMsg failed", rxBuffer)
153                         m.UpdateStatCounter("ReceiveError")
154                         continue
155                 }
156                 m.UpdateStatCounter("Received")
157
158                 go m.parseMessage(rxBuffer)
159         }
160 }
161
162 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
163         if len(m.consumers) == 0 {
164                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
165                 return
166         }
167
168         params := &RMRParams{}
169         params.Mbuf = rxBuffer
170         params.Mtype = int(rxBuffer.mtype)
171         params.SubId = int(rxBuffer.sub_id)
172         params.Meid = &RMRMeid{}
173
174         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
175         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
176                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
177         }
178
179         xidBuf := make([]byte, int(C.RMR_MAX_XID))
180         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
181                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
182         }
183
184         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
185         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
186                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
187         }
188
189         // Default case: a single consumer
190         if len(m.consumers) == 1 && m.consumers[0] != nil {
191                 params.PayloadLen = int(rxBuffer.len)
192                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
193                 err := m.consumers[0].Consume(params)
194                 if err != nil {
195                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
196                 }
197                 return
198         }
199
200         // Special case for multiple consumers
201         for _, c := range m.consumers {
202                 cptr := unsafe.Pointer(rxBuffer.payload)
203                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
204                 params.PayloadLen = int(rxBuffer.len)
205                 params.Mtype = int(rxBuffer.mtype)
206                 params.SubId = int(rxBuffer.sub_id)
207
208                 err := c.Consume(params)
209                 if err != nil {
210                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
211                 }
212         }
213 }
214
215 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
216         buf := C.rmr_alloc_msg(m.context, 0)
217         if buf == nil {
218                 Logger.Error("rmrClient: Allocating message buffer failed!")
219         }
220         return buf
221 }
222
223 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
224         if mbuf == nil {
225                 return
226         }
227         C.rmr_free_msg(mbuf)
228 }
229
230 func (m *RMRClient) SendMsg(params *RMRParams) bool {
231         return m.Send(params, false)
232 }
233
234 func (m *RMRClient) SendRts(params *RMRParams) bool {
235         return m.Send(params, true)
236 }
237
238 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
239         txBuffer := params.Mbuf
240         if txBuffer == nil {
241                 txBuffer = m.Allocate()
242                 if txBuffer == nil {
243                         return nil
244                 }
245         }
246
247         txBuffer.mtype = C.int(params.Mtype)
248         txBuffer.sub_id = C.int(params.SubId)
249         txBuffer.len = C.int(len(params.Payload))
250         if params.PayloadLen != 0 {
251                 txBuffer.len = C.int(params.PayloadLen)
252         }
253         datap := C.CBytes(params.Payload)
254         defer C.free(datap)
255
256         if params != nil {
257                 if params.Meid != nil {
258                         b := make([]byte, int(C.RMR_MAX_MEID))
259                         copy(b, []byte(params.Meid.RanName))
260                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
261                 }
262                 xidLen := len(params.Xid)
263                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
264                         b := make([]byte, int(C.RMR_MAX_XID))
265                         copy(b, []byte(params.Xid))
266                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
267                 }
268         }
269         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
270         return txBuffer
271 }
272
273 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
274
275         txBuffer := m.CopyBuffer(params)
276         if txBuffer == nil {
277                 return false
278         }
279         params.status = m.SendBuf(txBuffer, isRts, params.Whid)
280         if params.status == int(C.RMR_OK) {
281                 return true
282         }
283         return false
284 }
285
286 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
287         var (
288                 currBuffer  *C.rmr_mbuf_t
289                 counterName string = "Transmitted"
290         )
291
292         txBuffer.state = 0
293         if whid != 0 {
294                 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
295         } else {
296                 if isRts {
297                         currBuffer = C.rmr_rts_msg(m.context, txBuffer)
298                 } else {
299                         currBuffer = C.rmr_send_msg(m.context, txBuffer)
300                 }
301         }
302
303         if currBuffer == nil {
304                 m.UpdateStatCounter("TransmitError")
305                 return m.LogMBufError("SendBuf failed", txBuffer)
306         }
307
308         // Just quick retry seems to help for K8s issue
309         maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
310         if maxRetryOnFailure == 0 {
311                 maxRetryOnFailure = 5
312         }
313
314         for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
315                 if whid != 0 {
316                         currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
317                 } else {
318                         if isRts {
319                                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
320                         } else {
321                                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
322                         }
323                 }
324         }
325
326         if currBuffer.state != C.RMR_OK {
327                 counterName = "TransmitError"
328                 m.LogMBufError("SendBuf failed", currBuffer)
329         }
330
331         m.UpdateStatCounter(counterName)
332         defer m.Free(currBuffer)
333
334         return int(currBuffer.state)
335 }
336
337 func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
338         var (
339                 currBuffer  *C.rmr_mbuf_t
340                 counterName string = "Transmitted"
341         )
342         txBuffer := m.CopyBuffer(params)
343         if txBuffer == nil {
344                 return C.RMR_ERR_INITFAILED, ""
345         }
346
347         txBuffer.state = 0
348
349         currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
350
351         if currBuffer == nil {
352                 m.UpdateStatCounter("TransmitError")
353                 return m.LogMBufError("SendBuf failed", txBuffer), ""
354         }
355
356         if currBuffer.state != C.RMR_OK {
357                 counterName = "TransmitError"
358                 m.LogMBufError("SendBuf failed", currBuffer)
359         }
360
361         m.UpdateStatCounter(counterName)
362         defer m.Free(currBuffer)
363
364         cptr := unsafe.Pointer(currBuffer.payload)
365         payload := C.GoBytes(cptr, C.int(currBuffer.len))
366
367         return int(currBuffer.state), string(payload)
368 }
369
370 func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
371         return m.Wh_open(target)
372 }
373
374 func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
375         endpoint := C.CString(target)
376         return C.rmr_wh_open(m.context, endpoint)
377 }
378
379 func (m *RMRClient) Closewh(whid int) {
380         m.Wh_close(C.rmr_whid_t(whid))
381 }
382
383 func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
384         C.rmr_wh_close(m.context, whid)
385 }
386
387 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
388         if params.status == int(C.RMR_ERR_RETRY) {
389                 return true
390         }
391         return false
392 }
393
394 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
395         if params.status == int(C.RMR_ERR_NOENDPT) {
396                 return true
397         }
398         return false
399 }
400
401 func (m *RMRClient) UpdateStatCounter(name string) {
402         m.mux.Lock()
403         m.stat[name].Inc()
404         m.mux.Unlock()
405 }
406
407 func (m *RMRClient) RegisterMetrics() {
408         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
409 }
410
411 func (m *RMRClient) Wait() {
412         m.wg.Wait()
413 }
414
415 func (m *RMRClient) IsReady() bool {
416         return m.ready != 0
417 }
418
419 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
420         m.readyCb = cb
421         m.readyCbParams = params
422 }
423
424 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
425         id, ok := RICMessageTypes[name]
426         return id, ok
427 }
428
429 func (m *RMRClient) GetRicMessageName(id int) (s string) {
430         for k, v := range RICMessageTypes {
431                 if id == v {
432                         return k
433                 }
434         }
435         return
436 }
437
438 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
439         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
440         return int(mbuf.state)
441 }
442
443 // To be removed ...
444 func (m *RMRClient) GetStat() (r RMRStatistics) {
445         return
446 }