16545cb27848d0d068826890a5cf926723c3c489
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     rmrpipe.go
25   Abstract: mangos (RMR) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 /*
32 #include <rmr/rmr.h>
33 */
34 import "C"
35
36 import (
37         "bytes"
38         "crypto/md5"
39         "errors"
40         "fmt"
41         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
42         "routing-manager/pkg/rtmgr"
43         "strconv"
44         "strings"
45         "sync"
46         "time"
47 )
48
49 var rmrcallid = 1
50 var rmrdynamiccallid = 201
51 var addendpointct = 1
52
53 var conn sync.Mutex
54
55 type RmrPush struct {
56         Sbi
57         rcChan chan *xapp.RMRParams
58 }
59
60 type EPStatus struct {
61         endpoint string
62         status   bool
63 }
64
65 type RMRParams struct {
66         *xapp.RMRParams
67 }
68
69 func (params *RMRParams) String() string {
70         var b bytes.Buffer
71         sum := md5.Sum(params.Payload)
72         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
73         return b.String()
74 }
75
76 func NewRmrPush() *RmrPush {
77         instance := new(RmrPush)
78         return instance
79 }
80
81 func (c *RmrPush) Initialize(ip string) error {
82         return nil
83 }
84
85 func (c *RmrPush) Terminate() error {
86         return nil
87 }
88
89 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
90         count := addendpointct + 1
91         xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
92         endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
93         ep.Whid = int(xapp.Rmr.Openwh(endpoint))
94         if ep.Whid < 0 {
95                 time.Sleep(time.Duration(10) * time.Second)
96                 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
97                 if ep.Whid < 0 {
98                         return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid) + " count: " + strconv.Itoa(count))
99                 }
100         } else {
101                 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
102         }
103
104         return nil
105 }
106
107 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
108         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
109         xapp.Logger.Debug("args: %v", *ep)
110
111         xapp.Rmr.Closewh(ep.Whid)
112         return nil
113 }
114
115 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
116         c.updateEndpoints(rcs, c)
117 }
118
119 func (c *RmrPush) DistributeAll(policies *[]string) error {
120         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
121         xapp.Logger.Debug("args: %v", *policies)
122
123         /*for _, ep := range rtmgr.Eps {
124                 go c.send(ep, policies)
125         }*/
126         //channel := make(chan EPStatus)
127
128         if rmrcallid == 200 {
129                 rmrcallid = 1
130         }
131
132         for _, ep := range rtmgr.Eps {
133                 go c.send_sync(ep, policies, rmrcallid)
134         }
135
136         rmrcallid++
137
138         /*
139                                 count := 0
140                         result := make([]EPStatus, len(rtmgr.Eps))
141                         for i, _ := range result {
142                                 result[i] = <-channel
143                                 if result[i].status == true {
144                                         count++
145                                 } else {
146                                         xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
147                                 }
148                         }
149
150                         if count < len(rtmgr.Eps) {
151                                 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
152                         }*/
153
154         return nil
155 }
156
157 //func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
158 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, call_id int) {
159         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
160
161         ret := c.send_data(ep, policies, call_id)
162         xapp.Logger.Debug("return value is %v", ret)
163         conn.Lock()
164         rtmgr.RMRConnStatus[ep.Uuid] = ret
165         conn.Unlock()
166         // Handling per connection .. may be updating global map
167
168         //channel <- EPStatus{ep.Uuid, ret}
169
170 }
171
172 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
173         xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
174         var state int
175         var retstr string
176
177         var policy = []byte{}
178
179         for _, pe := range *policies {
180                 b := []byte(pe)
181                 for j := 0; j < len(b); j++ {
182                         policy = append(policy, b[j])
183                 }
184         }
185         params := &RMRParams{&xapp.RMRParams{}}
186         params.Mtype = 20
187         params.PayloadLen = len(policy)
188         params.Payload = []byte(policy)
189         params.Mbuf = nil
190         params.Whid = ep.Whid
191         params.Callid = call_id
192         params.Timeout = 200
193         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
194         routestatus := strings.Split(retstr, " ")
195         if state != C.RMR_OK && routestatus[0] != "OK" {
196                 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
197                 return false
198         } else {
199                 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
200                 return true
201         }
202 }
203
204 func (c *RmrPush) CheckEndpoint(payload string) (ep *rtmgr.Endpoint) {
205         return c.checkEndpoint(payload)
206 }
207
208 func (c *RmrPush) CreateEndpoint(rmrsrc string) (ep *string, whid int) {
209         return c.createEndpoint(rmrsrc)
210 }
211
212 func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
213         xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
214         xapp.Logger.Debug("args: %v", *policies)
215
216         if rmrdynamiccallid == 255 {
217                 rmrdynamiccallid = 201
218         }
219
220         go c.sendDynamicRoutes(ep, whid, policies, rmrdynamiccallid)
221         rmrdynamiccallid++
222
223         return nil
224 }
225
226 func (c *RmrPush) sendDynamicRoutes(ep string, whid int, policies *[]string, call_id int) bool {
227         xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
228         var state int
229         var retstr string
230
231         var policy = []byte{}
232
233         for _, pe := range *policies {
234                 b := []byte(pe)
235                 for j := 0; j < len(b); j++ {
236                         policy = append(policy, b[j])
237                 }
238         }
239         params := &RMRParams{&xapp.RMRParams{}}
240         params.Mtype = 20
241         params.PayloadLen = len(policy)
242         params.Payload = []byte(policy)
243         params.Mbuf = nil
244         params.Whid = whid
245         params.Callid = call_id
246         params.Timeout = 200
247         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
248         routestatus := strings.Split(retstr, " ")
249         if state != C.RMR_OK && routestatus[0] != "OK" {
250                 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
251                 return false
252         } else {
253                 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
254                 return true
255         }
256 }