2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (RMR) Pipeline SBI implementation
40 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
41 "routing-manager/pkg/rtmgr"
// rmrdynamiccallid is the call ID that DistributeToEp passes to
// sendDynamicRoutes. It starts at 201, and DistributeToEp resets it to 201
// whenever it finds the value at 255.
48 var rmrdynamiccallid = 201
// rcChan is a channel of *xapp.RMRParams.
// TODO(review): document which goroutine sends on it and which receives.
52 rcChan chan *xapp.RMRParams
// EPStatus is the per-endpoint outcome of a policy push. send_sync builds it
// as EPStatus{ep.Uuid, ret} and sends it on a channel; DistributeAll reads the
// result back through the endpoint and status fields.
55 type EPStatus struct {
// RMRParams wraps a *xapp.RMRParams; callers construct it as
// RMRParams{&xapp.RMRParams{}} and hand the inner value to
// xapp.Rmr.SendCallMsg as params.RMRParams.
60 type RMRParams struct {
// String formats the message parameters as a single "params(...)" text:
// Src, Mtype, SubId, Xid, Meid.RanName, the declared PayloadLen next to the
// actual len(Payload), and a hash of the payload.
64 func (params *RMRParams) String() string {
// The MD5 digest of the payload is what gets printed as Payhash.
66 sum := md5.Sum(params.Payload)
67 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
// NewRmrPush allocates a zero-valued RmrPush with new; the declared result
// type is *RmrPush.
71 func NewRmrPush() *RmrPush {
72 instance := new(RmrPush)
// Initialize takes the ip string and reports its outcome as an error.
76 func (c *RmrPush) Initialize(ip string) error {
// Terminate takes no arguments and reports its outcome as an error.
80 func (c *RmrPush) Terminate() error {
// AddEndpoint opens an RMR wormhole to ep and stores the resulting wormhole
// ID in ep.Whid. The wormhole target is ep.Ip joined with
// DefaultRmrPipelineSocketNumber. An error naming ep.Uuid and the wormhole ID
// is returned when the ID is rejected as invalid.
84 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
86 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
87 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
88 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
// strconv.Itoa prints the ID in decimal. A plain string(int) conversion would
// instead yield the UTF-8 encoding of that code point (or U+FFFD for an
// invalid one), hiding the actual value in the error text.
90 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + strconv.Itoa(ep.Whid))
92 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
// DeleteEndpoint closes the RMR wormhole recorded in ep.Whid through
// xapp.Rmr.Closewh. ep must be non-nil: it is dereferenced for the debug log
// before the wormhole is closed.
98 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
99 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
100 xapp.Logger.Debug("args: %v", *ep)
102 xapp.Rmr.Closewh(ep.Whid)
// UpdateEndpoints delegates to c.updateEndpoints, passing rcs together with
// the receiver itself as the second argument.
106 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
107 c.updateEndpoints(rcs, c)
// DistributeAll pushes policies to every endpoint in rtmgr.Eps. One send_sync
// goroutine is started per endpoint, all sharing the current rmrcallid, and
// exactly one EPStatus per endpoint is then collected from the channel.
// An error is returned when fewer endpoints succeeded than are listed.
// policies must be non-nil: it is dereferenced for the debug log.
110 func (c *RmrPush) DistributeAll(policies *[]string) error {
111 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
112 xapp.Logger.Debug("args: %v", *policies)
114 /*for _, ep := range rtmgr.Eps {
115 go c.send(ep, policies)
// The channel is unbuffered, so each send_sync goroutine finishes only once
// its result has been received by the collection loop.
117 channel := make(chan EPStatus)
119 if rmrcallid == 200 {
123 for _, ep := range rtmgr.Eps {
124 go c.send_sync(ep, policies, channel, rmrcallid)
130 result := make([]EPStatus, len(rtmgr.Eps))
131 for i := range result {
132 result[i] = <-channel
133 if result[i].status {
136 xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
// NOTE(review): the condition fails the call unless every endpoint succeeded,
// while the message speaks of "less than half" — confirm which is intended.
140 if count < len(rtmgr.Eps) {
// strconv.Itoa prints the counts in decimal; string(int) would produce the
// character with that code point instead of the number.
141 return errors.New(" RMR response count " + strconv.Itoa(count) + " is less than half of endpoint list " + strconv.Itoa(len(rtmgr.Eps)))
// send_sync pushes policies to a single endpoint through send_data and
// reports the outcome on channel as an EPStatus holding ep.Uuid and the bool
// result. DistributeAll passes an unbuffered channel, so the send completes
// only when DistributeAll receives the value.
148 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
149 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
151 ret := c.send_data(ep, policies, call_id)
153 channel <- EPStatus{ep.Uuid, ret}
// send_data builds one payload from the entries of *policies and sends it to
// ep's wormhole (ep.Whid) with xapp.Rmr.SendCallMsg, tagged with call_id.
// send_sync forwards the bool result as EPStatus.status, which DistributeAll
// counts as a success when true.
157 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
158 xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
// policy accumulates the payload bytes across all policy entries.
162 var policy = []byte{}
164 for _, pe := range *policies {
// b is appended to the payload one byte at a time.
166 for j:=0; j<len(b); j++{
167 policy = append(policy,b[j])
170 params := &RMRParams{&xapp.RMRParams{}}
172 params.PayloadLen = len(policy)
173 params.Payload =[]byte(policy)
175 params.Whid = ep.Whid
176 params.Callid = call_id
178 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
// Only the first space-separated token of retstr is inspected and logged.
179 routestatus := strings.Split(retstr," ")
// NOTE(review): the failure branch is taken only when state != C.RMR_OK and
// the first token equals "OK" — confirm this combination is the intended
// failure test.
180 if state != C.RMR_OK && routestatus[0] == "OK" {
181 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
184 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
188 xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
// CreateEndpoint delegates to c.createEndpoint, passing payload, rmrsrc and
// the receiver itself, and returns that call's results: ep, a *string, and
// whid, an int.
192 func (c *RmrPush) CreateEndpoint(payload string,rmrsrc string)(ep *string,whid int) {
193 return c.createEndpoint(payload,rmrsrc, c)
// DistributeToEp starts sendDynamicRoutes in a new goroutine to push policies
// to the endpoint named ep with wormhole ID whid, using the package-level
// rmrdynamiccallid as the call ID. Because the push runs under a go
// statement, its bool result is discarded.
// policies must be non-nil: it is dereferenced for the debug log.
196 func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
197 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
198 xapp.Logger.Debug("args: %v", *policies)
// The call ID wraps back to 201 once it has reached 255.
200 if rmrdynamiccallid == 255 {
201 rmrdynamiccallid = 201
204 go c.sendDynamicRoutes(ep, whid, policies,rmrdynamiccallid)
// sendDynamicRoutes builds one payload from the entries of *policies and
// sends it with xapp.Rmr.SendCallMsg, tagged with call_id. ep and whid
// identify the target in the log messages. DistributeToEp runs this in its
// own goroutine and does not collect the bool result.
210 func (c *RmrPush) sendDynamicRoutes(ep string,whid int, policies *[]string, call_id int) bool {
211 xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
// policy accumulates the payload bytes across all policy entries.
215 var policy = []byte{}
217 for _, pe := range *policies {
// b is appended to the payload one byte at a time.
219 for j:=0; j<len(b); j++{
220 policy = append(policy,b[j])
223 params := &RMRParams{&xapp.RMRParams{}}
225 params.PayloadLen = len(policy)
226 params.Payload =[]byte(policy)
229 params.Callid = call_id
231 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
// Only the first space-separated token of retstr is inspected and logged.
232 routestatus := strings.Split(retstr," ")
// NOTE(review): the failure branch is taken only when state != C.RMR_OK and
// the first token equals "OK" — confirm this combination is the intended
// failure test.
233 if state != C.RMR_OK && routestatus[0] == "OK" {
234 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
237 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whi_id: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
241 xapp.Logger.Error("Route Update to endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " xapp.Rmr.SendCallMsg not called")