"bytes"
"crypto/md5"
"errors"
+ "fmt"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"routing-manager/pkg/rtmgr"
"strconv"
"strings"
- "fmt"
+ "sync"
+ "time"
)
var rmrcallid = 1
var rmrdynamiccallid = 201
+var addendpointct = 1
+
+var conn sync.Mutex
type RmrPush struct {
Sbi
}
type EPStatus struct {
- endpoint string
- status bool
+ endpoint string
+ status bool
}
type RMRParams struct {
}
func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
-
- xapp.Logger.Debug("Invoked sbi.AddEndpoint")
+ addendpointct = addendpointct + 1
+ count := addendpointct
+ xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
ep.Whid = int(xapp.Rmr.Openwh(endpoint))
if ep.Whid < 0 {
- return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
+ time.Sleep(time.Duration(10) * time.Second)
+ ep.Whid = int(xapp.Rmr.Openwh(endpoint))
+ if ep.Whid < 0 {
+ return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid) + " count: " + strconv.Itoa(count))
+ }
} else {
xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
}
/*for _, ep := range rtmgr.Eps {
go c.send(ep, policies)
}*/
- channel := make(chan EPStatus)
+ //channel := make(chan EPStatus)
if rmrcallid == 200 {
rmrcallid = 1
}
- for _, ep := range rtmgr.Eps {
- go c.send_sync(ep, policies, channel, rmrcallid)
- }
+ for _, ep := range rtmgr.Eps {
+ go c.send_sync(ep, policies, rmrcallid)
+ }
rmrcallid++
- count := 0
- result := make([]EPStatus, len(rtmgr.Eps))
- for i, _ := range result {
- result[i] = <-channel
- if result[i].status == true {
- count++
- } else {
- xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
- }
- }
-
- if count < len(rtmgr.Eps) {
- return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
- }
-
+ /*
+ count := 0
+ result := make([]EPStatus, len(rtmgr.Eps))
+ for i, _ := range result {
+ result[i] = <-channel
+ if result[i].status == true {
+ count++
+ } else {
+ xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
+ }
+ }
+
+ if count < len(rtmgr.Eps) {
+ return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
+ }*/
return nil
}
-func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
- xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+//func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
+func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, call_id int) {
+ xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
ret := c.send_data(ep, policies, call_id)
+ conn.Lock()
+ rtmgr.RMRConnStatus[ep.Uuid] = ret
+ conn.Unlock()
+ // Handling per connection .. may be updating global map
- channel <- EPStatus{ep.Uuid, ret}
+ //channel <- EPStatus{ep.Uuid, ret}
}
func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
- xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
- var state int
- var retstr string
+ xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
+ var state int
+ var retstr string
- var policy = []byte{}
+ var policy = []byte{}
- for _, pe := range *policies {
- b:= []byte(pe)
- for j:=0; j<len(b); j++{
- policy = append(policy,b[j])
- }
+ for _, pe := range *policies {
+ b := []byte(pe)
+ for j := 0; j < len(b); j++ {
+ policy = append(policy, b[j])
+ }
}
- params := &RMRParams{&xapp.RMRParams{}}
- params.Mtype = 20
- params.PayloadLen = len(policy)
- params.Payload =[]byte(policy)
- params.Mbuf = nil
- params.Whid = ep.Whid
- params.Callid = call_id
- params.Timeout = 200
- state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
- routestatus := strings.Split(retstr," ")
- if state != C.RMR_OK && routestatus[0] != "OK" {
- xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
- return false
- } else {
+ params := &RMRParams{&xapp.RMRParams{}}
+ params.Mtype = 20
+ params.PayloadLen = len(policy)
+ params.Payload = []byte(policy)
+ params.Mbuf = nil
+ params.Whid = ep.Whid
+ params.Callid = call_id
+ params.Timeout = 200
+ state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
+ routestatus := strings.Split(retstr, " ")
+ if state != C.RMR_OK && routestatus[0] != "OK" {
+ xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
+ return false
+ } else {
xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
- return true
- }
+ return true
+ }
}
-func (c *RmrPush) CheckEndpoint(payload string)(ep *rtmgr.Endpoint) {
+func (c *RmrPush) CheckEndpoint(payload string) (ep *rtmgr.Endpoint) {
return c.checkEndpoint(payload)
}
-func (c *RmrPush) CreateEndpoint(rmrsrc string)(ep *string,whid int) {
+func (c *RmrPush) CreateEndpoint(rmrsrc string) (ep *string, whid int) {
return c.createEndpoint(rmrsrc)
}
xapp.Logger.Debug("args: %v", *policies)
if rmrdynamiccallid == 255 {
- rmrdynamiccallid = 201
- }
+ rmrdynamiccallid = 201
+ }
- go c.sendDynamicRoutes(ep, whid, policies,rmrdynamiccallid)
+ go c.sendDynamicRoutes(ep, whid, policies, rmrdynamiccallid)
rmrdynamiccallid++
return nil
}
-func (c *RmrPush) sendDynamicRoutes(ep string,whid int, policies *[]string, call_id int) bool {
- xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
- var state int
- var retstr string
-
- var policy = []byte{}
-
- for _, pe := range *policies {
- b:= []byte(pe)
- for j:=0; j<len(b); j++{
- policy = append(policy,b[j])
- }
- }
- params := &RMRParams{&xapp.RMRParams{}}
- params.Mtype = 20
- params.PayloadLen = len(policy)
- params.Payload =[]byte(policy)
- params.Mbuf = nil
- params.Whid = whid
- params.Callid = call_id
- params.Timeout = 200
- state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
- routestatus := strings.Split(retstr," ")
- if state != C.RMR_OK && routestatus[0] != "OK" {
- xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
- return false
- } else {
- xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
- return true
- }
-}
+func (c *RmrPush) sendDynamicRoutes(ep string, whid int, policies *[]string, call_id int) bool {
+ xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
+ var state int
+ var retstr string
+
+ var policy = []byte{}
+ for _, pe := range *policies {
+ b := []byte(pe)
+ for j := 0; j < len(b); j++ {
+ policy = append(policy, b[j])
+ }
+ }
+ params := &RMRParams{&xapp.RMRParams{}}
+ params.Mtype = 20
+ params.PayloadLen = len(policy)
+ params.Payload = []byte(policy)
+ params.Mbuf = nil
+ params.Whid = whid
+ params.Callid = call_id
+ params.Timeout = 200
+ state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
+ routestatus := strings.Split(retstr, " ")
+ if state != C.RMR_OK && routestatus[0] != "OK" {
+ xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
+ return false
+ } else {
+ xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
+ return true
+ }
+}