2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (RMR) Pipeline SBI implementation
41 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
42 "routing-manager/pkg/rtmgr"
50 var rmrdynamiccallid = 201
57 rcChan chan *xapp.RMRParams
60 type EPStatus struct {
65 type RMRParams struct {
69 func (params *RMRParams) String() string {
71 sum := md5.Sum(params.Payload)
72 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
76 func NewRmrPush() *RmrPush {
77 instance := new(RmrPush)
81 func (c *RmrPush) Initialize(ip string) error {
85 func (c *RmrPush) Terminate() error {
89 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
90 addendpointct = addendpointct + 1
91 count := addendpointct
92 xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
93 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
94 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
96 time.Sleep(time.Duration(10) * time.Second)
97 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
99 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid) + " count: " + strconv.Itoa(count))
102 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
108 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
109 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
110 xapp.Logger.Debug("args: %v", *ep)
112 xapp.Rmr.Closewh(ep.Whid)
116 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
117 c.updateEndpoints(rcs, c)
120 func (c *RmrPush) DistributeAll(policies *[]string) error {
121 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
122 xapp.Logger.Debug("args: %v", *policies)
124 /*for _, ep := range rtmgr.Eps {
125 go c.send(ep, policies)
127 //channel := make(chan EPStatus)
129 if rmrcallid == 200 {
133 for _, ep := range rtmgr.Eps {
134 go c.send_sync(ep, policies, rmrcallid)
141 result := make([]EPStatus, len(rtmgr.Eps))
142 for i, _ := range result {
143 result[i] = <-channel
144 if result[i].status == true {
147 xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
151 if count < len(rtmgr.Eps) {
152 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
158 //func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
159 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, call_id int) {
160 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
162 ret := c.send_data(ep, policies, call_id)
164 rtmgr.RMRConnStatus[ep.Uuid] = ret
166 // Handling per connection .. may be updating global map
168 //channel <- EPStatus{ep.Uuid, ret}
172 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
173 xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
177 var policy = []byte{}
179 for _, pe := range *policies {
181 for j := 0; j < len(b); j++ {
182 policy = append(policy, b[j])
185 params := &RMRParams{&xapp.RMRParams{}}
187 params.PayloadLen = len(policy)
188 params.Payload = []byte(policy)
190 params.Whid = ep.Whid
191 params.Callid = call_id
193 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
194 routestatus := strings.Split(retstr, " ")
195 if state != C.RMR_OK && routestatus[0] != "OK" {
196 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
199 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
204 func (c *RmrPush) CheckEndpoint(payload string) (ep *rtmgr.Endpoint) {
205 return c.checkEndpoint(payload)
208 func (c *RmrPush) CreateEndpoint(rmrsrc string) (ep *string, whid int) {
209 return c.createEndpoint(rmrsrc)
212 func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
213 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
214 xapp.Logger.Debug("args: %v", *policies)
216 if rmrdynamiccallid == 255 {
217 rmrdynamiccallid = 201
220 go c.sendDynamicRoutes(ep, whid, policies, rmrdynamiccallid)
226 func (c *RmrPush) sendDynamicRoutes(ep string, whid int, policies *[]string, call_id int) bool {
227 xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
231 var policy = []byte{}
233 for _, pe := range *policies {
235 for j := 0; j < len(b); j++ {
236 policy = append(policy, b[j])
239 params := &RMRParams{&xapp.RMRParams{}}
241 params.PayloadLen = len(policy)
242 params.Payload = []byte(policy)
245 params.Callid = call_id
247 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
248 routestatus := strings.Split(retstr, " ")
249 if state != C.RMR_OK && routestatus[0] != "OK" {
250 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
253 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))