Removed UENIB. Simple Dockerfile for ci
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_nng -lnng
36 */
37 import "C"
38
39 import (
40         "github.com/spf13/viper"
41         "strconv"
42         "strings"
43         "time"
44         "unsafe"
45 )
46
47 var RMRCounterOpts = []CounterOpts{
48         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49         {Name: "Received", Help: "The total number of received RMR messages"},
50         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
52 }
53
54 type RMRParams struct {
55         Mtype      int
56         Payload    []byte
57         PayloadLen int
58         Meid       *RMRMeid
59         Xid        string
60         SubId      int
61         Src        string
62         Mbuf       *C.rmr_mbuf_t
63 }
64
65 func NewRMRClient() *RMRClient {
66         p := C.CString(viper.GetString("rmr.protPort"))
67         m := C.int(viper.GetInt("rmr.maxSize"))
68         defer C.free(unsafe.Pointer(p))
69
70         ctx := C.rmr_init(p, m, C.int(0))
71         if ctx == nil {
72                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
73         }
74
75         return &RMRClient{
76                 context:   ctx,
77                 consumers: make([]MessageConsumer, 0),
78                 stat:      Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
79         }
80 }
81
82 func (m *RMRClient) Start(c MessageConsumer) {
83         if c != nil {
84                 m.consumers = append(m.consumers, c)
85         }
86
87         for {
88                 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
89
90                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
91                         break
92                 }
93                 time.Sleep(10 * time.Second)
94         }
95         m.wg.Add(viper.GetInt("rmr.numWorkers"))
96
97         if m.readyCb != nil {
98                 go m.readyCb(m.readyCbParams)
99         }
100
101         for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
102                 go m.Worker("worker-"+strconv.Itoa(w), 0)
103         }
104         m.Wait()
105 }
106
107 func (m *RMRClient) Worker(taskName string, msgSize int) {
108         p := viper.GetString("rmr.protPort")
109         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
110
111         defer m.wg.Done()
112         for {
113                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
114                 if rxBuffer == nil {
115                         m.UpdateStatCounter("ReceiveError")
116                         continue
117                 }
118                 m.UpdateStatCounter("Received")
119
120                 go m.parseMessage(rxBuffer)
121         }
122 }
123
124 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
125         if len(m.consumers) == 0 {
126                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
127                 return
128         }
129
130         params := &RMRParams{}
131         params.Mbuf = rxBuffer
132         params.Mtype = int(rxBuffer.mtype)
133         params.SubId = int(rxBuffer.sub_id)
134         params.Meid = &RMRMeid{}
135
136         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
137         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
138                 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
139                 //params.Meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
140                 //params.Meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
141         }
142
143         xidBuf := make([]byte, int(C.RMR_MAX_XID))
144         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
145                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
146         }
147
148         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
149         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
150                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
151         }
152
153         for _, c := range m.consumers {
154                 cptr := unsafe.Pointer(rxBuffer.payload)
155                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
156                 params.PayloadLen = int(rxBuffer.len)
157
158                 err := c.Consume(params)
159                 if err != nil {
160                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
161                 }
162         }
163 }
164
165 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
166         buf := C.rmr_alloc_msg(m.context, 0)
167         if buf == nil {
168                 Logger.Error("rmrClient: Allocating message buffer failed!")
169         }
170         return buf
171 }
172
173 func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
174         if mbuf == nil {
175                 return
176         }
177         C.rmr_free_msg(mbuf)
178 }
179
180 func (m *RMRClient) SendMsg(params *RMRParams) bool {
181         return m.Send(params, false)
182 }
183
184 func (m *RMRClient) SendRts(params *RMRParams) bool {
185         return m.Send(params, true)
186 }
187
188 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
189         txBuffer := params.Mbuf
190         if txBuffer == nil {
191                 txBuffer = m.Allocate()
192         }
193
194         txBuffer.mtype = C.int(params.Mtype)
195         txBuffer.sub_id = C.int(params.SubId)
196         txBuffer.len = C.int(len(params.Payload))
197         if params.PayloadLen != 0 {
198                 txBuffer.len = C.int(params.PayloadLen)
199         }
200         datap := C.CBytes(params.Payload)
201         defer C.free(datap)
202
203         if params != nil {
204                 if params.Meid != nil {
205                         b := make([]byte, int(C.RMR_MAX_MEID))
206                         copy(b, []byte(params.Meid.RanName))
207                         //copy(b, []byte(params.Meid.PlmnID))
208                         //copy(b[16:], []byte(params.Meid.EnbID))
209                         C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
210                 }
211                 xidLen := len(params.Xid)
212                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
213                         b := make([]byte, int(C.RMR_MAX_XID))
214                         copy(b, []byte(params.Xid))
215                         C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
216                 }
217         }
218         C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
219
220         return m.SendBuf(txBuffer, isRts)
221 }
222
223 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
224         var (
225                 currBuffer  *C.rmr_mbuf_t
226                 state       bool   = true
227                 counterName string = "Transmitted"
228         )
229
230         txBuffer.state = 0
231         if isRts {
232                 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
233         } else {
234                 currBuffer = C.rmr_send_msg(m.context, txBuffer)
235         }
236
237         if currBuffer == nil {
238                 m.UpdateStatCounter("TransmitError")
239                 return false
240         }
241
242         // Just quick retry seems to help for K8s issue
243         for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
244                 if isRts {
245                         currBuffer = C.rmr_rts_msg(m.context, currBuffer)
246                 } else {
247                         currBuffer = C.rmr_send_msg(m.context, currBuffer)
248                 }
249         }
250
251         if currBuffer.state != C.RMR_OK {
252                 state = false
253                 counterName = "TransmitError"
254         }
255
256         m.UpdateStatCounter(counterName)
257         m.Free(currBuffer)
258         return state
259 }
260
261 func (m *RMRClient) UpdateStatCounter(name string) {
262         m.mux.Lock()
263         m.stat[name].Inc()
264         m.mux.Unlock()
265 }
266
267 func (m *RMRClient) RegisterMetrics() {
268         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
269 }
270
271 func (m *RMRClient) Wait() {
272         m.wg.Wait()
273 }
274
275 func (m *RMRClient) IsReady() bool {
276         return m.ready != 0
277 }
278
279 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
280         m.readyCb = cb
281         m.readyCbParams = params
282 }
283
284 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
285         id, ok := RICMessageTypes[name]
286         return id, ok
287 }
288
289 func (m *RMRClient) GetRicMessageName(id int) (s string) {
290         for k, v := range RICMessageTypes {
291                 if id == v {
292                         return k
293                 }
294         }
295         return
296 }
297
298 // To be removed ...
299 func (m *RMRClient) GetStat() (r RMRStatistics) {
300         return
301 }