2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (RMR) Pipeline SBI implementation
40 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
41 "routing-manager/pkg/rtmgr"
// rmrdynamiccallid is the RMR call ID that DistributeToEp hands to
// sendDynamicRoutes. It starts at 201, and DistributeToEp sets it back to 201
// once it reaches 255.
48 var rmrdynamiccallid = 201
52 rcChan chan *xapp.RMRParams
// EPStatus reports the outcome of pushing routes to one endpoint. send_sync
// builds it positionally from the endpoint UUID and the boolean result of
// send_data; DistributeAll reads its endpoint and status fields back from the
// result channel.
55 type EPStatus struct {
// RMRParams wraps a *xapp.RMRParams (it is constructed in this file as
// &RMRParams{&xapp.RMRParams{}}) so that this package can attach its own
// String method to the message parameters.
60 type RMRParams struct {
64 func (params *RMRParams) String() string {
66 sum := md5.Sum(params.Payload)
67 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
// NewRmrPush allocates a zero-valued RmrPush with new.
// NOTE(review): any further setup of the instance before it is handed back is
// not visible here — confirm.
71 func NewRmrPush() *RmrPush {
72 instance := new(RmrPush)
// Initialize takes an ip string and reports failure through its error result.
// NOTE(review): presumably the SBI start-up hook, with ip being the local
// address — confirm against the callers and the method body.
76 func (c *RmrPush) Initialize(ip string) error {
// Terminate takes no arguments and reports failure through its error result.
// NOTE(review): presumably the SBI shutdown hook, the counterpart of
// Initialize — confirm against the callers and the method body.
80 func (c *RmrPush) Terminate() error {
84 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
86 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
87 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
88 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
90 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
92 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
// DeleteEndpoint logs the endpoint and closes the RMR wormhole identified by
// ep.Whid.
// NOTE(review): *ep is dereferenced for logging, so a nil ep panics before
// anything is closed.
98 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
99 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
100 xapp.Logger.Debug("args: %v", *ep)
// Close the wormhole whose ID AddEndpoint stored in ep.Whid.
102 xapp.Rmr.Closewh(ep.Whid)
// UpdateEndpoints forwards rcs to updateEndpoints, passing c itself as the
// second argument.
// NOTE(review): updateEndpoints is defined outside this file section
// (presumably on an embedded type); c looks like the engine it calls back into
// for adding and deleting endpoints — confirm.
106 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
107 c.updateEndpoints(rcs, c)
// DistributeAll pushes the given route policies to every endpoint in
// rtmgr.Eps. It starts one send_sync goroutine per endpoint, all sharing the
// current rmrcallid, then receives one EPStatus per endpoint from the channel
// and logs an error for each endpoint whose send failed. It returns an error
// when count is smaller than len(rtmgr.Eps).
// NOTE(review): count is presumably the number of successful sends — confirm.
110 func (c *RmrPush) DistributeAll(policies *[]string) error {
111 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
112 xapp.Logger.Debug("args: %v", *policies)
// Earlier fire-and-forget variant, kept commented out.
114 /*for _, ep := range rtmgr.Eps {
115 go c.send(ep, policies)
// Unbuffered channel: each send_sync goroutine blocks on its send until the
// collection loop below receives its result.
117 channel := make(chan EPStatus)
// NOTE(review): the shared call ID wraps once it reaches 200 — confirm the
// reset value keeps it clear of the 201..255 range used by DistributeToEp.
119 if rmrcallid == 200 {
123 for _, ep := range rtmgr.Eps {
124 go c.send_sync(ep, policies, channel, rmrcallid)
// Collect exactly one result per endpoint.
130 result := make([]EPStatus, len(rtmgr.Eps))
131 for i, _ := range result {
132 result[i] = <-channel
133 if result[i].status == true {
136 xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
// NOTE(review): the check requires every endpoint to succeed, but the message
// below says "less than half" — confirm which one is intended.
140 if count < len(rtmgr.Eps) {
// NOTE(review): string(count) and string(len(...)) convert an integer to the
// rune with that code point, not to decimal text; strconv.Itoa is needed for
// the numbers to be readable in this error.
141 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
// send_sync pushes policies to a single endpoint via send_data and reports the
// outcome on channel as an EPStatus holding the endpoint UUID and the boolean
// result. It is started as a goroutine by DistributeAll, once per endpoint.
148 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
149 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
151 ret := c.send_data(ep, policies, call_id)
// Blocks until DistributeAll receives the result from the channel.
153 channel <- EPStatus{ep.Uuid, ret}
// send_data concatenates all policy strings into one payload and sends it to
// ep over its wormhole (ep.Whid) with xapp.Rmr.SendCallMsg, tagging the call
// with call_id. The reply string is split on spaces and its first token is
// logged as the route update status.
// NOTE(review): the bool result presumably mirrors the failure/success
// branches at the end of the function — confirm.
157 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
158 xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
162 var policy = []byte{}
// Flatten the policy strings into one contiguous byte payload.
164 for _, pe := range *policies {
// NOTE(review): this byte-by-byte copy looks equivalent to
// policy = append(policy, b...) — confirm b is the byte form of pe.
166 for j:=0; j<len(b); j++{
167 policy = append(policy,b[j])
170 params := &RMRParams{&xapp.RMRParams{}}
172 params.PayloadLen = len(policy)
173 params.Payload =[]byte(policy)
// Address the message to this endpoint's wormhole and tag it with the call ID.
175 params.Whid = ep.Whid
176 params.Callid = call_id
178 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
// The first space-separated token of the reply is compared against "OK".
179 routestatus := strings.Split(retstr," ")
// NOTE(review): with &&, a reply other than "OK" still counts as success
// whenever state == C.RMR_OK; || may have been intended — confirm.
180 if state != C.RMR_OK && routestatus[0] != "OK" {
181 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
// NOTE(review): the "(# of Entries:" parenthesis in this message is never closed.
184 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
// CheckEndpoint is the exported wrapper around checkEndpoint: it passes payload
// through unchanged and returns whatever *rtmgr.Endpoint checkEndpoint yields.
// NOTE(review): checkEndpoint is defined outside this file section; presumably
// it returns nil when no endpoint matches — confirm.
189 func (c *RmrPush) CheckEndpoint(payload string)(ep *rtmgr.Endpoint) {
190 return c.checkEndpoint(payload)
// CreateEndpoint is the exported wrapper around createEndpoint: it passes
// rmrsrc through unchanged and returns the endpoint string pointer and
// wormhole ID that createEndpoint yields.
// NOTE(review): createEndpoint is defined outside this file section; the
// format expected for rmrsrc is not visible here — confirm.
193 func (c *RmrPush) CreateEndpoint(rmrsrc string)(ep *string,whid int) {
194 return c.createEndpoint(rmrsrc)
// DistributeToEp pushes the given route policies to a single endpoint,
// identified by ep and its wormhole ID whid, by starting sendDynamicRoutes in a
// new goroutine with the current rmrdynamiccallid.
// NOTE(review): the goroutine's bool result is discarded, so a failed send
// cannot be reflected in the returned error — confirm that this
// fire-and-forget behaviour is intended.
197 func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
198 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
199 xapp.Logger.Debug("args: %v", *policies)
// Wrap the dynamic call ID back to its starting value once it reaches 255.
201 if rmrdynamiccallid == 255 {
202 rmrdynamiccallid = 201
// NOTE(review): rmrdynamiccallid is package-level state read here without
// visible synchronisation — confirm callers never run this concurrently.
205 go c.sendDynamicRoutes(ep, whid, policies,rmrdynamiccallid)
// sendDynamicRoutes concatenates all policy strings into one payload and sends
// it with xapp.Rmr.SendCallMsg, tagging the call with call_id. The reply string
// is split on spaces and its first token is logged as the route update status.
// ep and whid identify the target endpoint in the log messages.
// NOTE(review): the bool result presumably mirrors the failure/success
// branches at the end of the function — confirm. Also confirm that whid is
// assigned to params.Whid before the send; only the call ID assignment is
// visible here.
211 func (c *RmrPush) sendDynamicRoutes(ep string,whid int, policies *[]string, call_id int) bool {
212 xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
216 var policy = []byte{}
// Flatten the policy strings into one contiguous byte payload.
218 for _, pe := range *policies {
// NOTE(review): this byte-by-byte copy looks equivalent to
// policy = append(policy, b...) — confirm b is the byte form of pe.
220 for j:=0; j<len(b); j++{
221 policy = append(policy,b[j])
224 params := &RMRParams{&xapp.RMRParams{}}
226 params.PayloadLen = len(policy)
227 params.Payload =[]byte(policy)
230 params.Callid = call_id
232 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
// The first space-separated token of the reply is compared against "OK".
233 routestatus := strings.Split(retstr," ")
// NOTE(review): with &&, a reply other than "OK" still counts as success
// whenever state == C.RMR_OK; || may have been intended — confirm.
234 if state != C.RMR_OK && routestatus[0] != "OK" {
235 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
// NOTE(review): the "(# of Entries:" parenthesis in this message is never closed.
238 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))