xapp-frame start to check more often is rmr ready, but log info only every 10 seconds.
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_nng -lnng
36 */
37 import "C"
38
39 import (
40         "fmt"
41         "github.com/spf13/viper"
42         "strconv"
43         "strings"
44         "time"
45         "unsafe"
46 )
47
48 var RMRCounterOpts = []CounterOpts{
49         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50         {Name: "Received", Help: "The total number of received RMR messages"},
51         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
53 }
54
55 var RMRErrors = map[int]string{
56         C.RMR_OK:             "state is good",
57         C.RMR_ERR_BADARG:     "argument passed to function was unusable",
58         C.RMR_ERR_NOENDPT:    "send/call could not find an endpoint based on msg type",
59         C.RMR_ERR_EMPTY:      "msg received had no payload; attempt to send an empty message",
60         C.RMR_ERR_NOHDR:      "message didn't contain a valid header",
61         C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62         C.RMR_ERR_CALLFAILED: "unable to send call() message",
63         C.RMR_ERR_NOWHOPEN:   "no wormholes are open",
64         C.RMR_ERR_WHID:       "wormhole id was invalid",
65         C.RMR_ERR_OVERFLOW:   "operation would have busted through a buffer/field size",
66         C.RMR_ERR_RETRY:      "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67         C.RMR_ERR_RCVFAILED:  "receive failed (hard error)",
68         C.RMR_ERR_TIMEOUT:    "message processing call timed out",
69         C.RMR_ERR_UNSET:      "the message hasn't been populated with a transport buffer",
70         C.RMR_ERR_TRUNC:      "received message likely truncated",
71         C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72         C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
73 }
74
75 type RMRParams struct {
76         Mtype      int
77         Payload    []byte
78         PayloadLen int
79         Meid       *RMRMeid
80         Xid        string
81         SubId      int
82         Src        string
83         Mbuf       *C.rmr_mbuf_t
84 }
85
86 func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
87         p := C.CString(protPort)
88         m := C.int(maxSize)
89         defer C.free(unsafe.Pointer(p))
90
91         ctx := C.rmr_init(p, m, C.int(0))
92         if ctx == nil {
93                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
94         }
95
96         return &RMRClient{
97                 protPort:   protPort,
98                 numWorkers: numWorkers,
99                 context:    ctx,
100                 consumers:  make([]MessageConsumer, 0),
101                 stat:       Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
102         }
103 }
104
105 func NewRMRClient() *RMRClient {
106         return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
107 }
108
109 func (m *RMRClient) Start(c MessageConsumer) {
110         if c != nil {
111                 m.consumers = append(m.consumers, c)
112         }
113
114         var counter int = 0
115         for {
116                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
117                         Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
118                         break
119                 }
120                 if counter%10 == 0 {
121                         Logger.Info("rmrClient: Waiting for RMR to be ready ...")
122                 }
123                 time.Sleep(1 * time.Second)
124                 counter++
125         }
126         m.wg.Add(m.numWorkers)
127
128         if m.readyCb != nil {
129                 go m.readyCb(m.readyCbParams)
130         }
131
132         for w := 0; w < m.numWorkers; w++ {
133                 go m.Worker("worker-"+strconv.Itoa(w), 0)
134         }
135         m.Wait()
136 }
137
138 func (m *RMRClient) Worker(taskName string, msgSize int) {
139         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, m.protPort)
140
141         defer m.wg.Done()
142         for {
143                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
144                 if rxBuffer == nil {
145                         m.LogMBufError("RecvMsg failed", rxBuffer)
146                         m.UpdateStatCounter("ReceiveError")
147                         continue
148                 }
149                 m.UpdateStatCounter("Received")
150
151                 go m.parseMessage(rxBuffer)
152         }
153 }
154
155 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
156         if len(m.consumers) == 0 {
157                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
158                 return
159         }
160
161         params := &RMRParams{}
162         params.Mbuf = rxBuffer
163         params.Mtype = int(rxBuffer.mtype)
164         params.SubId = int(rxBuffer.sub_id)
165         params.Meid = &RMRMeid{}
166
167         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
168         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
169                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
170         }
171
172         xidBuf := make([]byte, int(C.RMR_MAX_XID))
173         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
174                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
175         }
176
177         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
178         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
179                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
180         }
181
182         for _, c := range m.consumers {
183                 cptr := unsafe.Pointer(rxBuffer.payload)
184                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
185                 params.PayloadLen = int(rxBuffer.len)
186
187                 err := c.Consume(params)
188                 if err != nil {
189                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
190                 }
191         }
192 }
193
194 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
195         buf := C.rmr_alloc_msg(m.context, 0)
196         if buf == nil {
197                 Logger.Error("rmrClient: Allocating message buffer failed!")
198         }
199         return buf
200 }
201
202 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
203         if mbuf == nil {
204                 return
205         }
206         C.rmr_free_msg(mbuf)
207 }
208
209 func (m *RMRClient) SendMsg(params *RMRParams) bool {
210         return m.Send(params, false)
211 }
212
213 func (m *RMRClient) SendRts(params *RMRParams) bool {
214         return m.Send(params, true)
215 }
216
217 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
218         txBuffer := params.Mbuf
219         if txBuffer == nil {
220                 txBuffer = m.Allocate()
221                 if txBuffer == nil {
222                         return false
223                 }
224         }
225
226         txBuffer.mtype = C.int(params.Mtype)
227         txBuffer.sub_id = C.int(params.SubId)
228         txBuffer.len = C.int(len(params.Payload))
229         if params.PayloadLen != 0 {
230                 txBuffer.len = C.int(params.PayloadLen)
231         }
232         datap := C.CBytes(params.Payload)
233         defer C.free(datap)
234
235         if params != nil {
236                 if params.Meid != nil {
237                         b := make([]byte, int(C.RMR_MAX_MEID))
238                         copy(b, []byte(params.Meid.RanName))
239                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
240                 }
241                 xidLen := len(params.Xid)
242                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
243                         b := make([]byte, int(C.RMR_MAX_XID))
244                         copy(b, []byte(params.Xid))
245                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
246                 }
247         }
248         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
249
250         return m.SendBuf(txBuffer, isRts)
251 }
252
253 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
254         var (
255                 currBuffer  *C.rmr_mbuf_t
256                 state       bool   = true
257                 counterName string = "Transmitted"
258         )
259
260         txBuffer.state = 0
261         if isRts {
262                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
263         } else {
264                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
265         }
266
267         if currBuffer == nil {
268                 m.UpdateStatCounter("TransmitError")
269                 return m.LogMBufError("SendBuf failed", txBuffer)
270         }
271
272         // Just quick retry seems to help for K8s issue
273         for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
274                 if isRts {
275                         currBuffer = C.rmr_rts_msg(m.context, currBuffer)
276                 } else {
277                         currBuffer = C.rmr_send_msg(m.context, currBuffer)
278                 }
279         }
280
281         if currBuffer.state != C.RMR_OK {
282                 counterName = "TransmitError"
283                 state = m.LogMBufError("SendBuf failed", currBuffer)
284         }
285
286         m.UpdateStatCounter(counterName)
287         m.Free(currBuffer)
288         return state
289 }
290
291 func (m *RMRClient) UpdateStatCounter(name string) {
292         m.mux.Lock()
293         m.stat[name].Inc()
294         m.mux.Unlock()
295 }
296
297 func (m *RMRClient) RegisterMetrics() {
298         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
299 }
300
301 func (m *RMRClient) Wait() {
302         m.wg.Wait()
303 }
304
305 func (m *RMRClient) IsReady() bool {
306         return m.ready != 0
307 }
308
309 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
310         m.readyCb = cb
311         m.readyCbParams = params
312 }
313
314 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
315         id, ok := RICMessageTypes[name]
316         return id, ok
317 }
318
319 func (m *RMRClient) GetRicMessageName(id int) (s string) {
320         for k, v := range RICMessageTypes {
321                 if id == v {
322                         return k
323                 }
324         }
325         return
326 }
327
328 func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) bool {
329         Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
330         return false
331 }
332
333 // To be removed ...
334 func (m *RMRClient) GetStat() (r RMRStatistics) {
335         return
336 }