Added new implementation for the following feature requests:
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_nng -lnng
36 */
37 import "C"
38
39 import (
40         "github.com/spf13/viper"
41         "strconv"
42         "strings"
43         "time"
44         "unsafe"
45 )
46
47 var RMRCounterOpts = []CounterOpts{
48         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49         {Name: "Received", Help: "The total number of received RMR messages"},
50         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
52 }
53
54 type RMRParams struct {
55         Mtype           int
56         Payload         []byte
57         PayloadLen      int
58         Meid            *RMRMeid
59         Xid             string
60         SubId           int
61         Src             string
62         Mbuf            *C.rmr_mbuf_t
63 }
64
65 func NewRMRClient() *RMRClient {
66         p := C.CString(viper.GetString("rmr.protPort"))
67         m := C.int(viper.GetInt("rmr.maxSize"))
68         defer C.free(unsafe.Pointer(p))
69
70         ctx := C.rmr_init(p, m, C.int(0))
71         if ctx == nil {
72                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
73         }
74
75         return &RMRClient{
76                 context:   ctx,
77                 consumers: make([]MessageConsumer, 0),
78                 stat:      Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
79         }
80 }
81
82 func (m *RMRClient) Start(c MessageConsumer) {
83         if c != nil {
84                 m.consumers = append(m.consumers, c)
85         }
86
87         for {
88                 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
89
90                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
91                         break
92                 }
93                 time.Sleep(10 * time.Second)
94         }
95         m.wg.Add(viper.GetInt("rmr.numWorkers"))
96
97         if m.readyCb != nil {
98                 go m.readyCb(m.readyCbParams)
99         }
100
101         for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
102                 go m.Worker("worker-"+strconv.Itoa(w), 0)
103         }
104         m.Wait()
105 }
106
107 func (m *RMRClient) Worker(taskName string, msgSize int) {
108         p := viper.GetString("rmr.protPort")
109         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
110
111         defer m.wg.Done()
112         for {
113                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
114                 if rxBuffer == nil {
115                         m.UpdateStatCounter("ReceiveError")
116                         continue
117                 }
118                 m.UpdateStatCounter("Received")
119
120                 go m.parseMessage(rxBuffer)
121         }
122 }
123
124 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
125         if len(m.consumers) == 0 {
126                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
127                 return
128         }
129
130         params := &RMRParams{}
131         params.Mbuf = rxBuffer
132         params.Mtype = int(rxBuffer.mtype)
133         params.SubId = int(rxBuffer.sub_id)
134         params.Meid = &RMRMeid{}
135
136         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
137         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
138                 params.Meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
139                 params.Meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
140         }
141
142         xidBuf := make([]byte, int(C.RMR_MAX_XID))
143         if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
144                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
145         }
146
147         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
148         if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
149                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
150         }
151
152         for _, c := range m.consumers {
153                 cptr := unsafe.Pointer(rxBuffer.payload)
154                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
155                 params.PayloadLen = int(rxBuffer.len)
156
157                 err := c.Consume(params)
158                 if err != nil {
159                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
160                 }
161         }
162 }
163
164 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
165         buf := C.rmr_alloc_msg(m.context, 0)
166         if buf == nil {
167                 Logger.Error("rmrClient: Allocating message buffer failed!")
168         }
169
170         return buf
171 }
172
173 func (m *RMRClient) SendMsg(params *RMRParams) bool {
174         return m.Send(params, false)
175 }
176
177 func (m *RMRClient) SendRts(params *RMRParams) bool {
178         return m.Send(params, true)
179 }
180
181 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
182         buf := params.Mbuf
183         if buf == nil {
184                 buf = m.Allocate()
185         }
186
187         buf.mtype = C.int(params.Mtype)
188         buf.sub_id = C.int(params.SubId)
189         buf.len = C.int(len(params.Payload))
190         datap := C.CBytes(params.Payload)
191         defer C.free(datap)
192
193         if params != nil {
194                 if params.Meid != nil {
195                         b := make([]byte, int(C.RMR_MAX_MEID))
196                         copy(b, []byte(params.Meid.PlmnID))
197                         copy(b[16:], []byte(params.Meid.EnbID))
198                         C.rmr_bytes2meid(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
199                 }
200                 xidLen := len(params.Xid)
201                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
202                         b := make([]byte, int(C.RMR_MAX_MEID))
203                         copy(b, []byte(params.Xid))
204                         C.rmr_bytes2xact(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
205                 }
206         }
207         C.write_bytes_array(buf.payload, datap, buf.len)
208
209         return m.SendBuf(buf, isRts)
210 }
211
212 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
213         for i := 0; i < 10; i++ {
214                 txBuffer.state = 0
215                 if isRts {
216                         txBuffer = C.rmr_rts_msg(m.context, txBuffer)
217                 } else {
218                         txBuffer = C.rmr_send_msg(m.context, txBuffer)
219                 }
220
221                 if txBuffer == nil {
222                         break
223                 } else if txBuffer.state != C.RMR_OK {
224                         if txBuffer.state != C.RMR_ERR_RETRY {
225                                 time.Sleep(100 * time.Microsecond)
226                                 m.UpdateStatCounter("TransmitError")
227                         }
228                         for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
229                                 txBuffer = C.rmr_send_msg(m.context, txBuffer)
230                         }
231                 }
232
233                 if txBuffer.state == C.RMR_OK {
234                         m.UpdateStatCounter("Transmitted")
235                         return true
236                 }
237         }
238         m.UpdateStatCounter("TransmitError")
239         return false
240 }
241
242 func (m *RMRClient) UpdateStatCounter(name string) {
243         m.mux.Lock()
244         m.stat[name].Inc()
245         m.mux.Unlock()
246 }
247
248 func (m *RMRClient) RegisterMetrics() {
249         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
250 }
251
252 func (m *RMRClient) Wait() {
253         m.wg.Wait()
254 }
255
256 func (m *RMRClient) IsReady() bool {
257         return m.ready != 0
258 }
259
260 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
261         m.readyCb = cb
262         m.readyCbParams = params
263 }
264
265 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
266         id, ok := RICMessageTypes[name]
267         return id, ok
268 }
269
270 func (m *RMRClient) GetRicMessageName(id int) (s string) {
271         for k, v := range RICMessageTypes {
272                 if id == v {
273                         return k
274                 }
275         }
276         return
277 }
278
279 // To be removed ...
280 func (m *RMRClient) GetStat() (r RMRStatistics) {
281         return
282 }