Correcting Dockerfile
[ric-plt/submgr.git] / tmp / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 /*
23 #include <time.h>
24 #include <stdlib.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <rmr/rmr.h>
28 #include <rmr/RIC_message_types.h>
29
30 void write_bytes_array(unsigned char *dst, void *data, int len) {
31     memcpy((void *)dst, (void *)data, len);
32 }
33
34 #cgo CFLAGS: -I../
35 #cgo LDFLAGS: -lrmr_nng -lnng
36 */
37 import "C"
38
39 import (
40         "github.com/spf13/viper"
41         "strconv"
42         "strings"
43         "time"
44         "unsafe"
45 )
46
47 type RMRMbuf C.rmr_mbuf_t
48
49 var RMRCounterOpts = []CounterOpts{
50         {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
51         {Name: "Received", Help: "The total number of received RMR messages"},
52         {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
53         {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
54 }
55
56 type RMRParams struct {
57         Mtype           int
58         Payload         []byte
59         PayloadLen      int
60         Meid            *RMRMeid
61         Xid             string
62         SubId           int
63         Src             string
64         Mbuf            *RMRMbuf
65 }
66
67 func NewRMRClient() *RMRClient {
68         p := C.CString(viper.GetString("rmr.protPort"))
69         m := C.int(viper.GetInt("rmr.maxSize"))
70         defer C.free(unsafe.Pointer(p))
71
72         ctx := C.rmr_init(p, m, C.int(0))
73         if ctx == nil {
74                 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
75         }
76
77         return &RMRClient{
78                 context:   ctx,
79                 consumers: make([]MessageConsumer, 0),
80                 stat:      Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
81         }
82 }
83
84 func (m *RMRClient) Start(c MessageConsumer) {
85         if c != nil {
86                 m.consumers = append(m.consumers, c)
87         }
88
89         for {
90                 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
91
92                 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
93                         break
94                 }
95                 time.Sleep(10 * time.Second)
96         }
97         m.wg.Add(viper.GetInt("rmr.numWorkers"))
98
99         if m.readyCb != nil {
100                 go m.readyCb(m.readyCbParams)
101         }
102
103         for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
104                 go m.Worker("worker-"+strconv.Itoa(w), 0)
105         }
106         m.Wait()
107 }
108
109 func (m *RMRClient) Worker(taskName string, msgSize int) {
110         p := viper.GetString("rmr.protPort")
111         Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
112
113         defer m.wg.Done()
114         for {
115                 rxBuffer := (*RMRMbuf)(C.rmr_rcv_msg(m.context, nil))
116                 if rxBuffer == nil {
117                         m.UpdateStatCounter("ReceiveError")
118                         continue
119                 }
120                 m.UpdateStatCounter("Received")
121
122                 go m.parseMessage(rxBuffer)
123         }
124 }
125
126 func (m *RMRClient) parseMessage(rxBuffer *RMRMbuf) {
127         if len(m.consumers) == 0 {
128                 Logger.Info("rmrClient: No message handlers defined, message discarded!")
129                 return
130         }
131
132         params := &RMRParams{}
133         params.Mbuf = rxBuffer
134         params.Mtype = int(rxBuffer.mtype)
135         params.SubId = int(rxBuffer.sub_id)
136         params.Meid = &RMRMeid{}
137
138         meidBuf := make([]byte, int(C.RMR_MAX_MEID))
139         if meidCstr := C.rmr_get_meid((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
140                 params.Meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
141                 params.Meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
142         }
143
144         xidBuf := make([]byte, int(C.RMR_MAX_XID))
145         if xidCstr := C.rmr_get_xact((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
146                 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
147         }
148
149         srcBuf := make([]byte, int(C.RMR_MAX_SRC))
150         if srcStr := C.rmr_get_src((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
151                 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
152         }
153
154         for _, c := range m.consumers {
155                 cptr := unsafe.Pointer(rxBuffer.payload)
156                 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
157                 params.PayloadLen = int(rxBuffer.len)
158
159                 err := c.Consume(params)
160                 if err != nil {
161                         Logger.Warn("rmrClient: Consumer returned error: %v", err)
162                 }
163         }
164 }
165
166 func (m *RMRClient) Allocate() *RMRMbuf {
167         buf := C.rmr_alloc_msg(m.context, 0)
168         if buf == nil {
169                 Logger.Error("rmrClient: Allocating message buffer failed!")
170         }
171
172         return (*RMRMbuf)(buf)
173 }
174
175 func (m *RMRClient) SendMsg(params *RMRParams) bool {
176         return m.Send(params, false)
177 }
178
179 func (m *RMRClient) SendRts(params *RMRParams) bool {
180         return m.Send(params, true)
181 }
182
183 func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
184         buf := params.Mbuf
185         if buf == nil {
186                 buf = m.Allocate()
187         }
188
189         buf.mtype = C.int(params.Mtype)
190         buf.sub_id = C.int(params.SubId)
191         buf.len = C.int(len(params.Payload))
192         datap := C.CBytes(params.Payload)
193         defer C.free(datap)
194
195         if params != nil {
196                 if params.Meid != nil {
197                         b := make([]byte, int(C.RMR_MAX_MEID))
198                         copy(b, []byte(params.Meid.PlmnID))
199                         copy(b[16:], []byte(params.Meid.EnbID))
200                         C.rmr_bytes2meid((*C.rmr_mbuf_t)(buf), (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
201                 }
202                 xidLen := len(params.Xid)
203                 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
204                         b := make([]byte, int(C.RMR_MAX_MEID))
205                         copy(b, []byte(params.Xid))
206                         C.rmr_bytes2xact((*C.rmr_mbuf_t)(buf), (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
207                 }
208         }
209         C.write_bytes_array(buf.payload, datap, buf.len)
210
211         return m.SendBuf(buf, isRts)
212 }
213
214 func (m *RMRClient) SendBuf(txBuffer *RMRMbuf, isRts bool) bool {
215         for i := 0; i < 10; i++ {
216                 txBuffer.state = 0
217                 if isRts {
218                         txBuffer = (*RMRMbuf)(C.rmr_rts_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
219                 } else {
220                         txBuffer = (*RMRMbuf)(C.rmr_send_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
221                 }
222
223                 if txBuffer == nil {
224                         break
225                 } else if txBuffer.state != C.RMR_OK {
226                         if txBuffer.state != C.RMR_ERR_RETRY {
227                                 time.Sleep(100 * time.Microsecond)
228                                 m.UpdateStatCounter("TransmitError")
229                         }
230                         for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
231                                 txBuffer = (*RMRMbuf)(C.rmr_send_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
232                         }
233                 }
234
235                 if txBuffer.state == C.RMR_OK {
236                         m.UpdateStatCounter("Transmitted")
237                         return true
238                 }
239         }
240         m.UpdateStatCounter("TransmitError")
241         return false
242 }
243
244 func (m *RMRClient) UpdateStatCounter(name string) {
245         m.mux.Lock()
246         m.stat[name].Inc()
247         m.mux.Unlock()
248 }
249
250 func (m *RMRClient) RegisterMetrics() {
251         m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
252 }
253
254 func (m *RMRClient) Wait() {
255         m.wg.Wait()
256 }
257
258 func (m *RMRClient) IsReady() bool {
259         return m.ready != 0
260 }
261
262 func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
263         m.readyCb = cb
264         m.readyCbParams = params
265 }
266
267 func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
268         id, ok := RICMessageTypes[name]
269         return id, ok
270 }
271
272 func (m *RMRClient) GetRicMessageName(id int) (s string) {
273         for k, v := range RICMessageTypes {
274                 if id == v {
275                         return k
276                 }
277         }
278         return
279 }
280
281 // To be removed ...
282 func (m *RMRClient) GetStat() (r RMRStatistics) {
283         return
284 }