2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (RMR) Pipeline SBI implementation
40 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
41 "routing-manager/pkg/rtmgr"
48 var rmrdynamiccallid = 201
52 rcChan chan *xapp.RMRParams
55 type EPStatus struct {
60 type RMRParams struct {
64 func (params *RMRParams) String() string {
66 sum := md5.Sum(params.Payload)
67 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
71 func NewRmrPush() *RmrPush {
72 instance := new(RmrPush)
76 func (c *RmrPush) Initialize(ip string) error {
80 func (c *RmrPush) Terminate() error {
84 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
86 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
87 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
88 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
90 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
92 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
98 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
99 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
100 xapp.Logger.Debug("args: %v", *ep)
102 xapp.Rmr.Closewh(ep.Whid)
106 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
107 c.updateEndpoints(rcs, c)
110 func (c *RmrPush) DistributeAll(policies *[]string) error {
111 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
112 xapp.Logger.Debug("args: %v", *policies)
114 /*for _, ep := range rtmgr.Eps {
115 go c.send(ep, policies)
117 channel := make(chan EPStatus)
119 if rmrcallid == 200 {
123 for _, ep := range rtmgr.Eps {
124 go c.send_sync(ep, policies, channel, rmrcallid)
129 result := make([]EPStatus, len(rtmgr.Eps))
130 for i, _ := range result {
131 result[i] = <-channel
132 if result[i].status == true {
135 xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
139 if count < len(rtmgr.Eps) {
140 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
147 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
148 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
150 ret := c.send_data(ep, policies, call_id)
152 channel <- EPStatus{ep.Uuid, ret}
156 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
157 xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
161 var policy = []byte{}
163 for _, pe := range *policies {
165 for j:=0; j<len(b); j++{
166 policy = append(policy,b[j])
169 params := &RMRParams{&xapp.RMRParams{}}
171 params.PayloadLen = len(policy)
172 params.Payload =[]byte(policy)
174 params.Whid = ep.Whid
175 params.Callid = call_id
177 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
178 routestatus := strings.Split(retstr," ")
179 if state != C.RMR_OK && routestatus[0] == "OK" {
180 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
183 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
187 xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
191 func (c *RmrPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
192 return c.createEndpoint(payload, c)
195 func (c *RmrPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
196 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
197 xapp.Logger.Debug("args: %v", *policies)
199 if rmrdynamiccallid == 255 {
200 rmrdynamiccallid = 201
203 go c.send_data(ep, policies,rmrdynamiccallid)