2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (RMR) Pipeline SBI implementation
40 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
41 "routing-manager/pkg/rtmgr"
48 var rmrdynamiccallid = 201
52 rcChan chan *xapp.RMRParams
55 type EPStatus struct {
60 type RMRParams struct {
64 func (params *RMRParams) String() string {
66 sum := md5.Sum(params.Payload)
67 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
71 func NewRmrPush() *RmrPush {
72 instance := new(RmrPush)
76 func (c *RmrPush) Initialize(ip string) error {
80 func (c *RmrPush) Terminate() error {
84 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
86 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
87 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
88 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
90 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
92 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
98 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
99 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
100 xapp.Logger.Debug("args: %v", *ep)
102 xapp.Rmr.Closewh(ep.Whid)
106 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
107 c.updateEndpoints(rcs, c)
110 func (c *RmrPush) DistributeAll(policies *[]string) error {
111 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
112 xapp.Logger.Debug("args: %v", *policies)
114 /*for _, ep := range rtmgr.Eps {
115 go c.send(ep, policies)
117 channel := make(chan EPStatus)
119 if rmrcallid == 200 {
123 for _, ep := range rtmgr.Eps {
124 go c.send_sync(ep, policies, channel, rmrcallid)
126 for rEp, id := range rtmgr.RmrEp {
127 go c.send_rt_process(rEp,id,policies,rmrcallid)
133 result := make([]EPStatus, len(rtmgr.Eps))
134 for i, _ := range result {
135 result[i] = <-channel
136 if result[i].status == true {
139 xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
143 if count < len(rtmgr.Eps) {
144 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
151 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
152 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
154 ret := c.send_data(ep, policies, call_id)
156 channel <- EPStatus{ep.Uuid, ret}
160 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
161 xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
165 var policy = []byte{}
167 for _, pe := range *policies {
169 for j:=0; j<len(b); j++{
170 policy = append(policy,b[j])
173 params := &RMRParams{&xapp.RMRParams{}}
175 params.PayloadLen = len(policy)
176 params.Payload =[]byte(policy)
178 params.Whid = ep.Whid
179 params.Callid = call_id
181 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
182 routestatus := strings.Split(retstr," ")
183 if state != C.RMR_OK && routestatus[0] == "OK" {
184 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
187 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
191 xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
195 func (c *RmrPush) CreateEndpoint(payload string,rmrsrc string)*rtmgr.Endpoint {
196 return c.createEndpoint(payload,rmrsrc, c)
199 func (c *RmrPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
200 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
201 xapp.Logger.Debug("args: %v", *policies)
203 if rmrdynamiccallid == 255 {
204 rmrdynamiccallid = 201
207 go c.send_data(ep, policies,rmrdynamiccallid)
213 func (c *RmrPush) send_rt_process(ep string,whid int, policies *[]string, call_id int) bool {
214 xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
218 var policy = []byte{}
220 for _, pe := range *policies {
222 for j:=0; j<len(b); j++{
223 policy = append(policy,b[j])
226 params := &RMRParams{&xapp.RMRParams{}}
228 params.PayloadLen = len(policy)
229 params.Payload =[]byte(policy)
232 params.Callid = call_id
234 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
235 routestatus := strings.Split(retstr," ")
236 if state != C.RMR_OK && routestatus[0] == "OK" {
237 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
240 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whi_id: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
244 xapp.Logger.Error("Route Update to endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " xapp.Rmr.SendCallMsg not called")