2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (RMR) Pipeline SBI implementation
41 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
42 "routing-manager/pkg/rtmgr"
// rmrdynamiccallid is the call ID that DistributeToEp hands to sendDynamicRoutes.
// It starts at 201 and DistributeToEp resets it to 201 whenever it reaches 255.
// NOTE(review): this is package-level mutable state and no locking around it is
// visible in this excerpt - confirm that callers are serialised.
50 var rmrdynamiccallid = 201
// rcChan is a channel of *xapp.RMRParams. Nothing in the visible excerpt sends on
// it or receives from it - TODO confirm whether it is still used.
57 rcChan chan *xapp.RMRParams
// EPStatus pairs an endpoint with the outcome of a send to it. DistributeAll reads
// its endpoint and status fields; the field declarations themselves are not
// visible in this excerpt.
60 type EPStatus struct {
// RMRParams is a file-local wrapper type that this file constructs as
// &RMRParams{&xapp.RMRParams{}} and to which it attaches a String method.
// The field list is not visible in this excerpt.
65 type RMRParams struct {
// String formats the message parameters on one line: Src, Mtype, SubId, Xid,
// Meid.RanName, the declared PayloadLen next to the actual len(Payload), and the
// MD5 digest of the payload printed in hex.
// NOTE(review): params.Meid is dereferenced unconditionally - confirm it is never nil
// when this is called.
69 func (params *RMRParams) String() string {
// In the visible code the MD5 sum is used only as a compact payload fingerprint
// in the formatted output.
71 sum := md5.Sum(params.Payload)
72 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
// NewRmrPush allocates a zero-valued RmrPush; no fields are set in the visible code.
76 func NewRmrPush() *RmrPush {
77 instance := new(RmrPush)
// Initialize accepts an ip string and reports an error value. Its body is not
// visible in this excerpt, so what it does with ip cannot be stated here.
81 func (c *RmrPush) Initialize(ip string) error {
// Terminate reports an error value. Its body is not visible in this excerpt.
85 func (c *RmrPush) Terminate() error {
// AddEndpoint opens an RMR wormhole to ep.Ip on DefaultRmrPipelineSocketNumber and
// stores the resulting wormhole ID in ep.Whid. The open is attempted a second time
// after a 10 second sleep, and an error naming the endpoint is returned when no
// usable ID is obtained.
// NOTE(review): the conditions guarding the retry and the error return are not
// visible in this excerpt - confirm exactly when each path is taken.
89 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
// addendpointct is a package-level counter; count is the snapshot used in the log
// and error text below.
// NOTE(review): the increment has no visible locking - confirm AddEndpoint is
// never called concurrently.
90 addendpointct = addendpointct + 1
91 count := addendpointct
92 xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
93 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
94 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
// Second attempt after a fixed pause. time.Sleep blocks the calling goroutine for
// the full 10 seconds.
96 time.Sleep(time.Duration(10) * time.Second)
97 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
// NOTE(review): string(ep.Whid) converts the int to the UTF-8 encoding of that code
// point, not to its decimal digits (go vet reports this via the stringintconv
// check). strconv.Itoa(ep.Whid), as already used for count on the same line, is
// presumably what was intended.
99 return errors.New("cannot open wormhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid) + " count: " + strconv.Itoa(count))
102 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
// DeleteEndpoint logs the endpoint and closes its wormhole by passing ep.Whid to
// xapp.Rmr.Closewh.
// NOTE(review): ep is dereferenced for the debug log, so a nil ep panics before
// the wormhole is touched - confirm callers never pass nil.
108 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
109 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
110 xapp.Logger.Debug("args: %v", *ep)
112 xapp.Rmr.Closewh(ep.Whid)
// UpdateEndpoints forwards rcs to c.updateEndpoints and passes the receiver itself
// as the second argument. updateEndpoints is not defined in this excerpt.
116 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
117 c.updateEndpoints(rcs, c)
// DistributeAll pushes the given policies to every endpoint in rtmgr.Eps by
// starting one send_sync goroutine per endpoint, each tagged with the current
// rmrcallid.
// NOTE(review): several short lines of this function are missing from the excerpt
// (the body of the rmrcallid == 200 branch, closing braces, the return). The notes
// below describe only what is visible.
120 func (c *RmrPush) DistributeAll(policies *[]string) error {
121 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
122 xapp.Logger.Debug("args: %v", *policies)
// Older fire-and-forget variant, commented out. The closing delimiter of this
// block comment is not visible in the excerpt.
124 /*for _, ep := range rtmgr.Eps {
125 go c.send(ep, policies)
127 //channel := make(chan EPStatus)
// Special handling when the call ID reaches 200; the body of this branch is not visible.
129 if rmrcallid == 200 {
// One goroutine per endpoint. Nothing visible here waits for them to finish.
133 for _, ep := range rtmgr.Eps {
134 go c.send_sync(ep, policies, rmrcallid)
// NOTE(review): the collection code from here on reads from a variable named
// channel, whose only visible declaration is commented out above. This section may
// itself sit inside a block comment whose delimiters are missing - verify against
// the full file before relying on it.
141 result := make([]EPStatus, len(rtmgr.Eps))
142 for i, _ := range result {
143 result[i] = <-channel
144 if result[i].status == true {
147 xapp.Logger.Error("RMR sent failed for endpoint %v", result[i].endpoint)
// NOTE(review): the condition compares count with the full endpoint count, while the
// message below says less than half - one of the two is presumably stale.
151 if count < len(rtmgr.Eps) {
// NOTE(review): string(count) and string(len(rtmgr.Eps)) convert ints to the UTF-8
// encoding of a code point, not to decimal digits; strconv.Itoa is presumably
// what was intended.
152 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
// send_sync pushes policies to one endpoint through send_data, retries with a one
// second pause between attempts while the loop condition holds, and records a
// result in rtmgr.RMRConnStatus under ep.Uuid. DistributeAll starts it with a go
// statement.
// NOTE(review): some short lines (the declaration of count, closing braces,
// anything between the retry call and the map write) are missing from this excerpt.
158 //func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
159 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, call_id int) {
160 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
162 ret := c.send_data(ep, policies, call_id)
// The loop runs at most three times (count 0, 1, 2) and its condition tests the ret
// declared on the line above.
163 for count = 0; count <= 2 && ret == false; count++ {
164 time.Sleep(time.Second)
// NOTE(review): the := here declares a new ret scoped to the loop body. The loop
// condition still tests the outer ret, which this line never updates, so a
// successful retry does not end the loop early unless a line missing from this
// excerpt breaks out. A plain = is presumably what was intended.
165 ret := c.send_data(ep, policies, call_id)
169 xapp.Logger.Error("Invoked send_data to try again due to return value : %v", ret)
// NOTE(review): this writes a shared package-level map from a goroutine. Unguarded
// concurrent map writes are a fatal runtime error in Go - confirm a lock is held
// on the lines not visible here.
173 rtmgr.RMRConnStatus[ep.Uuid] = ret
175 // Handling per connection .. may be updating global map
177 //channel <- EPStatus{ep.Uuid, ret}
// send_data concatenates the bytes of every policy entry into one payload, sends it
// to the endpoint wormhole with xapp.Rmr.SendCallMsg using the given call_id, and
// reports the outcome as a bool.
// NOTE(review): several short lines are missing from this excerpt (how b is derived
// from pe, other params fields, the declarations of state and retstr, the return
// statements). Confirm them against the full file.
181 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
182 xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
186 var policy = []byte{}
// Payload assembly: each entry contributes its bytes in order, one byte per append.
188 for _, pe := range *policies {
190 for j := 0; j < len(b); j++ {
191 policy = append(policy, b[j])
194 params := &RMRParams{&xapp.RMRParams{}}
196 params.PayloadLen = len(policy)
// policy is already a []byte, so this conversion does not change the value.
197 params.Payload = []byte(policy)
199 params.Whid = ep.Whid
200 params.Callid = call_id
202 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
// strings.Split with a non-empty separator always yields at least one element, so
// indexing routestatus[0] below cannot go out of range.
203 routestatus := strings.Split(retstr, " ")
// NOTE(review): with && the failure branch is taken only when the RMR state is bad
// AND the first token of the reply is not OK; a bad state with an OK token, or the
// reverse, is treated as success. Confirm that || was not intended.
204 if state != C.RMR_OK && routestatus[0] != "OK" {
205 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
208 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
// CheckEndpoint forwards payload to c.checkEndpoint and returns its result
// unchanged. checkEndpoint is not defined in this excerpt.
213 func (c *RmrPush) CheckEndpoint(payload string) (ep *rtmgr.Endpoint) {
214 return c.checkEndpoint(payload)
// CreateEndpoint forwards rmrsrc to c.createEndpoint and returns its two results
// unchanged. createEndpoint is not defined in this excerpt.
217 func (c *RmrPush) CreateEndpoint(rmrsrc string) (ep *string, whid int) {
218 return c.createEndpoint(rmrsrc)
// DistributeToEp sends policies to a single endpoint, identified by ep and its
// wormhole ID whid, by starting sendDynamicRoutes in a new goroutine with the
// current rmrdynamiccallid.
// NOTE(review): because the send runs under a go statement, the bool result of
// sendDynamicRoutes is discarded and a failed send cannot surface through the
// error returned here. The return statement and any increment of
// rmrdynamiccallid are not visible in this excerpt.
221 func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
222 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
223 xapp.Logger.Debug("args: %v", *policies)
// Wrap the dynamic call ID back to its starting value of 201 once it reaches 255.
// NOTE(review): no locking around this package-level variable is visible - confirm
// callers are serialised.
225 if rmrdynamiccallid == 255 {
226 rmrdynamiccallid = 201
229 go c.sendDynamicRoutes(ep, whid, policies, rmrdynamiccallid)
// sendDynamicRoutes concatenates the bytes of every policy entry into one payload
// and sends it with xapp.Rmr.SendCallMsg under the given call_id, logging the
// endpoint name ep and wormhole ID whid with the outcome. It reports the outcome
// as a bool.
// NOTE(review): several short lines are missing from this excerpt (how b is derived
// from pe, the assignment of whid into params, other params fields, the return
// statements). Confirm them against the full file.
235 func (c *RmrPush) sendDynamicRoutes(ep string, whid int, policies *[]string, call_id int) bool {
// NOTE(review): the log text names send_rt_process rather than this function and has
// no separator before whid - cosmetic, but it makes log searches harder.
236 xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
240 var policy = []byte{}
// Payload assembly: each entry contributes its bytes in order, one byte per append.
242 for _, pe := range *policies {
244 for j := 0; j < len(b); j++ {
245 policy = append(policy, b[j])
248 params := &RMRParams{&xapp.RMRParams{}}
250 params.PayloadLen = len(policy)
// policy is already a []byte, so this conversion does not change the value.
251 params.Payload = []byte(policy)
254 params.Callid = call_id
256 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
// strings.Split with a non-empty separator always yields at least one element, so
// indexing routestatus[0] below cannot go out of range.
257 routestatus := strings.Split(retstr, " ")
// NOTE(review): with && the failure branch is taken only when the RMR state is bad
// AND the first token of the reply is not OK. Confirm that || was not intended.
258 if state != C.RMR_OK && routestatus[0] != "OK" {
259 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
262 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))