Upgraded to RMR 4.7.4 and some improvements
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     rmrpipe.go
25   Abstract: mangos (RMR) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 /*
32 #include <rmr/rmr.h>
33 */
34 import "C"
35
36 import (
37         "bytes"
38         "crypto/md5"
39         "errors"
40         "fmt"
41         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
42         "routing-manager/pkg/rtmgr"
43         "strconv"
44         "strings"
45         "sync"
46         "time"
47 )
48
49 var rmrcallid = 1
50 var rmrdynamiccallid = 201
51 var addendpointct = 1
52
53 var conn sync.Mutex
54
55 type RmrPush struct {
56         Sbi
57         rcChan chan *xapp.RMRParams
58 }
59
60 type EPStatus struct {
61         endpoint string
62         status   bool
63 }
64
65 type RMRParams struct {
66         *xapp.RMRParams
67 }
68
69 func (params *RMRParams) String() string {
70         var b bytes.Buffer
71         sum := md5.Sum(params.Payload)
72         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
73         return b.String()
74 }
75
76 func NewRmrPush() *RmrPush {
77         instance := new(RmrPush)
78         return instance
79 }
80
81 func (c *RmrPush) Initialize(ip string) error {
82         return nil
83 }
84
85 func (c *RmrPush) Terminate() error {
86         return nil
87 }
88
89 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
90         addendpointct = addendpointct + 1
91         count := addendpointct
92         xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
93         endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
94         ep.Whid = int(xapp.Rmr.Openwh(endpoint))
95         if ep.Whid < 0 {
96                 time.Sleep(time.Duration(10) * time.Second)
97                 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
98                 if ep.Whid < 0 {
99                         return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid) + " count: " + strconv.Itoa(count))
100                 }
101         } else {
102                 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
103         }
104
105         return nil
106 }
107
108 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
109         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
110         xapp.Logger.Debug("args: %v", *ep)
111
112         xapp.Rmr.Closewh(ep.Whid)
113         return nil
114 }
115
116 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
117         c.updateEndpoints(rcs, c)
118 }
119
120 func (c *RmrPush) DistributeAll(policies *[]string) error {
121         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
122         xapp.Logger.Debug("args: %v", *policies)
123
124         /*for _, ep := range rtmgr.Eps {
125                 go c.send(ep, policies)
126         }*/
127         //channel := make(chan EPStatus)
128
129         if rmrcallid == 200 {
130                 rmrcallid = 1
131         }
132
133         for _, ep := range rtmgr.Eps {
134                 go c.send_sync(ep, policies, rmrcallid)
135         }
136
137         rmrcallid++
138
139         /*
140                                 count := 0
141                         result := make([]EPStatus, len(rtmgr.Eps))
142                         for i, _ := range result {
143                                 result[i] = <-channel
144                                 if result[i].status == true {
145                                         count++
146                                 } else {
147                                         xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
148                                 }
149                         }
150
151                         if count < len(rtmgr.Eps) {
152                                 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
153                         }*/
154
155         return nil
156 }
157
158 //func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
159 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, call_id int) {
160         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
161
162         ret := c.send_data(ep, policies, call_id)
163         conn.Lock()
164         rtmgr.RMRConnStatus[ep.Uuid] = ret
165         conn.Unlock()
166         // Handling per connection .. may be updating global map
167
168         //channel <- EPStatus{ep.Uuid, ret}
169
170 }
171
172 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
173         xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
174         var state int
175         var retstr string
176
177         var policy = []byte{}
178
179         for _, pe := range *policies {
180                 b := []byte(pe)
181                 for j := 0; j < len(b); j++ {
182                         policy = append(policy, b[j])
183                 }
184         }
185         params := &RMRParams{&xapp.RMRParams{}}
186         params.Mtype = 20
187         params.PayloadLen = len(policy)
188         params.Payload = []byte(policy)
189         params.Mbuf = nil
190         params.Whid = ep.Whid
191         params.Callid = call_id
192         params.Timeout = 200
193         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
194         routestatus := strings.Split(retstr, " ")
195         if state != C.RMR_OK && routestatus[0] != "OK" {
196                 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
197                 return false
198         } else {
199                 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
200                 return true
201         }
202 }
203
204 func (c *RmrPush) CheckEndpoint(payload string) (ep *rtmgr.Endpoint) {
205         return c.checkEndpoint(payload)
206 }
207
208 func (c *RmrPush) CreateEndpoint(rmrsrc string) (ep *string, whid int) {
209         return c.createEndpoint(rmrsrc)
210 }
211
212 func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
213         xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
214         xapp.Logger.Debug("args: %v", *policies)
215
216         if rmrdynamiccallid == 255 {
217                 rmrdynamiccallid = 201
218         }
219
220         go c.sendDynamicRoutes(ep, whid, policies, rmrdynamiccallid)
221         rmrdynamiccallid++
222
223         return nil
224 }
225
226 func (c *RmrPush) sendDynamicRoutes(ep string, whid int, policies *[]string, call_id int) bool {
227         xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
228         var state int
229         var retstr string
230
231         var policy = []byte{}
232
233         for _, pe := range *policies {
234                 b := []byte(pe)
235                 for j := 0; j < len(b); j++ {
236                         policy = append(policy, b[j])
237                 }
238         }
239         params := &RMRParams{&xapp.RMRParams{}}
240         params.Mtype = 20
241         params.PayloadLen = len(policy)
242         params.Payload = []byte(policy)
243         params.Mbuf = nil
244         params.Whid = whid
245         params.Callid = call_id
246         params.Timeout = 200
247         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
248         routestatus := strings.Split(retstr, " ")
249         if state != C.RMR_OK && routestatus[0] != "OK" {
250                 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
251                 return false
252         } else {
253                 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
254                 return true
255         }
256 }