c2ba4b106df7c8d5ae623ccdaa80a2f79589718d
[ric-plt/xapp-frame.git] / pkg / xapp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_nng -lnng
36 */
37 import "C"
38
39 import (
40         "github.com/spf13/viper"
41         "strconv"
42         "strings"
43         "time"
44         "unsafe"
45 )
46
47 var RMRCounterOpts = []CounterOpts{
48         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49         {Name: "Received", Help: "The total number of received RMR messages"},
50         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
52 }
53
54 func NewRMRClient() *RMRClient {
55         p := C.CString(viper.GetString("rmr.protPort"))
56         m := C.int(viper.GetInt("rmr.maxSize"))
57         defer C.free(unsafe.Pointer(p))
58
59         ctx := C.rmr_init(p, m, C.int(0))
60         if ctx == nil {
61                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
62         }
63
64         return &RMRClient{
65                 context:   ctx,
66                 consumers: make([]MessageConsumer, 0),
67                 stat:      Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
68         }
69 }
70
71 func (m *RMRClient) Start(c MessageConsumer) {
72         if c != nil {
73                 m.consumers = append(m.consumers, c)
74         }
75
76         for {
77                 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
78
79                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
80                         break
81                 }
82                 time.Sleep(10 * time.Second)
83         }
84         m.wg.Add(viper.GetInt("rmr.numWorkers"))
85
86         if m.readyCb != nil {
87                 go m.readyCb(m.readyCbParams)
88         }
89
90         for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
91                 go m.Worker("worker-"+strconv.Itoa(w), 0)
92         }
93         m.Wait()
94 }
95
96 func (m *RMRClient) Worker(taskName string, msgSize int) {
97         p := viper.GetString("rmr.protPort")
98         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
99
100         defer m.wg.Done()
101         for {
102                 rxBuffer := C.rmr_rcv_msg(m.context, nil)
103                 if rxBuffer == nil {
104                         m.UpdateStatCounter("ReceiveError")
105                         continue
106                 }
107                 m.UpdateStatCounter("Received")
108
109                 go m.parseMessage(rxBuffer)
110         }
111 }
112
113 func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
114         if len(m.consumers) == 0 {
115                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
116                 return
117         }
118
119         meid := &RMRMeid{}
120         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
121         if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
122                 meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
123                 meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
124         }
125
126         for _, c := range m.consumers {
127                 cptr := unsafe.Pointer(rxBuffer.payload)
128                 payload := C.GoBytes(cptr, C.int(rxBuffer.len))
129
130                 err := c.Consume(int(rxBuffer.mtype), int(rxBuffer.sub_id), payload, meid)
131                 if err != nil {
132                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
133                 }
134         }
135 }
136
137 func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
138         buf := C.rmr_alloc_msg(m.context, 0)
139         if buf == nil {
140                 Logger.Error("rmrClient: Allocating message buffer failed!")
141         }
142
143         return buf
144 }
145
146 func (m *RMRClient) Send(mtype int, sid int, payload []byte, meid *RMRMeid) bool {
147         return m.SendMsg(mtype, sid, meid, payload, len(payload))
148 }
149
150 func (m *RMRClient) SendMsg(mtype int, sid int, meid *RMRMeid, payload []byte, payloadLen int) bool {
151         buf := m.Allocate()
152
153         buf.mtype = C.int(mtype)
154         buf.sub_id = C.int(sid)
155         buf.len = C.int(payloadLen)
156         datap := C.CBytes(payload)
157         defer C.free(datap)
158
159         if meid != nil {
160                 b := make([]byte, int(C.RMR_MAX_MEID))
161                 copy(b, []byte(meid.PlmnID))
162                 copy(b[16:], []byte(meid.EnbID))
163                 C.rmr_bytes2meid(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
164         }
165         C.write_bytes_array(buf.payload, datap, buf.len)
166
167         return m.SendBuf(buf)
168 }
169
170 func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t) bool {
171         for i := 0; i < 10; i++ {
172                 txBuffer.state = 0
173                 txBuffer := C.rmr_send_msg(m.context, txBuffer)
174                 if txBuffer == nil {
175                         break
176                 } else if txBuffer.state != C.RMR_OK {
177                         if txBuffer.state != C.RMR_ERR_RETRY {
178                                 time.Sleep(100 * time.Microsecond)
179                                 m.UpdateStatCounter("TransmitError")
180                         }
181                         for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
182                                 txBuffer = C.rmr_send_msg(m.context, txBuffer)
183                         }
184                 }
185
186                 if txBuffer.state == C.RMR_OK {
187                         m.UpdateStatCounter("Transmitted")
188                         return true
189                 }
190         }
191         m.UpdateStatCounter("TransmitError")
192         return false
193 }
194
195 func (m *RMRClient) UpdateStatCounter(name string) {
196         m.mux.Lock()
197         m.stat[name].Inc()
198         m.mux.Unlock()
199 }
200
201 func (m *RMRClient) RegisterMetrics() {
202         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
203 }
204
205 func (m *RMRClient) Wait() {
206         m.wg.Wait()
207 }
208
209 func (m *RMRClient) IsReady() bool {
210         return m.ready != 0
211 }
212
213 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
214         m.readyCb = cb
215         m.readyCbParams = params
216 }
217
218 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
219         id, ok := RICMessageTypes[name]
220         return id, ok
221 }
222
223 func (m *RMRClient) GetRicMessageName(id int) (s string) {
224         for k, v := range RICMessageTypes {
225                 if id == v {
226                         return k
227                 }
228         }
229         return
230 }
231
232 // To be removed ...
233 func (m *RMRClient) GetStat() (r RMRStatistics) {
234         return
235 }