2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (RMR) Pipeline SBI implementation
41 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
42 "routing-manager/pkg/rtmgr"
// rmrdynamiccallid is the call ID that DistributeToEp passes to sendDynamicRoutes.
// DistributeToEp resets it to 201 once it reaches 255.
50 var rmrdynamiccallid = 201
// rcChan is a channel of *xapp.RMRParams.
// NOTE(review): the header of the struct that owns this field is not visible in
// this excerpt - presumably RmrPush; confirm.
57 rcChan chan *xapp.RMRParams
// EPStatus is read elsewhere in this file through its endpoint and status members.
// NOTE(review): the field list is not visible in this excerpt - confirm names and types.
60 type EPStatus struct {
// RMRParams is a package-local type that send_data and sendDynamicRoutes build as
// RMRParams{&xapp.RMRParams{}}; the String method below is declared on it.
// NOTE(review): the field list is not visible in this excerpt.
65 type RMRParams struct {
// String formats a one-line summary of the message parameters: source, message
// type, subscription ID, transaction ID, MEID RAN name, the declared payload
// length next to the actual len(Payload), and a hex MD5 digest of the payload.
// NOTE(review): the declaration of b and the return statement are not visible in
// this excerpt - presumably b is a buffer whose contents are returned; confirm.
69 func (params *RMRParams) String() string {
// The digest is used only as a payload fingerprint inside the formatted text.
71 sum := md5.Sum(params.Payload)
72 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
// NewRmrPush allocates a zero-valued RmrPush.
// NOTE(review): the return statement is not visible in this excerpt - presumably
// the freshly allocated instance is returned; confirm.
76 func NewRmrPush() *RmrPush {
77 instance := new(RmrPush)
// Initialize takes an IP string and reports failure through its error result.
// NOTE(review): the body is not visible in this excerpt - confirm what, if
// anything, it sets up.
81 func (c *RmrPush) Initialize(ip string) error {
// Terminate takes no arguments and reports failure through its error result.
// NOTE(review): the body is not visible in this excerpt - confirm what, if
// anything, it tears down.
85 func (c *RmrPush) Terminate() error {
// AddEndpoint opens an RMR wormhole to ep's IP on DefaultRmrPipelineSocketNumber
// and stores the resulting wormhole ID in ep.Whid. The visible lines show a
// second Openwh attempt after a 10 second sleep and an error return that names
// the endpoint UUID.
// NOTE(review): the conditions guarding the retry, the error return and the
// final debug log are not visible in this excerpt - presumably they test for a
// negative ep.Whid; confirm.
89 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
// NOTE(review): addendpointct is declared outside this excerpt; no visible line
// writes count back to it, so confirm the counter actually advances.
90 count := addendpointct + 1
91 xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
// Wormhole target in "ip:port" form.
92 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
93 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
// Blocks the calling goroutine for 10 seconds before the second attempt.
95 time.Sleep(time.Duration(10) * time.Second)
96 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
// NOTE(review): string(ep.Whid) converts the integer to the character with that
// code point, not to its decimal text; strconv.Itoa (already used for count on
// this line) would print the number. The message also spells "warmhole".
98 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid) + " count: " + strconv.Itoa(count))
101 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
// DeleteEndpoint logs the endpoint and closes its wormhole by passing ep.Whid to
// xapp.Rmr.Closewh.
// NOTE(review): the return statement is not visible in this excerpt.
107 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
108 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
// ep is dereferenced for logging, so a nil ep panics here.
109 xapp.Logger.Debug("args: %v", *ep)
111 xapp.Rmr.Closewh(ep.Whid)
// UpdateEndpoints forwards rcs to c.updateEndpoints, passing the receiver itself
// as the second argument.
// NOTE(review): updateEndpoints is not defined in this excerpt - presumably it is
// promoted from an embedded type; confirm.
115 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
116 c.updateEndpoints(rcs, c)
// DistributeAll pushes the given policies to every endpoint in rtmgr.Eps by
// starting one send_sync goroutine per endpoint, all with the current rmrcallid.
// NOTE(review): several lines of this function are not visible in this excerpt
// (the body of the rmrcallid check, any increment of rmrcallid, the declarations
// of count and channel, and the final return).
119 func (c *RmrPush) DistributeAll(policies *[]string) error {
120 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
// policies is dereferenced for logging, so a nil pointer panics here.
121 xapp.Logger.Debug("args: %v", *policies)
123 /*for _, ep := range rtmgr.Eps {
124 go c.send(ep, policies)
126 //channel := make(chan EPStatus)
// NOTE(review): rmrcallid is declared outside this excerpt and the body of this
// check is not visible - presumably the ID wraps around at 200; confirm.
128 if rmrcallid == 200 {
132 for _, ep := range rtmgr.Eps {
// Fire-and-forget: nothing visible here waits for these goroutines to finish.
133 go c.send_sync(ep, policies, rmrcallid)
// NOTE(review): the lines below receive from channel, but the only visible
// creation of channel is commented out above and send_sync neither takes nor
// writes a channel. If these lines are live code, the receive can never be
// satisfied; they may instead sit inside a comment block whose delimiters are
// not visible here. Confirm before relying on this result collection.
140 result := make([]EPStatus, len(rtmgr.Eps))
141 for i, _ := range result {
142 result[i] = <-channel
143 if result[i].status == true {
146 xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
// NOTE(review): the message says "less than half" while the comparison is against
// the full endpoint count, and string(count) / string(len(...)) convert the
// integers to characters by code point rather than to decimal text.
150 if count < len(rtmgr.Eps) {
151 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
157 //func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
// send_sync pushes policies to one endpoint through send_data and records the
// boolean outcome in rtmgr.RMRConnStatus under the endpoint's UUID.
// DistributeAll starts it as a goroutine, once per endpoint.
158 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, call_id int) {
159 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
161 ret := c.send_data(ep, policies, call_id)
162 xapp.Logger.Debug("return value is %v", ret)
// NOTE(review): this write can happen from several goroutines at once. No
// locking is visible in this excerpt - confirm that the lines surrounding this
// statement guard the access to rtmgr.RMRConnStatus.
164 rtmgr.RMRConnStatus[ep.Uuid] = ret
166 // Handling per connection .. may be updating global map
168 //channel <- EPStatus{ep.Uuid, ret}
// send_data concatenates every policy entry into one byte payload, sends it over
// the endpoint's wormhole (ep.Whid) with xapp.Rmr.SendCallMsg using call_id, and
// logs the outcome.
// NOTE(review): the declarations of state, retstr and b, further params
// assignments, and the return statements are not visible in this excerpt -
// presumably false is returned after the error log and true after the info log;
// confirm.
172 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
173 xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
177 var policy = []byte{}
179 for _, pe := range *policies {
// NOTE(review): b is defined on a line not visible here - presumably the bytes
// of pe. The byte-by-byte loop is equivalent to append(policy, b...).
181 for j := 0; j < len(b); j++ {
182 policy = append(policy, b[j])
185 params := &RMRParams{&xapp.RMRParams{}}
187 params.PayloadLen = len(policy)
// policy is already a []byte, so this conversion is a no-op.
188 params.Payload = []byte(policy)
190 params.Whid = ep.Whid
191 params.Callid = call_id
193 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
// The first space-separated token of the reply is treated as the route status.
194 routestatus := strings.Split(retstr, " ")
// NOTE(review): with &&, the failure branch runs only when BOTH the RMR state
// and the reply token signal a problem - confirm that || was not intended.
195 if state != C.RMR_OK && routestatus[0] != "OK" {
196 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
// NOTE(review): the "(# of Entries:" parenthesis in this message is never closed.
199 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
// CheckEndpoint forwards payload to c.checkEndpoint and returns its result
// unchanged.
// NOTE(review): checkEndpoint is not defined in this excerpt - presumably it is
// promoted from an embedded type; confirm.
204 func (c *RmrPush) CheckEndpoint(payload string) (ep *rtmgr.Endpoint) {
205 return c.checkEndpoint(payload)
// CreateEndpoint forwards rmrsrc to c.createEndpoint and returns both of its
// results unchanged.
// NOTE(review): createEndpoint is not defined in this excerpt - presumably it is
// promoted from an embedded type; confirm.
208 func (c *RmrPush) CreateEndpoint(rmrsrc string) (ep *string, whid int) {
209 return c.createEndpoint(rmrsrc)
// DistributeToEp pushes policies to a single endpoint, identified by ep and its
// wormhole ID whid, by starting sendDynamicRoutes in a goroutine with the current
// rmrdynamiccallid.
// NOTE(review): any increment of rmrdynamiccallid and the return statement are
// not visible in this excerpt.
212 func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
213 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
// policies is dereferenced for logging, so a nil pointer panics here.
214 xapp.Logger.Debug("args: %v", *policies)
// Wrap the dynamic call ID back to its initial value once it reaches 255.
// NOTE(review): rmrdynamiccallid is a package-level variable and no locking is
// visible here - confirm callers do not invoke this concurrently.
216 if rmrdynamiccallid == 255 {
217 rmrdynamiccallid = 201
// The bool returned by sendDynamicRoutes is discarded by the go statement, so
// callers are not told here whether the push succeeded.
220 go c.sendDynamicRoutes(ep, whid, policies, rmrdynamiccallid)
// sendDynamicRoutes concatenates every policy entry into one byte payload, sends
// it with xapp.Rmr.SendCallMsg using call_id, and logs the outcome together with
// ep and whid.
// NOTE(review): the declarations of state, retstr and b, further params
// assignments, and the return statements are not visible in this excerpt. In
// particular no visible line copies whid into params - confirm it is set.
226 func (c *RmrPush) sendDynamicRoutes(ep string, whid int, policies *[]string, call_id int) bool {
// NOTE(review): the message names "send_rt_process" rather than this function,
// and "whid: " has no separator before it.
227 xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
231 var policy = []byte{}
233 for _, pe := range *policies {
// NOTE(review): b is defined on a line not visible here - presumably the bytes
// of pe. The byte-by-byte loop is equivalent to append(policy, b...).
235 for j := 0; j < len(b); j++ {
236 policy = append(policy, b[j])
239 params := &RMRParams{&xapp.RMRParams{}}
241 params.PayloadLen = len(policy)
// policy is already a []byte, so this conversion is a no-op.
242 params.Payload = []byte(policy)
245 params.Callid = call_id
247 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
// The first space-separated token of the reply is treated as the route status.
248 routestatus := strings.Split(retstr, " ")
// NOTE(review): with &&, the failure branch runs only when BOTH the RMR state
// and the reply token signal a problem - confirm that || was not intended.
249 if state != C.RMR_OK && routestatus[0] != "OK" {
250 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
// NOTE(review): the "(# of Entries:" parenthesis in this message is never closed.
253 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))