49a28d9f82e4e6fbf07d50a2a15954ff61a46179
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_nng -lnng
36 */
37 import "C"
38
39 import (
40         "fmt"
41         "github.com/spf13/viper"
42         "strconv"
43         "strings"
44         "time"
45         "unsafe"
46 )
47
48 var RMRCounterOpts = []CounterOpts{
49         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50         {Name: "Received", Help: "The total number of received RMR messages"},
51         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
53 }
54
55 var RMRErrors = map[int]string{
56         C.RMR_OK:             "state is good",
57         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
58         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
59         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
60         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
61         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62         C.RMR_ERR_CALLFAILED: "unable to send call() message",
63         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
64         C.RMR_ERR_WHID:       "wormhole id was invalid",
65         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
66         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
68         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
69         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
70         C.RMR_ERR_TRUNC:      "received message likely truncated",
71         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
73 }
74
75 type RMRParams struct {
76         Mtype      int
77         Payload    []byte
78         PayloadLen int
79         Meid       *RMRMeid
80         Xid        string
81         SubId      int
82         Src        string
83         Mbuf       *C.rmr_mbuf_t
84         status     int
85 }
86
87 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
88         p := C.CString(protPort)
89         m := C.int(maxSize)
90         defer C.free(unsafe.Pointer(p))
91
92         ctx := C.rmr_init(p, m, C.int(0))
93         if ctx == nil {
94                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
95         }
96
97         return &RMRClient{
98                 protPort:   protPort,
99                 numWorkers: numWorkers,
100                 context:    ctx,
101                 consumers:  make([]MessageConsumer, 0),
102                 stat:       Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
103         }
104 }
105
106 func NewRMRClient() *RMRClient {
107         return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
108 }
109
110 func (m *RMRClient) Start(c MessageConsumer) {
111         if c != nil {
112                 m.consumers = append(m.consumers, c)
113         }
114
115         var counter int = 0
116         for {
117                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
118                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
119                         break
120                 }
121                 if counter%10 == 0 {
122                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
123                 }
124                 time.Sleep(1 * time.Second)
125                 counter++
126         }
127         m.wg.Add(m.numWorkers)
128
129         if m.readyCb != nil {
130                 go m.readyCb(m.readyCbParams)
131         }
132
133         for w := 0; w < m.numWorkers; w++ {
134                 go m.Worker("worker-"+strconv.Itoa(w), 0)
135         }
136         m.Wait()
137 }
138
139 func (m *RMRClient) Worker(taskName string, msgSize int) {
140         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
141
142         defer m.wg.Done()
143         for {
144                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
145                 if rxBuffer == nil {
146                         m.LogMBufError("RecvMsg failed", rxBuffer)
147                         m.UpdateStatCounter("ReceiveError")
148                         continue
149                 }
150                 m.UpdateStatCounter("Received")
151
152                 go m.parseMessage(rxBuffer)
153         }
154 }
155
156 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
157         if len(m.consumers) == 0 {
158                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
159                 return
160         }
161
162         params := &RMRParams{}
163         params.Mbuf = rxBuffer
164         params.Mtype = int(rxBuffer.mtype)
165         params.SubId = int(rxBuffer.sub_id)
166         params.Meid = &RMRMeid{}
167
168         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
169         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
170                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
171         }
172
173         xidBuf := make([]byte, int(C.RMR_MAX_XID))
174         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
175                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
176         }
177
178         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
179         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
180                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
181         }
182
183         // Default case: a single consumer
184         if len(m.consumers) == 1 && m.consumers[0] != nil {
185                 params.PayloadLen = int(rxBuffer.len)
186                 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
187                 err := m.consumers[0].Consume(params)
188                 if err != nil {
189                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
190                 }
191                 return
192         }
193
194         // Special case for multiple consumers
195         for _, c := range m.consumers {
196                 cptr := unsafe.Pointer(rxBuffer.payload)
197                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
198                 params.PayloadLen = int(rxBuffer.len)
199                 params.Mtype = int(rxBuffer.mtype)
200                 params.SubId = int(rxBuffer.sub_id)
201
202                 err := c.Consume(params)
203                 if err != nil {
204                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
205                 }
206         }
207 }
208
209 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
210         buf := C.rmr_alloc_msg(m.context, 0)
211         if buf == nil {
212                 Logger.Error("rmrClient: Allocating message buffer failed!")
213         }
214         return buf
215 }
216
217 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
218         if mbuf == nil {
219                 return
220         }
221         C.rmr_free_msg(mbuf)
222 }
223
224 func (m *RMRClient) SendMsg(params *RMRParams) bool {
225         return m.Send(params, false)
226 }
227
228 func (m *RMRClient) SendRts(params *RMRParams) bool {
229         return m.Send(params, true)
230 }
231
232 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
233         txBuffer := params.Mbuf
234         if txBuffer == nil {
235                 txBuffer = m.Allocate()
236                 if txBuffer == nil {
237                         return false
238                 }
239         }
240
241         txBuffer.mtype = C.int(params.Mtype)
242         txBuffer.sub_id = C.int(params.SubId)
243         txBuffer.len = C.int(len(params.Payload))
244         if params.PayloadLen != 0 {
245                 txBuffer.len = C.int(params.PayloadLen)
246         }
247         datap := C.CBytes(params.Payload)
248         defer C.free(datap)
249
250         if params != nil {
251                 if params.Meid != nil {
252                         b := make([]byte, int(C.RMR_MAX_MEID))
253                         copy(b, []byte(params.Meid.RanName))
254                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
255                 }
256                 xidLen := len(params.Xid)
257                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
258                         b := make([]byte, int(C.RMR_MAX_XID))
259                         copy(b, []byte(params.Xid))
260                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
261                 }
262         }
263         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
264
265         params.status = m.SendBuf(txBuffer, isRts)
266         if params.status == int(C.RMR_OK) {
267                 return true
268         }
269         return false
270 }
271
272 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) int {
273         var (
274                 currBuffer  *C.rmr_mbuf_t
275                 counterName string = "Transmitted"
276         )
277
278         txBuffer.state = 0
279         if isRts {
280                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
281         } else {
282                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
283         }
284
285         if currBuffer == nil {
286                 m.UpdateStatCounter("TransmitError")
287                 return m.LogMBufError("SendBuf failed", txBuffer)
288         }
289
290         // Just quick retry seems to help for K8s issue
291         maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
292         if maxRetryOnFailure == 0 {
293                 maxRetryOnFailure = 5
294         }
295
296         for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
297                 if isRts {
298                         currBuffer = C.rmr_rts_msg(m.context, currBuffer)
299                 } else {
300                         currBuffer = C.rmr_send_msg(m.context, currBuffer)
301                 }
302         }
303
304         if currBuffer.state != C.RMR_OK {
305                 counterName = "TransmitError"
306                 m.LogMBufError("SendBuf failed", currBuffer)
307         }
308
309         m.UpdateStatCounter(counterName)
310         defer m.Free(currBuffer)
311
312         return int(currBuffer.state)
313 }
314
315 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
316         if params.status == int(C.RMR_ERR_RETRY) {
317                 return true
318         }
319         return false
320 }
321
322 func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
323         if params.status == int(C.RMR_ERR_NOENDPT) {
324                 return true
325         }
326         return false
327 }
328
329 func (m *RMRClient) UpdateStatCounter(name string) {
330         m.mux.Lock()
331         m.stat[name].Inc()
332         m.mux.Unlock()
333 }
334
335 func (m *RMRClient) RegisterMetrics() {
336         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
337 }
338
339 func (m *RMRClient) Wait() {
340         m.wg.Wait()
341 }
342
343 func (m *RMRClient) IsReady() bool {
344         return m.ready != 0
345 }
346
347 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
348         m.readyCb = cb
349         m.readyCbParams = params
350 }
351
352 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
353         id, ok := RICMessageTypes[name]
354         return id, ok
355 }
356
357 func (m *RMRClient) GetRicMessageName(id int) (s string) {
358         for k, v := range RICMessageTypes {
359                 if id == v {
360                         return k
361                 }
362         }
363         return
364 }
365
366 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
367         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
368         return int(mbuf.state)
369 }
370
371 // To be removed ...
372 func (m *RMRClient) GetStat() (r RMRStatistics) {
373         return
374 }