Creating dymanic endpoints(Eps) and distributing routes to those endpoints on upgrad...
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     rmrpipe.go
25   Abstract: mangos (RMR) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 /*
32 #include <rmr/rmr.h>
33 */
34 import "C"
35
36 import (
37         "bytes"
38         "crypto/md5"
39         "errors"
40         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
41         "routing-manager/pkg/rtmgr"
42         "strconv"
43         "strings"
44         "fmt"
45 )
46
47 var rmrcallid = 1
48 var rmrdynamiccallid = 201
49
50 type RmrPush struct {
51         Sbi
52         rcChan chan *xapp.RMRParams
53 }
54
55 type EPStatus struct {
56         endpoint string
57         status   bool
58 }
59
60 type RMRParams struct {
61         *xapp.RMRParams
62 }
63
64 func (params *RMRParams) String() string {
65         var b bytes.Buffer
66         sum := md5.Sum(params.Payload)
67         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
68         return b.String()
69 }
70
71 func NewRmrPush() *RmrPush {
72         instance := new(RmrPush)
73         return instance
74 }
75
76 func (c *RmrPush) Initialize(ip string) error {
77         return nil
78 }
79
80 func (c *RmrPush) Terminate() error {
81         return nil
82 }
83
84 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
85
86         xapp.Logger.Debug("Invoked sbi.AddEndpoint")
87         endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
88         ep.Whid = int(xapp.Rmr.Openwh(endpoint))
89         if ep.Whid < 0 {
90                 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
91         } else {
92                 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
93         }
94
95         return nil
96 }
97
98 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
99         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
100         xapp.Logger.Debug("args: %v", *ep)
101
102         xapp.Rmr.Closewh(ep.Whid)
103         return nil
104 }
105
106 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
107         c.updateEndpoints(rcs, c)
108 }
109
110 func (c *RmrPush) DistributeAll(policies *[]string) error {
111         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
112         xapp.Logger.Debug("args: %v", *policies)
113
114         /*for _, ep := range rtmgr.Eps {
115                 go c.send(ep, policies)
116         }*/
117         channel := make(chan EPStatus)
118
119         if rmrcallid == 200 {
120                 rmrcallid = 1
121         }
122
123         for _, ep := range rtmgr.Eps {
124                 go c.send_sync(ep,  policies, channel, rmrcallid)
125         }
126         for rEp, id := range rtmgr.RmrEp {
127                 go c.send_rt_process(rEp,id,policies,rmrcallid)
128         }
129
130         rmrcallid++
131
132         count := 0
133         result := make([]EPStatus, len(rtmgr.Eps))
134         for i, _ := range result {
135                 result[i] = <-channel
136                 if result[i].status == true {
137                         count++
138                 } else {
139                         xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
140                 }
141         }
142
143         if count < len(rtmgr.Eps) {
144                 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
145         }
146
147
148         return nil
149 }
150
151 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
152         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
153
154         ret := c.send_data(ep, policies, call_id)
155
156         channel <- EPStatus{ep.Uuid, ret}
157
158 }
159
160 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
161         xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
162         var state int
163         var retstr string
164
165         var policy = []byte{}
166
167         for _, pe := range *policies {
168                 b:= []byte(pe)
169                 for j:=0; j<len(b); j++{
170                         policy = append(policy,b[j])
171                 }
172         }
173         params := &RMRParams{&xapp.RMRParams{}}
174         params.Mtype = 20
175         params.PayloadLen = len(policy)
176         params.Payload =[]byte(policy)
177         params.Mbuf = nil
178         params.Whid = ep.Whid
179         params.Callid = call_id
180         params.Timeout = 200
181         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
182         routestatus := strings.Split(retstr," ")
183         if state != C.RMR_OK && routestatus[0] == "OK" {
184               xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
185               return false
186         } else {
187                 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
188               return true
189         }
190
191         xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
192         return false
193 }
194
195 func (c *RmrPush) CreateEndpoint(payload string,rmrsrc string)*rtmgr.Endpoint  {
196         return c.createEndpoint(payload,rmrsrc, c)
197 }
198
199 func (c *RmrPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
200         xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
201         xapp.Logger.Debug("args: %v", *policies)
202
203         if rmrdynamiccallid == 255 {
204                 rmrdynamiccallid = 201
205         }
206
207         go c.send_data(ep, policies,rmrdynamiccallid)
208         rmrdynamiccallid++
209
210         return nil
211 }
212
213 func (c *RmrPush) send_rt_process(ep string,whid int, policies *[]string, call_id int) bool {
214         xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
215         var state int
216         var retstr string
217
218         var policy = []byte{}
219
220         for _, pe := range *policies {
221                 b:= []byte(pe)
222                 for j:=0; j<len(b); j++{
223                         policy = append(policy,b[j])
224                 }
225         }
226         params := &RMRParams{&xapp.RMRParams{}}
227         params.Mtype = 20
228         params.PayloadLen = len(policy)
229         params.Payload =[]byte(policy)
230         params.Mbuf = nil
231         params.Whid = whid
232         params.Callid = call_id
233         params.Timeout = 200
234         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
235         routestatus := strings.Split(retstr," ")
236         if state != C.RMR_OK && routestatus[0] == "OK" {
237                 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
238               return false
239         } else {
240                 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whi_id: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
241       return true
242         }
243
244         xapp.Logger.Error("Route Update to endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " xapp.Rmr.SendCallMsg not called")
245         return false
246 }
247