Made possible to have multiple rmrclient instances
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_nng -lnng
36 */
37 import "C"
38
39 import (
40         "fmt"
41         "github.com/spf13/viper"
42         "strconv"
43         "strings"
44         "time"
45         "unsafe"
46 )
47
48 var RMRCounterOpts = []CounterOpts{
49         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50         {Name: "Received", Help: "The total number of received RMR messages"},
51         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
53 }
54
55 var RMRErrors = map[int]string{
56         C.RMR_OK:             "state is good",
57         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
58         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
59         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
60         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
61         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62         C.RMR_ERR_CALLFAILED: "unable to send call() message",
63         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
64         C.RMR_ERR_WHID:       "wormhole id was invalid",
65         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
66         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
68         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
69         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
70         C.RMR_ERR_TRUNC:      "received message likely truncated",
71         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
73 }
74
75 type RMRParams struct {
76         Mtype      int
77         Payload    []byte
78         PayloadLen int
79         Meid       *RMRMeid
80         Xid        string
81         SubId      int
82         Src        string
83         Mbuf       *C.rmr_mbuf_t
84 }
85
86 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
87         p := C.CString(protPort)
88         m := C.int(maxSize)
89         defer C.free(unsafe.Pointer(p))
90
91         ctx := C.rmr_init(p, m, C.int(0))
92         if ctx == nil {
93                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
94         }
95
96         return &RMRClient{
97                 protPort:   protPort,
98                 numWorkers: numWorkers,
99                 context:    ctx,
100                 consumers:  make([]MessageConsumer, 0),
101                 stat:       Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
102         }
103 }
104
105 func NewRMRClient() *RMRClient {
106         return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
107 }
108
109 func (m *RMRClient) Start(c MessageConsumer) {
110         if c != nil {
111                 m.consumers = append(m.consumers, c)
112         }
113
114         for {
115                 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
116
117                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
118                         break
119                 }
120                 time.Sleep(10 * time.Second)
121         }
122         m.wg.Add(m.numWorkers)
123
124         if m.readyCb != nil {
125                 go m.readyCb(m.readyCbParams)
126         }
127
128         for w := 0; w < m.numWorkers; w++ {
129                 go m.Worker("worker-"+strconv.Itoa(w), 0)
130         }
131         m.Wait()
132 }
133
134 func (m *RMRClient) Worker(taskName string, msgSize int) {
135         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
136
137         defer m.wg.Done()
138         for {
139                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
140                 if rxBuffer == nil {
141                         m.LogMBufError("RecvMsg failed", rxBuffer)
142                         m.UpdateStatCounter("ReceiveError")
143                         continue
144                 }
145                 m.UpdateStatCounter("Received")
146
147                 go m.parseMessage(rxBuffer)
148         }
149 }
150
151 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
152         if len(m.consumers) == 0 {
153                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
154                 return
155         }
156
157         params := &RMRParams{}
158         params.Mbuf = rxBuffer
159         params.Mtype = int(rxBuffer.mtype)
160         params.SubId = int(rxBuffer.sub_id)
161         params.Meid = &RMRMeid{}
162
163         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
164         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
165                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
166         }
167
168         xidBuf := make([]byte, int(C.RMR_MAX_XID))
169         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
170                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
171         }
172
173         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
174         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
175                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
176         }
177
178         for _, c := range m.consumers {
179                 cptr := unsafe.Pointer(rxBuffer.payload)
180                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
181                 params.PayloadLen = int(rxBuffer.len)
182
183                 err := c.Consume(params)
184                 if err != nil {
185                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
186                 }
187         }
188 }
189
190 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
191         buf := C.rmr_alloc_msg(m.context, 0)
192         if buf == nil {
193                 Logger.Error("rmrClient: Allocating message buffer failed!")
194         }
195         return buf
196 }
197
198 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
199         if mbuf == nil {
200                 return
201         }
202         C.rmr_free_msg(mbuf)
203 }
204
205 func (m *RMRClient) SendMsg(params *RMRParams) bool {
206         return m.Send(params, false)
207 }
208
209 func (m *RMRClient) SendRts(params *RMRParams) bool {
210         return m.Send(params, true)
211 }
212
213 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
214         txBuffer := params.Mbuf
215         if txBuffer == nil {
216                 txBuffer = m.Allocate()
217                 if txBuffer == nil {
218                         return false
219                 }
220         }
221
222         txBuffer.mtype = C.int(params.Mtype)
223         txBuffer.sub_id = C.int(params.SubId)
224         txBuffer.len = C.int(len(params.Payload))
225         if params.PayloadLen != 0 {
226                 txBuffer.len = C.int(params.PayloadLen)
227         }
228         datap := C.CBytes(params.Payload)
229         defer C.free(datap)
230
231         if params != nil {
232                 if params.Meid != nil {
233                         b := make([]byte, int(C.RMR_MAX_MEID))
234                         copy(b, []byte(params.Meid.RanName))
235                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
236                 }
237                 xidLen := len(params.Xid)
238                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
239                         b := make([]byte, int(C.RMR_MAX_XID))
240                         copy(b, []byte(params.Xid))
241                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
242                 }
243         }
244         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
245
246         return m.SendBuf(txBuffer, isRts)
247 }
248
249 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
250         var (
251                 currBuffer  *C.rmr_mbuf_t
252                 state       bool   = true
253                 counterName string = "Transmitted"
254         )
255
256         txBuffer.state = 0
257         if isRts {
258                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
259         } else {
260                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
261         }
262
263         if currBuffer == nil {
264                 m.UpdateStatCounter("TransmitError")
265                 return m.LogMBufError("SendBuf failed", txBuffer)
266         }
267
268         // Just quick retry seems to help for K8s issue
269         for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
270                 if isRts {
271                         currBuffer = C.rmr_rts_msg(m.context, currBuffer)
272                 } else {
273                         currBuffer = C.rmr_send_msg(m.context, currBuffer)
274                 }
275         }
276
277         if currBuffer.state != C.RMR_OK {
278                 counterName = "TransmitError"
279                 state = m.LogMBufError("SendBuf failed", currBuffer)
280         }
281
282         m.UpdateStatCounter(counterName)
283         m.Free(currBuffer)
284         return state
285 }
286
287 func (m *RMRClient) UpdateStatCounter(name string) {
288         m.mux.Lock()
289         m.stat[name].Inc()
290         m.mux.Unlock()
291 }
292
293 func (m *RMRClient) RegisterMetrics() {
294         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
295 }
296
297 func (m *RMRClient) Wait() {
298         m.wg.Wait()
299 }
300
301 func (m *RMRClient) IsReady() bool {
302         return m.ready != 0
303 }
304
305 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
306         m.readyCb = cb
307         m.readyCbParams = params
308 }
309
310 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
311         id, ok := RICMessageTypes[name]
312         return id, ok
313 }
314
315 func (m *RMRClient) GetRicMessageName(id int) (s string) {
316         for k, v := range RICMessageTypes {
317                 if id == v {
318                         return k
319                 }
320         }
321         return
322 }
323
324 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) bool {
325         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
326         return false
327 }
328
329 // To be removed ...
330 func (m *RMRClient) GetStat() (r RMRStatistics) {
331         return
332 }