+### v0.6.2
+* Distribution of routes is synchronous.
+
### v0.6.1
* Updating xapp_fwk to v0.4.15, that contains RIC_HEALTH_CHECK message types
"os"
"os/signal"
"routing-manager/pkg/nbi"
- "routing-manager/pkg/rpe"
+ //"routing-manager/pkg/rpe"
"routing-manager/pkg/rtmgr"
- "routing-manager/pkg/sbi"
- "routing-manager/pkg/sdl"
+ //"routing-manager/pkg/sbi"
+ //"routing-manager/pkg/sdl"
"syscall"
"time"
- "sync"
)
const SERVICENAME = "rtmgr"
-const INTERVAL time.Duration = 60
-
-func initRtmgr() (nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, err error) {
- if nbiEngine, err = nbi.GetNbi(xapp.Config.GetString("nbi")); err == nil && nbiEngine != nil {
- if sbiEngine, err = sbi.GetSbi(xapp.Config.GetString("sbi")); err == nil && sbiEngine != nil {
- if sdlEngine, err = sdl.GetSdl(xapp.Config.GetString("sdl")); err == nil && sdlEngine != nil {
- if rpeEngine, err = rpe.GetRpe(xapp.Config.GetString("rpe")); err == nil && rpeEngine != nil {
- return nbiEngine, sbiEngine, sdlEngine, rpeEngine, nil
- }
- }
- }
- }
- return nil, nil, nil, nil, err
-}
-
-
-
-func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
- for {
- if <-triggerSBI {
- m.Lock()
- data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
- m.Unlock()
- if err != nil || data == nil {
- xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error())
- continue
- }
- sbiEngine.UpdateEndpoints(data)
- policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
- err = sbiEngine.DistributeAll(policies)
- if err != nil {
- xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
- }
- }
- }
-}
-
-func sendRoutesToAll(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine) {
-
- data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
- if err != nil || data == nil {
- xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error())
- return
- }
- sbiEngine.UpdateEndpoints(data)
- policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
- err = sbiEngine.DistributeAll(policies)
- if err != nil {
- xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
- return
- }
-}
-
-
-func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
-
- triggerSBI := make(chan bool)
-
- nbiErr := nbiEngine.Initialize(xapp.Config.GetString("xmurl"), xapp.Config.GetString("nbiurl"), xapp.Config.GetString("rtfile"), xapp.Config.GetString("cfgfile"), xapp.Config.GetString("e2murl"),
- sdlEngine, rpeEngine, triggerSBI, m)
- if nbiErr != nil {
- xapp.Logger.Error("Failed to initialize nbi due to: " + nbiErr.Error())
- return
- }
-
- err := sbiEngine.Initialize(xapp.Config.GetString("sbiurl"))
- if err != nil {
- xapp.Logger.Info("Failed to open push socket due to: " + err.Error())
- return
- }
- defer nbiEngine.Terminate()
- defer sbiEngine.Terminate()
-
- // This SBI Go routine is trtiggered by periodic main loop and when data is recieved on REST interface.
- go serveSBI(triggerSBI, sbiEngine, sdlEngine, rpeEngine, m)
-
- for {
- if xapp.Config.GetString("nbi") == "httpGetter" {
- data, err := nbiEngine.(*nbi.HttpGetter).FetchAllXApps(xapp.Config.GetString("xmurl"))
- if err != nil {
- xapp.Logger.Error("Cannot fetch xapp data due to: " + err.Error())
- } else if data != nil {
- sdlEngine.WriteXApps(xapp.Config.GetString("rtfile"), data)
- }
- }
-
- sendRoutesToAll(sbiEngine, sdlEngine, rpeEngine)
-
- rtmgr.Rtmgr_ready = true
- time.Sleep(INTERVAL * time.Second)
- xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.")
- }
-}
func SetupCloseHandler() {
c := make(chan os.Signal, 2)
func main() {
- nbiEngine, sbiEngine, sdlEngine, rpeEngine, err := initRtmgr()
- if err != nil {
- xapp.Logger.Error(err.Error())
- os.Exit(1)
- }
-
SetupCloseHandler()
xapp.Logger.Info("Start " + SERVICENAME + " service")
rtmgr.Eps = make(rtmgr.Endpoints)
+ rtmgr.Mtype = make(rtmgr.MessageTypeList)
rtmgr.Rtmgr_ready = false
- var m sync.Mutex
-
// RMR thread is starting port: 4560
c := nbi.NewControl()
- go c.Run(sbiEngine, sdlEngine, rpeEngine, &m)
+ go c.Run()
// Waiting for RMR to be ready
time.Sleep(time.Duration(2) * time.Second)
dummy_whid := int(xapp.Rmr.Openwh("localhost:4560"))
xapp.Logger.Info("created dummy Wormhole ID for routingmanager and dummy_whid :%d", dummy_whid)
- serve(nbiEngine, sbiEngine, sdlEngine, rpeEngine, &m)
+ nbi.Serve()
os.Exit(0)
}
# By default this file is in the docker build directory,
# but the location can configured in the JJB template.
---
-tag: 0.6.1
+tag: 0.6.2
"protPort": "tcp:4560"
"maxSize": 2072
"numWorkers": 1
+ "RTFILE":
+ "/db/rt.json"
+ "NBIURL":
+ "http://localhost:3800"
+
}
subscription:
host: "127.0.0.1:8089"
+
+ #xmurl: "http://service-ricplt-appmgr-http:8080/ric/v1/xapps"
+ #e2murl: "http://service-ricplt-e2mgr-http:3800/v1/e2t/list"
+ #rtfile: "/db/rt.json"
+ #CFGFILE: "/cfg/rtmgr-config.yaml"
+ #rpe: "rmrpush"
+ #s#bi: "rmrpush"
+ #s#biurl: "0.0.0.0"
+ #nbi: "httpRESTful"
+ #nbiurl: "http://service-ricplt-rtmgr-http:3800"
+ ##sdl: "file"
+ #local:
+ #host: ":8080"
+
"routing-manager/pkg/sdl"
"strconv"
"sync"
+ "time"
+ "os"
)
-func NewControl() Control {
+var m sync.Mutex
+
+var nbiEngine Engine
+var sbiEngine sbi.Engine
+var sdlEngine sdl.Engine
+var rpeEngine rpe.Engine
+
+const INTERVAL time.Duration = 60
+func NewControl() Control {
return Control{make(chan *xapp.RMRParams)}
}
rcChan chan *xapp.RMRParams
}
-func (c *Control) Run(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
- go c.controlLoop(sbiEngine, sdlEngine, rpeEngine, m)
+
+func (c *Control) Run() {
+ var err error
+ go c.controlLoop()
+ nbiEngine, sbiEngine, sdlEngine, rpeEngine, err = initRtmgr()
+ if err != nil {
+ xapp.Logger.Error(err.Error())
+ os.Exit(1)
+ }
xapp.Run(c)
}
return
}
-func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
+func initRtmgr() (nbiEngine Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, err error) {
+ if nbiEngine, err = GetNbi(xapp.Config.GetString("nbi")); err == nil && nbiEngine != nil {
+ if sbiEngine, err = sbi.GetSbi(xapp.Config.GetString("sbi")); err == nil && sbiEngine != nil {
+ if sdlEngine, err = sdl.GetSdl(xapp.Config.GetString("sdl")); err == nil && sdlEngine != nil {
+ if rpeEngine, err = rpe.GetRpe(xapp.Config.GetString("rpe")); err == nil && rpeEngine != nil {
+ return nbiEngine, sbiEngine, sdlEngine, rpeEngine, nil
+ }
+ }
+ }
+ }
+ return nil, nil, nil, nil, err
+}
+
+func (c *Control) controlLoop() {
for {
msg := <-c.rcChan
xapp_msg := sbi.RMRParams{msg}
xapp.Logger.Info("Update Route Table Request(RMR to RM), message discarded as routing manager is not ready")
} else {
xapp.Logger.Info("Update Route Table Request(RMR to RM)")
- go c.handleUpdateToRoutingManagerRequest(msg, sbiEngine, sdlEngine, rpeEngine, m)
+ go c.handleUpdateToRoutingManagerRequest(msg)
}
case xapp.RICMessageTypes["RMRRM_TABLE_STATE"]:
xapp.Logger.Info("state of table to route mgr %s,payload %s", xapp_msg.String(), msg.Payload)
}
}
-func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
+func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams) {
msg := sbi.RMRParams{params}
return
}
}
+
+func sendRoutesToAll() (err error) {
+
+ m.Lock()
+ data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
+ m.Unlock()
+ if err != nil || data == nil {
+ return errors.New("Cannot get data from sdl interface due to: " + err.Error())
+ }
+ sbiEngine.UpdateEndpoints(data)
+ policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
+ err = sbiEngine.DistributeAll(policies)
+ if err != nil {
+ return errors.New("Routing table cannot be published due to: " + err.Error())
+ }
+
+ return nil
+}
+
+func Serve() {
+
+ nbiErr := nbiEngine.Initialize(xapp.Config.GetString("xmurl"), xapp.Config.GetString("nbiurl"), xapp.Config.GetString("rtfile"), xapp.Config.GetString("cfgfile"), xapp.Config.GetString("e2murl"), sdlEngine, rpeEngine, &m)
+ if nbiErr != nil {
+ xapp.Logger.Error("Failed to initialize nbi due to: " + nbiErr.Error())
+ return
+ }
+
+ err := sbiEngine.Initialize(xapp.Config.GetString("sbiurl"))
+ if err != nil {
+ xapp.Logger.Info("Failed to open push socket due to: " + err.Error())
+ return
+ }
+ defer nbiEngine.Terminate()
+ defer sbiEngine.Terminate()
+
+ for {
+ sendRoutesToAll()
+
+ rtmgr.Rtmgr_ready = true
+ time.Sleep(INTERVAL * time.Second)
+ xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.")
+ }
+}
}
func (g *HttpGetter) Initialize(xmurl string, nbiif string, fileName string, configfile string, e2murl string,
- sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error {
+ sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) error {
return nil
}
"time"
)
-//var myClient = &http.Client{Timeout: 1 * time.Second}
-
type HttpRestful struct {
Engine
LaunchRest LaunchRestHandler
- RecvXappCallbackData RecvXappCallbackDataHandler
- RecvNewE2Tdata RecvNewE2TdataHandler
- ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
RetrieveStartupData RetrieveStartupDataHandler
}
func NewHttpRestful() *HttpRestful {
instance := new(HttpRestful)
instance.LaunchRest = launchRest
- instance.RecvXappCallbackData = recvXappCallbackData
- instance.RecvNewE2Tdata = recvNewE2Tdata
- instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
instance.RetrieveStartupData = retrieveStartupData
return instance
}
-// ToDo: Use Range over channel. Read and return only the latest one.
-func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
- var xappData *models.XappCallbackData
- // Drain the channel as we are only looking for the latest value until
- // xapp manager sends all xapp data with every request.
- length := len(dataChannel)
- //xapp.Logger.Info(length)
- for i := 0; i <= length; i++ {
- xapp.Logger.Info("data received")
- // If no data received from the REST, it blocks.
- xappData = <-dataChannel
- }
+func recvXappCallbackData(xappData *models.XappCallbackData) (*[]rtmgr.XApp, error) {
if nil != xappData {
var xapps []rtmgr.XApp
err := json.Unmarshal([]byte(xappData.XApps), &xapps)
return nil, nil
}
-func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, string, error) {
- var e2tData *models.E2tData
+func recvNewE2Tdata(e2tData *models.E2tData) (*rtmgr.E2TInstance, string, error) {
var str string
xapp.Logger.Info("data received")
- e2tData = <-dataChannel
-
if nil != e2tData {
e2tinst := rtmgr.E2TInstance{
return nil
}
-func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
+func provideXappHandleHandlerImpl(data *models.XappCallbackData) error {
if data != nil {
xapp.Logger.Debug("Received callback data")
}
xapp.Logger.Warn("XApp callback data validation failed: " + err.Error())
return err
} else {
- datach <- data
+ appdata, err := recvXappCallbackData(data)
+ if err != nil {
+ xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
+ } else if appdata != nil {
+ xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
+ alldata, err1 := httpGetXApps(xapp.Config.GetString("xmurl"))
+ if alldata != nil && err1 == nil {
+ m.Lock()
+ sdlEngine.WriteXApps(xapp.Config.GetString("rtfile"), alldata)
+ m.Unlock()
+ return sendRoutesToAll()
+ }
+ }
+
return nil
}
}
}
-func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
- data *models.XappSubscriptionData) error {
+func provideXappSubscriptionHandleImpl(data *models.XappSubscriptionData) error {
xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
err := validateXappSubscriptionData(data)
if err != nil {
xapp.Logger.Error(err.Error())
return err
}
- subchan <- data
- //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
+ xapp.Logger.Debug("received XApp subscription data")
+ addSubscription(&rtmgr.Subs, data)
xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
- return nil
+ return sendRoutesToAll()
}
func subscriptionExists(data *models.XappSubscriptionData) bool {
return present
}
-func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
- data *models.XappSubscriptionData) error {
+func deleteXappSubscriptionHandleImpl(data *models.XappSubscriptionData) error {
xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
err := validateXappSubscriptionData(data)
if err != nil {
return err
}
- subdelchan <- data
- return nil
+ xapp.Logger.Debug("received XApp subscription delete data")
+ delSubscription(&rtmgr.Subs, data)
+ return sendRoutesToAll()
+
}
-func updateXappSubscriptionHandleImpl(subupdatechan chan<- *rtmgr.XappList, data *models.XappList, subid uint16) error {
+func updateXappSubscriptionHandleImpl(data *models.XappList, subid uint16) error {
xapp.Logger.Debug("Invoked updateXappSubscriptionHandleImpl")
var fqdnlist []rtmgr.FqDn
return err
}
}
- subupdatechan <- &xapplist
- return nil
+ xapp.Logger.Debug("received XApp subscription Merge data")
+ updateSubscription(&xapplist)
+ return sendRoutesToAll()
}
-func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData,
- data *models.E2tData) error {
+func createNewE2tHandleHandlerImpl(data *models.E2tData) error {
xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl")
err := validateE2tData(data)
if err != nil {
xapp.Logger.Error(err.Error())
return err
}
- e2taddchan <- data
- return nil
+ //e2taddchan <- data
+ e2data, meiddata, _ := recvNewE2Tdata(data)
+ xapp.Logger.Debug("received create New E2T data")
+ m.Lock()
+ sdlEngine.WriteNewE2TInstance(xapp.Config.GetString("rtfile"), e2data, meiddata)
+ m.Unlock()
+ return sendRoutesToAll()
+
}
func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
return nil
}
-func associateRanToE2THandlerImpl(assranchan chan<- models.RanE2tMap,
- data models.RanE2tMap) error {
+func associateRanToE2THandlerImpl(data models.RanE2tMap) error {
xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl")
err := validateE2TAddressRANListData(data)
if err != nil {
xapp.Logger.Warn(" Association of RAN to E2T Instance data validation failed: " + err.Error())
return err
}
- assranchan <- data
- return nil
+ xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
+ m.Lock()
+ sdlEngine.WriteAssRANToE2TInstance(xapp.Config.GetString("rtfile"), data)
+ m.Unlock()
+ return sendRoutesToAll()
+
}
-func disassociateRanToE2THandlerImpl(disassranchan chan<- models.RanE2tMap,
- data models.RanE2tMap) error {
+func disassociateRanToE2THandlerImpl(data models.RanE2tMap) error {
xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl")
err := validateE2TAddressRANListData(data)
if err != nil {
xapp.Logger.Warn(" Disassociation of RAN List from E2T Instance data validation failed: " + err.Error())
return err
}
- disassranchan <- data
- return nil
+ xapp.Logger.Debug("received disassociate RANs from E2T instance")
+ m.Lock()
+ sdlEngine.WriteDisAssRANFromE2TInstance(xapp.Config.GetString("rtfile"), data)
+ m.Unlock()
+ return sendRoutesToAll()
+
}
-func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData,
- data *models.E2tDeleteData) error {
+func deleteE2tHandleHandlerImpl(data *models.E2tDeleteData) error {
xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl")
err := validateDeleteE2tData(data)
xapp.Logger.Error(err.Error())
return err
}
+ m.Lock()
+ sdlEngine.WriteDeleteE2TInstance(xapp.Config.GetString("rtfile"), data)
+ m.Unlock()
+ return sendRoutesToAll()
- e2tdelchan <- data
- return nil
}
func dumpDebugData() (models.Debuginfo, error) {
return response, err
}
-func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData, subupdatechan chan<- *rtmgr.XappList,
- subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData, assranchan chan<- models.RanE2tMap, disassranchan chan<- models.RanE2tMap, e2tdelchan chan<- *models.E2tDeleteData) {
+func launchRest(nbiif *string){
swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
if err != nil {
//log.Fatalln(err)
api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
func(params handle.ProvideXappHandleParams) middleware.Responder {
xapp.Logger.Info("Data received on Http interface")
- err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
+ err := provideXappHandleHandlerImpl(params.XappCallbackData)
if err != nil {
xapp.Logger.Error("Invalid XApp callback data: " + err.Error())
return handle.NewProvideXappHandleBadRequest()
})
api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
- err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
+ err := provideXappSubscriptionHandleImpl(params.XappSubscriptionData)
if err != nil {
return handle.NewProvideXappSubscriptionHandleBadRequest()
} else {
- //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
- time.Sleep(1 * time.Second)
return handle.NewGetHandlesOK()
}
})
api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
- err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
+ err := deleteXappSubscriptionHandleImpl(params.XappSubscriptionData)
if err != nil {
return handle.NewDeleteXappSubscriptionHandleNoContent()
} else {
- //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
- time.Sleep(1 * time.Second)
return handle.NewGetHandlesOK()
}
})
api.HandleUpdateXappSubscriptionHandleHandler = handle.UpdateXappSubscriptionHandleHandlerFunc(
func(params handle.UpdateXappSubscriptionHandleParams) middleware.Responder {
- err := updateXappSubscriptionHandleImpl(subupdatechan, ¶ms.XappList, params.SubscriptionID)
+ err := updateXappSubscriptionHandleImpl(¶ms.XappList, params.SubscriptionID)
if err != nil {
return handle.NewUpdateXappSubscriptionHandleBadRequest()
} else {
- //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
- time.Sleep(1 * time.Second)
return handle.NewUpdateXappSubscriptionHandleCreated()
}
})
api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc(
func(params handle.CreateNewE2tHandleParams) middleware.Responder {
- err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData)
+ err := createNewE2tHandleHandlerImpl(params.E2tData)
if err != nil {
return handle.NewCreateNewE2tHandleBadRequest()
} else {
- time.Sleep(1 * time.Second)
return handle.NewCreateNewE2tHandleCreated()
}
})
api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc(
func(params handle.AssociateRanToE2tHandleParams) middleware.Responder {
- err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList)
+ err := associateRanToE2THandlerImpl(params.RanE2tList)
if err != nil {
return handle.NewAssociateRanToE2tHandleBadRequest()
} else {
- time.Sleep(1 * time.Second)
return handle.NewAssociateRanToE2tHandleCreated()
}
})
api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc(
func(params handle.DissociateRanParams) middleware.Responder {
- err := disassociateRanToE2THandlerImpl(disassranchan, params.DissociateList)
+ err := disassociateRanToE2THandlerImpl(params.DissociateList)
if err != nil {
return handle.NewDissociateRanBadRequest()
} else {
- time.Sleep(1 * time.Second)
return handle.NewDissociateRanCreated()
}
})
api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc(
func(params handle.DeleteE2tHandleParams) middleware.Responder {
- err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData)
+ err := deleteE2tHandleHandlerImpl(params.E2tData)
if err != nil {
return handle.NewDeleteE2tHandleBadRequest()
} else {
- time.Sleep(1 * time.Second)
return handle.NewDeleteE2tHandleCreated()
}
})
}
func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string, e2murl string,
- sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error {
+ sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) error {
err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, e2murl, sdlEngine)
if err != nil {
xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
return err
}
-
- datach := make(chan *models.XappCallbackData, 10)
- subschan := make(chan *models.XappSubscriptionData, 10)
- subdelchan := make(chan *models.XappSubscriptionData, 10)
- subupdatechan := make(chan *rtmgr.XappList, 10)
- e2taddchan := make(chan *models.E2tData, 10)
- associateranchan := make(chan models.RanE2tMap, 10)
- disassociateranchan := make(chan models.RanE2tMap, 10)
- e2tdelchan := make(chan *models.E2tDeleteData, 10)
- xapp.Logger.Info("Launching Rest Http service")
- go func() {
- r.LaunchRest(&nbiif, datach, subschan, subupdatechan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan)
- }()
-
- go func() {
- for {
- data, err := r.RecvXappCallbackData(datach)
- if err != nil {
- xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
- } else if data != nil {
- xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
- alldata, err1 := httpGetXApps(xmurl)
- if alldata != nil && err1 == nil {
- m.Lock()
- sdlEngine.WriteXApps(fileName, alldata)
- m.Unlock()
- triggerSBI <- true
- }
- }
- }
- }()
-
- go func() {
- for {
- data := <-subschan
- xapp.Logger.Debug("received XApp subscription data")
- addSubscription(&rtmgr.Subs, data)
- triggerSBI <- true
- }
- }()
-
- go func() {
- for {
- data := <-subdelchan
- xapp.Logger.Debug("received XApp subscription delete data")
- delSubscription(&rtmgr.Subs, data)
- triggerSBI <- true
- }
- }()
-
- go func() {
- for {
- data := <-subupdatechan
- xapp.Logger.Debug("received XApp subscription Merge data")
- updateSubscription(data)
- triggerSBI <- true
- }
- }()
-
- go func() {
- for {
-
- data, meiddata, _ := r.RecvNewE2Tdata(e2taddchan)
- if data != nil {
- xapp.Logger.Debug("received create New E2T data")
- m.Lock()
- sdlEngine.WriteNewE2TInstance(fileName, data, meiddata)
- m.Unlock()
- triggerSBI <- true
- }
- }
- }()
-
- go func() {
- for {
- data := <-associateranchan
- xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
- m.Lock()
- sdlEngine.WriteAssRANToE2TInstance(fileName, data)
- m.Unlock()
- triggerSBI <- true
- }
- }()
-
go func() {
- for {
-
- data := <-disassociateranchan
- xapp.Logger.Debug("received disassociate RANs from E2T instance")
- m.Lock()
- sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data)
- m.Unlock()
- triggerSBI <- true
- }
- }()
-
- go func() {
- for {
-
- data := <-e2tdelchan
- xapp.Logger.Debug("received Delete E2T data")
- if data != nil {
- m.Lock()
- sdlEngine.WriteDeleteE2TInstance(fileName, data)
- m.Unlock()
- triggerSBI <- true
- }
- }
+ r.LaunchRest(&nbiif)
}()
return nil
"routing-manager/pkg/sdl"
"routing-manager/pkg/stub"
"testing"
- "time"
"sync"
"github.com/go-openapi/swag"
)
Port: &p,
SubscriptionID: swag.Int32(123456)}
err = validateXappSubscriptionData(&data1)
- t.Log(err)
//Validate E2tData
data2 := models.E2tData{
}
/*err = validateE2tData(&data2)*/
- e2tchannel := make(chan *models.E2tData, 10)
- _ = createNewE2tHandleHandlerImpl(e2tchannel, &data2)
- defer close(e2tchannel)
+ //e2tchannel := make(chan *models.E2tData, 10)
+ _ = createNewE2tHandleHandlerImpl(&data2)
+ //defer close(e2tchannel)
//test case for provideXappSubscriptionHandleImp
- datachannel := make(chan *models.XappSubscriptionData, 10)
- _ = provideXappSubscriptionHandleImpl(datachannel, &data1)
- defer close(datachannel)
+ //datachannel := make(chan *models.XappSubscriptionData, 10)
+ sdlEngine, _ = sdl.GetSdl("file")
+ _ = provideXappSubscriptionHandleImpl( &data1)
+ //defer close(datachannel)
//test case for deleteXappSubscriptionHandleImpl
- _ = deleteXappSubscriptionHandleImpl(datachannel, &data1)
+ _ = deleteXappSubscriptionHandleImpl(&data1)
data3 := models.XappSubscriptionData{
Address: swag.String("10.55.55.5"),
Port: &p,
SubscriptionID: swag.Int32(123456)}
//test case for deleteXappSubscriptionHandleImpl
- _ = deleteXappSubscriptionHandleImpl(datachannel, &data3)
+ _ = deleteXappSubscriptionHandleImpl(&data3)
+ data4 := models.XappSubscriptionData{
+ Address: swag.String("1.5.5.5"),
+ Port: &p,
+ SubscriptionID: swag.Int32(1236)}
+ _ = deleteXappSubscriptionHandleImpl(&data4)
+
}
func TestValidateE2tDataEmpty(t *testing.T) {
err := validateE2tData(&data)
t.Log(err)
+ _ = createNewE2tHandleHandlerImpl(&data)
+
}
func TestValidateE2tDatavalidEndpointPresent(t *testing.T) {
func TestAssociateRanToE2THandlerImpl(t *testing.T) {
- associateranchan := make(chan models.RanE2tMap, 10)
data := models.RanE2tMap{
{
E2TAddress: swag.String("10.101.01.1:8098"),
},
}
- err := associateRanToE2THandlerImpl(associateranchan, data)
+ err := associateRanToE2THandlerImpl( data)
if (err != nil ) {
t.Log(err)
}
E2TAddress: swag.String("10.101.01.1:8098"),
},
}
- err = associateRanToE2THandlerImpl(associateranchan, data)
+ err = associateRanToE2THandlerImpl(data)
if (err != nil ) {
t.Log(err)
}
- data1 := <-associateranchan
- fmt.Println(data1)
//################ Delete End Point dummy entry
delete(rtmgr.Eps, uuid);
//#####################
func TestDisassociateRanToE2THandlerImpl(t *testing.T) {
- disassranchan := make(chan models.RanE2tMap, 10)
data := models.RanE2tMap{
{
E2TAddress: swag.String("10.101.01.1:8098"),
},
}
- err := disassociateRanToE2THandlerImpl(disassranchan, data)
+ err := disassociateRanToE2THandlerImpl(data)
if (err != nil ) {
t.Log(err)
}
E2TAddress: swag.String("10.101.01.1:8098"),
},
}
- err = disassociateRanToE2THandlerImpl(disassranchan, data)
+ err = disassociateRanToE2THandlerImpl(data)
if (err != nil ) {
t.Log(err)
}
- data1 := <-disassranchan
- fmt.Println(data1)
//################ Delete End Point dummy entry
delete(rtmgr.Eps, uuid);
//#####################
func TestDeleteE2tHandleHandlerImpl(t *testing.T) {
- e2tdelchan := make(chan *models.E2tDeleteData, 10)
data := models.E2tDeleteData{
E2TAddress: swag.String(""),
}
- err := deleteE2tHandleHandlerImpl(e2tdelchan, &data)
+ err := deleteE2tHandleHandlerImpl(&data)
if (err != nil ) {
t.Log(err)
}
data = models.E2tDeleteData{
E2TAddress: swag.String("10.101.01.1:8098"),
}
- err = deleteE2tHandleHandlerImpl(e2tdelchan, &data)
+ err = deleteE2tHandleHandlerImpl(&data)
if (err != nil ) {
t.Log(err)
}
- data1 := <-e2tdelchan
-
- fmt.Println(data1)
//################ Delete End Point dummy entry
delete(rtmgr.Eps, uuid);
//#####################
err := httpinstance.Terminate()
t.Log(err)
- triggerSBI := make(chan bool)
createMockPlatformComponents()
//ts := createMockAppmgrWithData("127.0.0.1:3000", BasicXAppLists, nil)
//ts.Start()
//defer ts.Close()
var m sync.Mutex
- err = httpinstance.Initialize(XMURL, "httpgetter", "rt.json", "config.json", E2MURL, sdlEngine, rpeEngine, triggerSBI, &m)
+ err = httpinstance.Initialize(XMURL, "httpgetter", "rt.json", "config.json", E2MURL, sdlEngine, rpeEngine, &m)
}
-func TestXappCallbackDataChannelwithdata(t *testing.T) {
+func TestXappCallbackWithData(t *testing.T) {
data := models.XappCallbackData{
XApps: *swag.String("[]"),
Version: *swag.Int64(1),
Event: *swag.String("someevent"),
ID: *swag.String("123456")}
- datach := make(chan *models.XappCallbackData, 1)
- go func() { _, _ = recvXappCallbackData(datach) }()
- defer close(datach)
- datach <- &data
+ _, _ = recvXappCallbackData(&data)
}
-func TestXappCallbackDataChannelNodata(t *testing.T) {
- datach := make(chan *models.XappCallbackData, 1)
- go func() { _, _ = recvXappCallbackData(datach) }()
- defer close(datach)
+
+func TestXappCallbackNodata(t *testing.T) {
+ //data := *models.XappCallbackData
+ _, _ = recvXappCallbackData(nil)
}
-func TestE2TChannelwithData(t *testing.T) {
- data2 := models.E2tData{
- E2TAddress: swag.String("1.2.3.4"),
- RanNamelist: []string{"ran1","ran2"},
- }
- dataChannel := make(chan *models.E2tData, 10)
- go func() { _, _,_ = recvNewE2Tdata(dataChannel) }()
- defer close(dataChannel)
- dataChannel <- &data2
+func TestE2TwithData(t *testing.T) {
+ data2 := models.E2tData{
+ E2TAddress: swag.String("1.2.3.4"),
+ RanNamelist: []string{"ran1","ran2"},
+ }
+ _, _,_ = recvNewE2Tdata(&data2)
}
-func TestE2TChannelwithNoData(t *testing.T) {
- dataChannel := make(chan *models.E2tData, 10)
- go func() { _, _ ,_= recvNewE2Tdata(dataChannel) }()
- defer close(dataChannel)
+func TestE2TwithNoData(t *testing.T) {
+ _, _,_ = recvNewE2Tdata(nil)
}
func TestProvideXappSubscriptionHandleImpl(t *testing.T) {
Address: swag.String("10.0.0.0"),
Port: &p,
SubscriptionID: swag.Int32(1234)}
- datachannel := make(chan *models.XappSubscriptionData, 10)
- go func() { _ = provideXappSubscriptionHandleImpl(datachannel, &data) }()
- defer close(datachannel)
- datachannel <- &data
-
- //subdel test
+ _ = provideXappSubscriptionHandleImpl(&data)
}
func createMockAppmgrWithData(url string, g []byte, p []byte, t []byte) *httptest.Server {
_ = ioutil.WriteFile(filename, file, 644)
}
-func TestRecvXappCallbackData(t *testing.T) {
- data := models.XappCallbackData{
- XApps: *swag.String("[]"),
- Version: *swag.Int64(1),
- Event: *swag.String("any"),
- ID: *swag.String("123456"),
- }
-
- ch := make(chan *models.XappCallbackData)
- defer close(ch)
- httpRestful := NewHttpRestful()
- go func() { ch <- &data }()
- time.Sleep(1 * time.Second)
- t.Log(string(len(ch)))
- xappList, err := httpRestful.RecvXappCallbackData(ch)
- if err != nil {
- t.Error("Receive failed: " + err.Error())
- } else {
- if xappList == nil {
- t.Error("Expected an XApp notification list")
- } else {
- t.Log("whatever")
- }
- }
-}
-
func TestProvideXappHandleHandlerImpl(t *testing.T) {
- datach := make(chan *models.XappCallbackData, 10)
- defer close(datach)
data := models.XappCallbackData{
XApps: *swag.String("[]"),
Version: *swag.Int64(1),
Event: *swag.String("someevent"),
ID: *swag.String("123456")}
- var httpRestful, _ = GetNbi("httpRESTful")
- err := httpRestful.(*HttpRestful).ProvideXappHandleHandlerImpl(datach, &data)
- if err != nil {
- t.Error("Error occured: " + err.Error())
- } else {
- recv := <-datach
- if recv == nil {
- t.Error("Something gone wrong: " + err.Error())
- } else {
- if recv != &data {
- t.Error("Malformed data on channel")
- }
- }
- }
+ err := provideXappHandleHandlerImpl( &data)
//Empty XappCallbackdata
data1 := models.XappCallbackData{}
- err = httpRestful.(*HttpRestful).ProvideXappHandleHandlerImpl(datach, &data1)
+ err = provideXappHandleHandlerImpl(&data1)
+ t.Log(err)
}
func TestValidateXappCallbackData(t *testing.T) {
_ = PostSubReq("\n","nbifinterface")
_ = PostSubReq("xmurl","\n")
}
+
+func TestInitEngine(t *testing.T) {
+ initRtmgr()
+}
+
+func TestUpdateXappSubscription(t *testing.T) {
+ ep := make(map[string]*rtmgr.Endpoint)
+ ep["dummy"] = &rtmgr.Endpoint{Uuid: "10.0.0.1:0", Name: "E2TERM", XAppType: "app1", Ip: "10.1.1.1", Port: 1234, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: true}
+
+ rtmgr.Eps = ep
+
+
+ p := uint16(1234)
+ xapp := models.XappElement{
+ Address: swag.String("10.1.1.1"),
+ Port: &p,
+ }
+
+ var b models.XappList
+ b = append(b,&xapp)
+ _ = updateXappSubscriptionHandleImpl(&b, 10)
+
+ //Test case when subscriptions already exist
+ data := models.XappSubscriptionData{
+ Address: swag.String("10.0.0.0"),
+ Port: &p,
+ SubscriptionID: swag.Int32(12345)}
+
+ rtmgr.Subs = *stub.ValidSubscriptions
+
+ subscriptionExists(&data)
+ addSubscription(&rtmgr.Subs, &data)
+ _ = updateXappSubscriptionHandleImpl(&b, 10)
+
+
+}
+
+func TestDumpDebugdata(t *testing.T) {
+ _,_ = dumpDebugData()
+}
+
+
type FetchAllXAppsHandler func(string) (*[]rtmgr.XApp, error)
type RecvXappCallbackDataHandler func(<-chan *models.XappCallbackData) (*[]rtmgr.XApp, error)
type RecvNewE2TdataHandler func(<-chan *models.E2tData) (*rtmgr.E2TInstance, string, error)
-type LaunchRestHandler func(*string, chan<- *models.XappCallbackData, chan<- *models.XappSubscriptionData, chan<- *rtmgr.XappList, chan<- *models.XappSubscriptionData, chan<- *models.E2tData, chan<- models.RanE2tMap, chan<- models.RanE2tMap, chan<- *models.E2tDeleteData)
+type LaunchRestHandler func(*string)
type ProvideXappHandleHandlerImpl func(chan<- *models.XappCallbackData, *models.XappCallbackData) error
type RetrieveStartupDataHandler func(string, string, string, string, string, sdl.Engine) error
}
type Engine interface {
- Initialize(string, string, string, string, string, sdl.Engine, rpe.Engine, chan<- bool, *sync.Mutex) error
+ Initialize(string, string, string, string, string, sdl.Engine, rpe.Engine, *sync.Mutex) error
Terminate() error
}
==================================================================================
*/
/*
- Mnemonic: nngpub_test.go
+ Mnemonic: rmrpub_test.go
Abstract:
Date: 25 April 2019
*/
if rx != nil {
rxList = []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}}
}
- messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+ //messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+ messageId := rtmgr.Mtype[messageType]
route := rtmgr.RouteTableEntry{
MessageType: messageId,
TxList: txList,
}
}
- messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+ //messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+ messageId := rtmgr.Mtype[messageType]
route := rtmgr.RouteTableEntry{
MessageType: messageId,
TxList: txList,
sendEp = subManEp
case "E2MAN":
sendEp = e2ManEp
- //case "UEMAN":
- // sendEp = ueManEp
case "RSM":
sendEp = rsmEp
case "A1MEDIATOR":
xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
xapp.Logger.Debug("Endpoints: %v", endPointList)
}
- /*ueManEp := getEndpointByName(&endPointList, "UEMAN")
- if ueManEp == nil {
- xapp.Logger.Error("Platform component not found: %v", "UE Manger")
- xapp.Logger.Debug("Endpoints: %v", endPointList)
- }*/
rsmEp := getEndpointByName(&endPointList, "RSM")
if rsmEp == nil {
xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager")
"github.com/ghodss/yaml"
"io/ioutil"
"os"
+ "strings"
)
var (
Eps Endpoints
Subs SubscriptionList
PrsCfg *PlatformRoutes
+ Mtype MessageTypeList
)
func GetPlatformComponents(configfile string) (*PlatformComponents, error) {
xapp.Logger.Debug("Invoked rtmgr.GetPlatformComponents(" + configfile + ")")
var rcfg ConfigRtmgr
var rtroutes RtmgrRoutes
+ var mtypes MessageTypeIdentifier
yamlFile, err := os.Open(configfile)
if err != nil {
return nil, errors.New("cannot open the file due to: " + err.Error())
}
PrsCfg = &(rtroutes.Prs)
+ err = json.Unmarshal(jsonByteValue,&mtypes)
+ if err != nil {
+ return nil, errors.New("cannot parse data due to: " + err.Error())
+ } else {
+ xapp.Logger.Debug("Messgaetypes = %v", mtypes)
+ for _,m := range mtypes.Mit {
+ splitstr := strings.Split(m,"=")
+ Mtype[splitstr[0]] = splitstr[1]
+ }
+ }
err = json.Unmarshal(jsonByteValue, &rcfg)
if err != nil {
return nil, errors.New("cannot parse data due to: " + err.Error())
type SubscriptionList []Subscription
+type MessageTypeList map[string]string
+
//TODO: uuid is not a real UUID but a string of "ip:port"
// this should be changed to real UUID later on which should come from xApp Manager // petszila
type Endpoint struct {
Pcs PlatformComponents `json:"PlatformComponents"`
}
+
+type MessageTypeIdentifier struct {
+ Mit []string `json:"messagetypes"`
+}
+
+
type RicComponents struct {
XApps []XApp
E2Ts map [string]E2TInstance
==================================================================================
*/
/*
- Mnemonic: nngpipe.go
- Abstract: mangos (NNG) Pipeline SBI implementation
+ Mnemonic: rmrpipe.go
+ Abstract: mangos (RMR) Pipeline SBI implementation
Date: 12 March 2019
*/
package sbi
+/*
+#include <rmr/rmr.h>
+*/
+import "C"
+
import (
"bytes"
"crypto/md5"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"routing-manager/pkg/rtmgr"
"strconv"
- //"time"
+ "strings"
"fmt"
)
-type NngPush struct {
+var rmrcallid = 1
+var rmrdynamiccallid = 201
+
+type RmrPush struct {
Sbi
rcChan chan *xapp.RMRParams
}
+type EPStatus struct {
+ endpoint string
+ status bool
+}
+
type RMRParams struct {
*xapp.RMRParams
}
return b.String()
}
-func NewNngPush() *NngPush {
- instance := new(NngPush)
+func NewRmrPush() *RmrPush {
+ instance := new(RmrPush)
return instance
}
-func (c *NngPush) Initialize(ip string) error {
+func (c *RmrPush) Initialize(ip string) error {
return nil
}
-func (c *NngPush) Terminate() error {
+func (c *RmrPush) Terminate() error {
return nil
}
-func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
+func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
xapp.Logger.Debug("Invoked sbi.AddEndpoint")
- endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
+ endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
ep.Whid = int(xapp.Rmr.Openwh(endpoint))
if ep.Whid < 0 {
return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
return nil
}
-func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
xapp.Logger.Debug("args: %v", *ep)
return nil
}
-func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
c.updateEndpoints(rcs, c)
}
-func (c *NngPush) DistributeAll(policies *[]string) error {
+func (c *RmrPush) DistributeAll(policies *[]string) error {
xapp.Logger.Debug("Invoked: sbi.DistributeAll")
xapp.Logger.Debug("args: %v", *policies)
- for _, ep := range rtmgr.Eps {
+ /*for _, ep := range rtmgr.Eps {
go c.send(ep, policies)
+ }*/
+ channel := make(chan EPStatus)
+
+ if rmrcallid == 200 {
+ rmrcallid = 1
}
+ for _, ep := range rtmgr.Eps {
+ go c.send_sync(ep, policies, channel, rmrcallid)
+ }
+ rmrcallid++
+
+ count := 0
+ result := make([]EPStatus, len(rtmgr.Eps))
+ for i, _ := range result {
+ result[i] = <-channel
+ if result[i].status == true {
+ count++
+ } else {
+ xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
+ }
+ }
+
+ if count < len(rtmgr.Eps) {
+ return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
+ }
+
+
return nil
}
-func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
- xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
+ xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
- var policy = []byte{}
- cumulative_policy := 0
- count := 0
- maxrecord := xapp.Config.GetInt("maxrecord")
- if maxrecord == 0 {
- maxrecord = 10
- }
+ ret := c.send_data(ep, policies, call_id)
- for _, pe := range *policies {
- b := []byte(pe)
- for j := 0; j < len(b); j++ {
- policy = append(policy, b[j])
- }
- count++
- cumulative_policy++
- if count == maxrecord || cumulative_policy == len(*policies) {
- params := &RMRParams{&xapp.RMRParams{}}
- params.Mtype = 20
- params.PayloadLen = len(policy)
- params.Payload = []byte(policy)
- params.Mbuf = nil
- params.Whid = ep.Whid
- xapp.Rmr.SendMsg(params.RMRParams)
- count = 0
- policy = nil
- xapp.Logger.Debug("Sent message with payload len = %d to %s", params.PayloadLen, ep.Uuid)
- }
- }
+ channel <- EPStatus{ep.Uuid, ret}
- xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
}
-func (c *NngPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
+func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
+ xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
+ var state int
+ var retstr string
+
+ var policy = []byte{}
+
+ for _, pe := range *policies {
+ b:= []byte(pe)
+ for j:=0; j<len(b); j++{
+ policy = append(policy,b[j])
+ }
+ }
+ params := &RMRParams{&xapp.RMRParams{}}
+ params.Mtype = 20
+ params.PayloadLen = len(policy)
+ params.Payload =[]byte(policy)
+ params.Mbuf = nil
+ params.Whid = ep.Whid
+ params.Callid = call_id
+ params.Timeout = 200
+ state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
+ routestatus := strings.Split(retstr," ")
+ if state != C.RMR_OK && routestatus[0] == "OK" {
+ xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
+ return false
+ } else {
+ xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
+ return true
+ }
+
+ xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
+ return false
+}
+
+func (c *RmrPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
return c.createEndpoint(payload, c)
}
-func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
+func (c *RmrPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
xapp.Logger.Debug("args: %v", *policies)
- go c.send(ep, policies)
+ if rmrdynamiccallid == 255 {
+ rmrdynamiccallid = 201
+ }
+
+ go c.send_data(ep, policies,rmrdynamiccallid)
+ rmrdynamiccallid++
return nil
}
==================================================================================
*/
/*
- Mnemonic: nngpush_test.go
+ Mnemonic: rmrpush_test.go
Abstract:
Date: 3 May 2019
*/
/*
Resets the EndpointList according to argumnets
*/
-func resetTestPushDataset(instance NngPush, testdata []rtmgr.Endpoint) {
+func resetTestPushDataset(instance RmrPush, testdata []rtmgr.Endpoint) {
rtmgr.Eps = make(map[string]*rtmgr.Endpoint)
for _, endpoint := range testdata {
ep := endpoint
}
/*
-nngpush.Initialize() method is empty, nothing to be tested
+rmrpush.Initialize() method is empty, nothing to be tested
*/
-func TestNngPushInitialize(t *testing.T) {
- var nngpush = NngPush{}
+func TestRmrPushInitialize(t *testing.T) {
+ var rmrpush = RmrPush{}
- _ = nngpush.Initialize("")
+ _ = rmrpush.Initialize("")
}
/*
-nngpush.Terminate() method is empty, nothing to be tested
+rmrpush.Terminate() method is empty, nothing to be tested
*/
-func TestNngPushTerminate(t *testing.T) {
- var nngpush = NngPush{}
+func TestRmrPushTerminate(t *testing.T) {
+ var rmrpush = RmrPush{}
- _ = nngpush.Terminate()
+ _ = rmrpush.Terminate()
}
/*
-nngpush.UpdateEndpoints() is testd against stub.ValidXApps dataset
+rmrpush.UpdateEndpoints() is testd against stub.ValidXApps dataset
*/
-func TestNngPushUpdateEndpoints(t *testing.T) {
- var nngpush = NngPush{}
- resetTestPushDataset(nngpush, stub.ValidEndpoints)
+func TestRmrPushUpdateEndpoints(t *testing.T) {
+ var rmrpush = RmrPush{}
+ resetTestPushDataset(rmrpush, stub.ValidEndpoints)
- nngpush.UpdateEndpoints(&stub.ValidRicComponents)
+ rmrpush.UpdateEndpoints(&stub.ValidRicComponents)
if rtmgr.Eps == nil {
- t.Errorf("nngpush.UpdateEndpoints() result was incorrect, got: %v, want: %v.", nil, "rtmgr.Endpoints")
+ t.Errorf("rmrpush.UpdateEndpoints() result was incorrect, got: %v, want: %v.", nil, "rtmgr.Endpoints")
}
}
/*
-nngpush.AddEndpoint() is tested for happy path case
+rmrpush.AddEndpoint() is tested for happy path case
*/
-func TestNngPushAddEndpoint(t *testing.T) {
+func TestRmrPushAddEndpoint(t *testing.T) {
// var err error
- var nngpush = NngPush{}
- resetTestPushDataset(nngpush, stub.ValidEndpoints)
- _ = nngpush.AddEndpoint(rtmgr.Eps["localhost"])
+ var rmrpush = RmrPush{}
+ resetTestPushDataset(rmrpush, stub.ValidEndpoints)
+ _ = rmrpush.AddEndpoint(rtmgr.Eps["localhost"])
/* if err != nil {
- t.Errorf("nngpush.AddEndpoint() return was incorrect, got: %v, want: %v.", err, "nil")
+ t.Errorf("rmrpush.AddEndpoint() return was incorrect, got: %v, want: %v.", err, "nil")
}*/
}
/*
-nngpush.DistributeAll() is tested for happy path case
+rmrpush.DistributeAll() is tested for happy path case
*/
-func TestNngPushDistributeAll(t *testing.T) {
+func TestRmrPushDistributeAll(t *testing.T) {
var err error
- var nngpush = NngPush{}
- resetTestPushDataset(nngpush, stub.ValidEndpoints)
+ var rmrpush = RmrPush{}
+ resetTestPushDataset(rmrpush, stub.ValidEndpoints)
- err = nngpush.DistributeAll(stub.ValidPolicies)
+ err = rmrpush.DistributeAll(stub.ValidPolicies)
if err != nil {
- t.Errorf("nngpush.DistributeAll(policies) was incorrect, got: %v, want: %v.", err, "nil")
+ t.Errorf("rmrpush.DistributeAll(policies) was incorrect, got: %v, want: %v.", err, "nil")
}
}
/*
-nngpush.DistributeToEp() is tested for Sending case
+rmrpush.DistributeToEp() is tested for Sending case
*/
func TestDistributeToEp(t *testing.T) {
var err error
- var nngpush = NngPush{}
- resetTestPushDataset(nngpush, stub.ValidEndpoints)
+ var rmrpush = RmrPush{}
+ resetTestPushDataset(rmrpush, stub.ValidEndpoints)
- err = nngpush.DistributeToEp(stub.ValidPolicies,rtmgr.Eps["localhost"])
+ err = rmrpush.DistributeToEp(stub.ValidPolicies,rtmgr.Eps["localhost"])
if err != nil {
- t.Errorf("nngpush.DistributetoEp(policies) was incorrect, got: %v, want: %v.", err, "nil")
+ t.Errorf("rmrpush.DistributetoEp(policies) was incorrect, got: %v, want: %v.", err, "nil")
}
}
func TestDeleteEndpoint(t *testing.T) {
var err error
- var nngpush = NngPush{}
- resetTestPushDataset(nngpush, stub.ValidEndpoints)
+ var rmrpush = RmrPush{}
+ resetTestPushDataset(rmrpush, stub.ValidEndpoints)
- err = nngpush.DeleteEndpoint(rtmgr.Eps["localhost"])
+ err = rmrpush.DeleteEndpoint(rtmgr.Eps["localhost"])
if err != nil {
- t.Errorf("nngpush.DeleteEndpoint() was incorrect, got: %v, want: %v.", err, "nil")
+ t.Errorf("rmrpush.DeleteEndpoint() was incorrect, got: %v, want: %v.", err, "nil")
}
}
func TestCreateEndpoint(t *testing.T) {
- var nngpush = NngPush{}
- resetTestPushDataset(nngpush, stub.ValidEndpoints1)
- nngpush.CreateEndpoint("192.168.0.1:0")
- nngpush.CreateEndpoint("localhost:4560")
+ var rmrpush = RmrPush{}
+ resetTestPushDataset(rmrpush, stub.ValidEndpoints1)
+ rmrpush.CreateEndpoint("192.168.0.1:0")
+ rmrpush.CreateEndpoint("localhost:4560")
}
/*
Initialize and send policies
*/
-func TestNngPushInitializeandsendPolicies(t *testing.T) {
- var nngpush = NngPush{}
- resetTestPushDataset(nngpush, stub.ValidEndpoints)
+func TestRmrPushInitializeandsendPolicies(t *testing.T) {
+ var rmrpush = RmrPush{}
+ resetTestPushDataset(rmrpush, stub.ValidEndpoints)
policies := []string{"hello","welcome"}
- nngpush.send(rtmgr.Eps["localhost"],&policies)
+ rmrpush.send_data(rtmgr.Eps["localhost"],&policies,1)
}
"strings"
)
-const DefaultNngPipelineSocketPrefix = "tcp://"
-const DefaultNngPipelineSocketNumber = 4561
+const DefaultRmrPipelineSocketPrefix = "tcp://"
+const DefaultRmrPipelineSocketNumber = 4561
const PlatformType = "platform"
var (
SupportedSbis = []*EngineConfig{
{
- Name: "nngpush",
+ Name: "rmrpush",
Version: "v1",
- Protocol: "nngpipeline",
- Instance: NewNngPush(),
+ Protocol: "rmrpipeline",
+ Instance: NewRmrPush(),
IsAvailable: true,
},
}
func TestGetSbi(t *testing.T) {
var errtype = errors.New("")
- var sbitype = new(NngPush)
- var invalids = []string{"nngpus", ""}
+ var sbitype = new(RmrPush)
+ var invalids = []string{"rmrpus", ""}
- sbii, err := GetSbi("nngpush")
+ sbii, err := GetSbi("rmrpush")
if err != nil {
- t.Errorf("GetSbi(nngpub) was incorrect, got: %v, want: %v.", reflect.TypeOf(err), nil)
+ t.Errorf("GetSbi(rmrpub) was incorrect, got: %v, want: %v.", reflect.TypeOf(err), nil)
}
if reflect.TypeOf(sbii) != reflect.TypeOf(sbitype) {
- t.Errorf("GetSbi(nngpub) was incorrect, got: %v, want: %v.", reflect.TypeOf(sbii), reflect.TypeOf(sbitype))
+ t.Errorf("GetSbi(rmrpub) was incorrect, got: %v, want: %v.", reflect.TypeOf(sbii), reflect.TypeOf(sbitype))
}
for _, arg := range invalids {
func TestUpdateE2TendPoint(t *testing.T) {
var err error
var sbi = Sbi{}
- sbii, err := GetSbi("nngpush")
+ sbii, err := GetSbi("rmrpush")
var EP = make(map[string]*rtmgr.Endpoint)
EP["127.0.0.2"] = &rtmgr.Endpoint{Uuid: "127.0.0.2", Name: "E2TERM", XAppType: "app1", Ip: "127.0.0.2", Port: 4562, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: false}
rtmgr.Eps = EP
- var nngpush = NngPush{}
- nngpush.AddEndpoint(rtmgr.Eps["127.0.0.2"])
+ var rmrpush = RmrPush{}
+ rmrpush.AddEndpoint(rtmgr.Eps["127.0.0.2"])
var E2map = make(map[string]rtmgr.E2TInstance)
func TestPruneEndpointList(t *testing.T) {
var sbi = Sbi{}
var err error
- sbii, err := GetSbi("nngpush")
+ sbii, err := GetSbi("rmrpush")
var EP = make(map[string]*rtmgr.Endpoint)
EP["127.0.0.2"] = &rtmgr.Endpoint{Uuid: "127.0.0.2", Name: "E2TERM", XAppType: "app1", Ip: "127.0.0.1", Port: 4562, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: false}
rtmgr.Eps = EP
- var nngpush = NngPush{}
- nngpush.AddEndpoint(rtmgr.Eps["127.0.0.2"])
+ var rmrpush = RmrPush{}
+ rmrpush.AddEndpoint(rtmgr.Eps["127.0.0.2"])
sbi.pruneEndpointList(sbii)
t.Log(err)
DistributeToEp(*[]string, *rtmgr.Endpoint) error
}
-type NngSocket interface {
+/*type NngSocket interface {
Listen(string) error
Send([]byte) error
Close() error
DialOptions(string, map[string]interface{}) error
}
-type CreateNewNngSocketHandler func() (NngSocket, error)
+type CreateNewNngSocketHandler func() (NngSocket, error)*/