7ba43c552f29db6bc25cdd16845aeb1d4f588471
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     rmrpipe.go
25   Abstract: mangos (RMR) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 /*
32 #include <rmr/rmr.h>
33 */
34 import "C"
35
36 import (
37         "bytes"
38         "crypto/md5"
39         "errors"
40         "fmt"
41         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
42         "routing-manager/pkg/rtmgr"
43         "strconv"
44         "strings"
45         "sync"
46         "time"
47 )
48
49 var rmrcallid = 1
50 var rmrdynamiccallid = 201
51 var addendpointct = 1
52 var count int
53 var conn sync.Mutex
54
55 type RmrPush struct {
56         Sbi
57         rcChan chan *xapp.RMRParams
58 }
59
60 type EPStatus struct {
61         endpoint string
62         status   bool
63 }
64
65 type RMRParams struct {
66         *xapp.RMRParams
67 }
68
69 func (params *RMRParams) String() string {
70         var b bytes.Buffer
71         sum := md5.Sum(params.Payload)
72         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
73         return b.String()
74 }
75
76 func NewRmrPush() *RmrPush {
77         instance := new(RmrPush)
78         return instance
79 }
80
81 func (c *RmrPush) Initialize(ip string) error {
82         return nil
83 }
84
85 func (c *RmrPush) Terminate() error {
86         return nil
87 }
88
89 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
90         addendpointct = addendpointct + 1
91         count := addendpointct
92         xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
93         endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
94         ep.Whid = int(xapp.Rmr.Openwh(endpoint))
95         if ep.Whid < 0 {
96                 time.Sleep(time.Duration(10) * time.Second)
97                 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
98                 if ep.Whid < 0 {
99                         return errors.New("cannot open wormhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid) + " count: " + strconv.Itoa(count))
100                 }
101         } else {
102                 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
103         }
104
105         return nil
106 }
107
108 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
109         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
110         xapp.Logger.Debug("args: %v", *ep)
111
112         xapp.Rmr.Closewh(ep.Whid)
113         return nil
114 }
115
116 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
117         c.updateEndpoints(rcs, c)
118 }
119
120 func (c *RmrPush) DistributeAll(policies *[]string) error {
121         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
122         xapp.Logger.Debug("args: %v", *policies)
123
124         /*for _, ep := range rtmgr.Eps {
125                 go c.send(ep, policies)
126         }*/
127         //channel := make(chan EPStatus)
128
129         if rmrcallid == 200 {
130                 rmrcallid = 1
131         }
132
133         for _, ep := range rtmgr.Eps {
134                 go c.send_sync(ep, policies, rmrcallid)
135         }
136
137         rmrcallid++
138
139         /*
140                                 count := 0
141                         result := make([]EPStatus, len(rtmgr.Eps))
142                         for i, _ := range result {
143                                 result[i] = <-channel
144                                 if result[i].status == true {
145                                         count++
146                                 } else {
147                                         xapp.Logger.Error("RMR sent failed for endpoint %v", result[i].endpoint)
148                                 }
149                         }
150
151                         if count < len(rtmgr.Eps) {
152                                 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
153                         }*/
154
155         return nil
156 }
157
158 //func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
159 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, call_id int) {
160         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
161
162         ret := c.send_data(ep, policies, call_id)
163         for count = 0; count <= 2 && ret == false; count++ {
164                 time.Sleep(time.Second)
165                 ret := c.send_data(ep, policies, call_id)
166                 if ret == true {
167                         break
168                 }
169                 xapp.Logger.Error("Invoked  send_data to try again due to return value : %v", ret)
170         }
171
172         conn.Lock()
173         rtmgr.RMRConnStatus[ep.Uuid] = ret
174         conn.Unlock()
175         // Handling per connection .. may be updating global map
176
177         //channel <- EPStatus{ep.Uuid, ret}
178
179 }
180
181 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
182         xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
183         var state int
184         var retstr string
185
186         var policy = []byte{}
187
188         for _, pe := range *policies {
189                 b := []byte(pe)
190                 for j := 0; j < len(b); j++ {
191                         policy = append(policy, b[j])
192                 }
193         }
194         params := &RMRParams{&xapp.RMRParams{}}
195         params.Mtype = 20
196         params.PayloadLen = len(policy)
197         params.Payload = []byte(policy)
198         params.Mbuf = nil
199         params.Whid = ep.Whid
200         params.Callid = call_id
201         params.Timeout = 200
202         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
203         routestatus := strings.Split(retstr, " ")
204         if state != C.RMR_OK && routestatus[0] != "OK" {
205                 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
206                 return false
207         } else {
208                 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
209                 return true
210         }
211 }
212
213 func (c *RmrPush) CheckEndpoint(payload string) (ep *rtmgr.Endpoint) {
214         return c.checkEndpoint(payload)
215 }
216
217 func (c *RmrPush) CreateEndpoint(rmrsrc string) (ep *string, whid int) {
218         return c.createEndpoint(rmrsrc)
219 }
220
221 func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
222         xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
223         xapp.Logger.Debug("args: %v", *policies)
224
225         if rmrdynamiccallid == 255 {
226                 rmrdynamiccallid = 201
227         }
228
229         go c.sendDynamicRoutes(ep, whid, policies, rmrdynamiccallid)
230         rmrdynamiccallid++
231
232         return nil
233 }
234
235 func (c *RmrPush) sendDynamicRoutes(ep string, whid int, policies *[]string, call_id int) bool {
236         xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
237         var state int
238         var retstr string
239
240         var policy = []byte{}
241
242         for _, pe := range *policies {
243                 b := []byte(pe)
244                 for j := 0; j < len(b); j++ {
245                         policy = append(policy, b[j])
246                 }
247         }
248         params := &RMRParams{&xapp.RMRParams{}}
249         params.Mtype = 20
250         params.PayloadLen = len(policy)
251         params.Payload = []byte(policy)
252         params.Mbuf = nil
253         params.Whid = whid
254         params.Callid = call_id
255         params.Timeout = 200
256         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
257         routestatus := strings.Split(retstr, " ")
258         if state != C.RMR_OK && routestatus[0] != "OK" {
259                 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
260                 return false
261         } else {
262                 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
263                 return true
264         }
265 }