==================================================================================
*/
/*
- Mnemonic: nngpipe.go
- Abstract: mangos (NNG) Pipeline SBI implementation
+ Mnemonic: rmrpipe.go
+ Abstract: mangos (RMR) Pipeline SBI implementation
Date: 12 March 2019
*/
package sbi
+/*
+#include <rmr/rmr.h>
+*/
+import "C"
+
import (
"bytes"
"crypto/md5"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"routing-manager/pkg/rtmgr"
"strconv"
- //"time"
+ "strings"
"fmt"
)
-type NngPush struct {
+var rmrcallid = 1
+var rmrdynamiccallid = 201
+
+type RmrPush struct {
Sbi
rcChan chan *xapp.RMRParams
}
+type EPStatus struct {
+ endpoint string
+ status bool
+}
+
type RMRParams struct {
*xapp.RMRParams
}
return b.String()
}
-func NewNngPush() *NngPush {
- instance := new(NngPush)
+func NewRmrPush() *RmrPush {
+ instance := new(RmrPush)
return instance
}
-func (c *NngPush) Initialize(ip string) error {
+func (c *RmrPush) Initialize(ip string) error {
return nil
}
-func (c *NngPush) Terminate() error {
+func (c *RmrPush) Terminate() error {
return nil
}
-func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
+func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
xapp.Logger.Debug("Invoked sbi.AddEndpoint")
- endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
+ endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
ep.Whid = int(xapp.Rmr.Openwh(endpoint))
if ep.Whid < 0 {
return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
return nil
}
-func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
xapp.Logger.Debug("args: %v", *ep)
return nil
}
-func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
c.updateEndpoints(rcs, c)
}
-func (c *NngPush) DistributeAll(policies *[]string) error {
+func (c *RmrPush) DistributeAll(policies *[]string) error {
xapp.Logger.Debug("Invoked: sbi.DistributeAll")
xapp.Logger.Debug("args: %v", *policies)
- for _, ep := range rtmgr.Eps {
+ /*for _, ep := range rtmgr.Eps {
go c.send(ep, policies)
+ }*/
+ channel := make(chan EPStatus)
+
+ if rmrcallid == 200 {
+ rmrcallid = 1
}
+ for _, ep := range rtmgr.Eps {
+ go c.send_sync(ep, policies, channel, rmrcallid)
+ }
+
+ rmrcallid++
+
+ count := 0
+ result := make([]EPStatus, len(rtmgr.Eps))
+ for i, _ := range result {
+ result[i] = <-channel
+ if result[i].status == true {
+ count++
+ } else {
+ xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
+ }
+ }
+
+ if count < len(rtmgr.Eps) {
+ return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
+ }
+
+
return nil
}
-func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
- xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
+ xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
- var policy = []byte{}
- cumulative_policy := 0
- count := 0
- maxrecord := xapp.Config.GetInt("maxrecord")
- if maxrecord == 0 {
- maxrecord = 10
- }
+ ret := c.send_data(ep, policies, call_id)
+
+ channel <- EPStatus{ep.Uuid, ret}
+
+}
+
+func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
+ xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
+ var state int
+ var retstr string
+
+ var policy = []byte{}
- for _, pe := range *policies {
- b := []byte(pe)
- for j := 0; j < len(b); j++ {
- policy = append(policy, b[j])
- }
- count++
- cumulative_policy++
- if count == maxrecord || cumulative_policy == len(*policies) {
- params := &RMRParams{&xapp.RMRParams{}}
- params.Mtype = 20
- params.PayloadLen = len(policy)
- params.Payload = []byte(policy)
- params.Mbuf = nil
- params.Whid = ep.Whid
- xapp.Rmr.SendMsg(params.RMRParams)
- count = 0
- policy = nil
- xapp.Logger.Debug("Sent message with payload len = %d", params.PayloadLen)
- }
+ for _, pe := range *policies {
+ b:= []byte(pe)
+ for j:=0; j<len(b); j++{
+ policy = append(policy,b[j])
+ }
}
+ params := &RMRParams{&xapp.RMRParams{}}
+ params.Mtype = 20
+ params.PayloadLen = len(policy)
+ params.Payload =[]byte(policy)
+ params.Mbuf = nil
+ params.Whid = ep.Whid
+ params.Callid = call_id
+ params.Timeout = 200
+ state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
+ routestatus := strings.Split(retstr," ")
+ if state != C.RMR_OK && routestatus[0] != "OK" {
+ xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
+ return false
+ } else {
+ xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
+ return true
+ }
+}
- xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
+func (c *RmrPush) CheckEndpoint(payload string)(ep *rtmgr.Endpoint) {
+ return c.checkEndpoint(payload)
}
-func (c *NngPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
- return c.createEndpoint(payload, c)
+func (c *RmrPush) CreateEndpoint(rmrsrc string)(ep *string,whid int) {
+ return c.createEndpoint(rmrsrc)
}
-func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
+func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
xapp.Logger.Debug("args: %v", *policies)
- go c.send(ep, policies)
+ if rmrdynamiccallid == 255 {
+ rmrdynamiccallid = 201
+ }
+
+ go c.sendDynamicRoutes(ep, whid, policies,rmrdynamiccallid)
+ rmrdynamiccallid++
return nil
}
+
+func (c *RmrPush) sendDynamicRoutes(ep string,whid int, policies *[]string, call_id int) bool {
+ xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
+ var state int
+ var retstr string
+
+ var policy = []byte{}
+
+ for _, pe := range *policies {
+ b:= []byte(pe)
+ for j:=0; j<len(b); j++{
+ policy = append(policy,b[j])
+ }
+ }
+ params := &RMRParams{&xapp.RMRParams{}}
+ params.Mtype = 20
+ params.PayloadLen = len(policy)
+ params.Payload =[]byte(policy)
+ params.Mbuf = nil
+ params.Whid = whid
+ params.Callid = call_id
+ params.Timeout = 200
+ state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
+ routestatus := strings.Split(retstr," ")
+ if state != C.RMR_OK && routestatus[0] != "OK" {
+ xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
+ return false
+ } else {
+ xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
+ return true
+ }
+}
+