4b7f87102db3c81b7162cae97e5d6138bbd4cceb
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     rmrpipe.go
25   Abstract: mangos (RMR) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 /*
32 #include <rmr/rmr.h>
33 */
34 import "C"
35
36 import (
37         "bytes"
38         "crypto/md5"
39         "errors"
40         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
41         "routing-manager/pkg/rtmgr"
42         "strconv"
43         "strings"
44         "fmt"
45 )
46
47 var rmrcallid = 1
48 var rmrdynamiccallid = 201
49
50 type RmrPush struct {
51         Sbi
52         rcChan chan *xapp.RMRParams
53 }
54
55 type EPStatus struct {
56         endpoint string
57         status   bool
58 }
59
60 type RMRParams struct {
61         *xapp.RMRParams
62 }
63
64 func (params *RMRParams) String() string {
65         var b bytes.Buffer
66         sum := md5.Sum(params.Payload)
67         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
68         return b.String()
69 }
70
71 func NewRmrPush() *RmrPush {
72         instance := new(RmrPush)
73         return instance
74 }
75
76 func (c *RmrPush) Initialize(ip string) error {
77         return nil
78 }
79
80 func (c *RmrPush) Terminate() error {
81         return nil
82 }
83
84 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
85
86         xapp.Logger.Debug("Invoked sbi.AddEndpoint")
87         endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
88         ep.Whid = int(xapp.Rmr.Openwh(endpoint))
89         if ep.Whid < 0 {
90                 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
91         } else {
92                 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
93         }
94
95         return nil
96 }
97
98 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
99         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
100         xapp.Logger.Debug("args: %v", *ep)
101
102         xapp.Rmr.Closewh(ep.Whid)
103         return nil
104 }
105
106 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
107         c.updateEndpoints(rcs, c)
108 }
109
110 func (c *RmrPush) DistributeAll(policies *[]string) error {
111         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
112         xapp.Logger.Debug("args: %v", *policies)
113
114         /*for _, ep := range rtmgr.Eps {
115                 go c.send(ep, policies)
116         }*/
117         channel := make(chan EPStatus)
118
119         if rmrcallid == 200 {
120                 rmrcallid = 1
121         }
122
123         for _, ep := range rtmgr.Eps {
124                 go c.send_sync(ep,  policies, channel, rmrcallid)
125         }
126
127         rmrcallid++
128
129         count := 0
130         result := make([]EPStatus, len(rtmgr.Eps))
131         for i, _ := range result {
132                 result[i] = <-channel
133                 if result[i].status == true {
134                         count++
135                 } else {
136                         xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
137                 }
138         }
139
140         if count < len(rtmgr.Eps) {
141                 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
142         }
143
144
145         return nil
146 }
147
148 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
149         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
150
151         ret := c.send_data(ep, policies, call_id)
152
153         channel <- EPStatus{ep.Uuid, ret}
154
155 }
156
157 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
158         xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
159         var state int
160         var retstr string
161
162         var policy = []byte{}
163
164         for _, pe := range *policies {
165                 b:= []byte(pe)
166                 for j:=0; j<len(b); j++{
167                         policy = append(policy,b[j])
168                 }
169         }
170         params := &RMRParams{&xapp.RMRParams{}}
171         params.Mtype = 20
172         params.PayloadLen = len(policy)
173         params.Payload =[]byte(policy)
174         params.Mbuf = nil
175         params.Whid = ep.Whid
176         params.Callid = call_id
177         params.Timeout = 200
178         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
179         routestatus := strings.Split(retstr," ")
180         if state != C.RMR_OK && routestatus[0] != "OK" {
181               xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
182               return false
183         } else {
184                 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
185               return true
186         }
187 }
188
189 func (c *RmrPush) CheckEndpoint(payload string)(ep *rtmgr.Endpoint)  {
190         return c.checkEndpoint(payload)
191 }
192
193 func (c *RmrPush) CreateEndpoint(rmrsrc string)(ep *string,whid int)  {
194         return c.createEndpoint(rmrsrc)
195 }
196
197 func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
198         xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
199         xapp.Logger.Debug("args: %v", *policies)
200
201         if rmrdynamiccallid == 255 {
202                 rmrdynamiccallid = 201
203         }
204
205         go c.sendDynamicRoutes(ep, whid, policies,rmrdynamiccallid)
206         rmrdynamiccallid++
207
208         return nil
209 }
210
211 func (c *RmrPush) sendDynamicRoutes(ep string,whid int, policies *[]string, call_id int) bool {
212         xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
213         var state int
214         var retstr string
215
216         var policy = []byte{}
217
218         for _, pe := range *policies {
219                 b:= []byte(pe)
220                 for j:=0; j<len(b); j++{
221                         policy = append(policy,b[j])
222                 }
223         }
224         params := &RMRParams{&xapp.RMRParams{}}
225         params.Mtype = 20
226         params.PayloadLen = len(policy)
227         params.Payload =[]byte(policy)
228         params.Mbuf = nil
229         params.Whid = whid
230         params.Callid = call_id
231         params.Timeout = 200
232         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
233         routestatus := strings.Split(retstr," ")
234         if state != C.RMR_OK && routestatus[0] != "OK" {
235                 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
236               return false
237         } else {
238                 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
239       return true
240         }
241 }
242