Add support for MEID
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_nng -lnng
36 */
37 import "C"
38
39 import (
40         "github.com/spf13/viper"
41         "strconv"
42         "strings"
43         "time"
44         "unsafe"
45 )
46
47 var RMRCounterOpts = []CounterOpts{
48         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49         {Name: "Received", Help: "The total number of received RMR messages"},
50         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
52 }
53
54 func NewRMRClient() *RMRClient {
55         p := C.CString(viper.GetString("rmr.protPort"))
56         m := C.int(viper.GetInt("rmr.maxSize"))
57         defer C.free(unsafe.Pointer(p))
58
59         ctx := C.rmr_init(p, m, C.int(0))
60         if ctx == nil {
61                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
62         }
63
64         return &RMRClient{
65                 context:   ctx,
66                 consumers: make([]MessageConsumer, 0),
67                 stat:      Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
68         }
69 }
70
71 func (m *RMRClient) Start(c MessageConsumer) {
72         if c != nil {
73                 m.consumers = append(m.consumers, c)
74         }
75
76         for {
77                 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
78
79                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
80                         break
81                 }
82                 time.Sleep(10 * time.Second)
83         }
84         m.wg.Add(viper.GetInt("rmr.numWorkers"))
85
86         if m.readyCb != nil {
87                 go m.readyCb(m.readyCbParams)
88         }
89
90         for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
91                 go m.Worker("worker-"+strconv.Itoa(w), 0)
92         }
93         m.Wait()
94 }
95
96 func (m *RMRClient) Worker(taskName string, msgSize int) {
97         p := viper.GetString("rmr.protPort")
98         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
99
100         defer m.wg.Done()
101         for {
102                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
103                 if rxBuffer == nil {
104                         m.UpdateStatCounter("ReceiveError")
105                         continue
106                 }
107                 m.UpdateStatCounter("Received")
108
109                 go m.parseMessage(rxBuffer)
110         }
111 }
112
113 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
114         if len(m.consumers) == 0 {
115                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
116                 return
117         }
118
119         meid := &RMRMeid{}
120         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
121         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
122                 meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
123                 meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
124         }
125
126         for _, c := range m.consumers {
127                 cptr := unsafe.Pointer(rxBuffer.payload)
128                 payload := C.GoBytes(cptr, C.int(rxBuffer.len))
129
130                 err := c.Consume(int(rxBuffer.mtype), int(rxBuffer.sub_id), payload, meid)
131                 if err != nil {
132                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
133                 }
134         }
135 }
136
137 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
138         buf := C.rmr_alloc_msg(m.context, 0)
139         if buf == nil {
140                 Logger.Error("rmrClient: Allocating message buffer failed!")
141         }
142
143         return buf
144 }
145
146 func (m *RMRClient) Send(mtype int, sid int, payload []byte, meid *RMRMeid) bool {
147         buf := m.Allocate()
148
149         buf.mtype = C.int(mtype)
150         buf.sub_id = C.int(sid)
151         buf.len = C.int(len(payload))
152         datap := C.CBytes(payload)
153         defer C.free(datap)
154
155         if meid != nil {
156                 b := make([]byte, int(C.RMR_MAX_MEID))
157                 copy(b, []byte(meid.PlmnID))
158                 copy(b[16:], []byte(meid.EnbID))
159                 C.rmr_bytes2meid(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
160         }
161         C.write_bytes_array(buf.payload, datap, buf.len)
162
163         return m.SendBuf(buf)
164 }
165
166 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t) bool {
167         for i := 0; i < 10; i++ {
168                 txBuffer.state = 0
169                 txBuffer := C.rmr_send_msg(m.context, txBuffer)
170                 if txBuffer == nil {
171                         break
172                 } else if txBuffer.state != C.RMR_OK {
173                         if txBuffer.state != C.RMR_ERR_RETRY {
174                                 time.Sleep(100 * time.Microsecond)
175                                 m.UpdateStatCounter("TransmitError")
176                         }
177                         for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
178                                 txBuffer = C.rmr_send_msg(m.context, txBuffer)
179                         }
180                 }
181
182                 if txBuffer.state == C.RMR_OK {
183                         m.UpdateStatCounter("Transmitted")
184                         return true
185                 }
186         }
187         m.UpdateStatCounter("TransmitError")
188         return false
189 }
190
191 func (m *RMRClient) UpdateStatCounter(name string) {
192         m.mux.Lock()
193         m.stat[name].Inc()
194         m.mux.Unlock()
195 }
196
197 func (m *RMRClient) RegisterMetrics() {
198         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
199 }
200
201 func (m *RMRClient) Wait() {
202         m.wg.Wait()
203 }
204
205 func (m *RMRClient) IsReady() bool {
206         return m.ready != 0
207 }
208
209 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
210         m.readyCb = cb
211         m.readyCbParams = params
212 }
213
214 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
215         id, ok := RICMessageTypes[name]
216         return id, ok
217 }
218
219 func (m *RMRClient) GetRicMessageName(id int) (s string) {
220         for k, v := range RICMessageTypes {
221                 if id == v {
222                         return k
223                 }
224         }
225         return
226 }
227
228 // To be removed ...
229 func (m *RMRClient) GetStat() (r RMRStatistics) {
230         return
231 }