From bce67475ab8f92b42841dba561fff27b7d239820 Mon Sep 17 00:00:00 2001 From: wahidw Date: Mon, 15 Jun 2020 13:52:55 +0000 Subject: [PATCH] Making Route Distribution Synchronous Change-Id: Ibe7dc23933f446fc433e6e583d6044f3e5e93d88 Signed-off-by: wahidw --- RELNOTES | 3 + cmd/rtmgr.go | 113 +------------------ container-tag.yaml | 2 +- manifests/rtmgr/rtmgr-cfg.yaml | 19 ++++ pkg/nbi/control.go | 85 +++++++++++++- pkg/nbi/httpgetter.go | 2 +- pkg/nbi/httprestful.go | 251 +++++++++++------------------------------ pkg/nbi/httprestful_test.go | 183 ++++++++++++++---------------- pkg/nbi/types.go | 4 +- pkg/rpe/rmr_test.go | 2 +- pkg/rpe/rpe.go | 13 +-- pkg/rtmgr/rtmgr.go | 13 +++ pkg/rtmgr/types.go | 8 ++ pkg/sbi/nngpush.go | 147 ++++++++++++++++-------- pkg/sbi/nngpush_test.go | 90 +++++++-------- pkg/sbi/sbi.go | 10 +- pkg/sbi/sbi_test.go | 22 ++-- pkg/sbi/types.go | 4 +- 18 files changed, 453 insertions(+), 518 deletions(-) diff --git a/RELNOTES b/RELNOTES index f6465d3..d2a370b 100644 --- a/RELNOTES +++ b/RELNOTES @@ -1,3 +1,6 @@ +### v0.6.2 +* Distribution of routes is synchronous. + ### v0.6.1 * Updating xapp_fwk to v0.4.15, that contains RIC_HEALTH_CHECK message types diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go index 63b67b7..af0aba1 100644 --- a/cmd/rtmgr.go +++ b/cmd/rtmgr.go @@ -39,109 +39,15 @@ import ( "os" "os/signal" "routing-manager/pkg/nbi" - "routing-manager/pkg/rpe" + //"routing-manager/pkg/rpe" "routing-manager/pkg/rtmgr" - "routing-manager/pkg/sbi" - "routing-manager/pkg/sdl" + //"routing-manager/pkg/sbi" + //"routing-manager/pkg/sdl" "syscall" "time" - "sync" ) const SERVICENAME = "rtmgr" -const INTERVAL time.Duration = 60 - -func initRtmgr() (nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, err error) { - if nbiEngine, err = nbi.GetNbi(xapp.Config.GetString("nbi")); err == nil && nbiEngine != nil { - if sbiEngine, err = sbi.GetSbi(xapp.Config.GetString("sbi")); err == nil && sbiEngine != nil { - if sdlEngine, err = sdl.GetSdl(xapp.Config.GetString("sdl")); err == nil && sdlEngine != nil { - if rpeEngine, err = rpe.GetRpe(xapp.Config.GetString("rpe")); err == nil && rpeEngine != nil { - return nbiEngine, sbiEngine, sdlEngine, rpeEngine, nil - } - } - } - } - return nil, nil, nil, nil, err -} - - - -func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { - for { - if <-triggerSBI { - m.Lock() - data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile")) - m.Unlock() - if err != nil || data == nil { - xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error()) - continue - } - sbiEngine.UpdateEndpoints(data) - policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data) - err = sbiEngine.DistributeAll(policies) - if err != nil { - xapp.Logger.Error("Routing table cannot be published due to: " + err.Error()) - } - } - } -} - -func sendRoutesToAll(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine) { - - data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile")) - if err != nil || data == nil { - xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error()) - return - } - sbiEngine.UpdateEndpoints(data) - policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data) - err = sbiEngine.DistributeAll(policies) - if err != nil { - xapp.Logger.Error("Routing table cannot be published due to: " + err.Error()) - return - } -} - - -func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { - - triggerSBI := make(chan bool) - - nbiErr := nbiEngine.Initialize(xapp.Config.GetString("xmurl"), xapp.Config.GetString("nbiurl"), xapp.Config.GetString("rtfile"), xapp.Config.GetString("cfgfile"), xapp.Config.GetString("e2murl"), - sdlEngine, rpeEngine, triggerSBI, m) - if nbiErr != nil { - xapp.Logger.Error("Failed to initialize nbi due to: " + nbiErr.Error()) - return - } - - err := sbiEngine.Initialize(xapp.Config.GetString("sbiurl")) - if err != nil { - xapp.Logger.Info("Failed to open push socket due to: " + err.Error()) - return - } - defer nbiEngine.Terminate() - defer sbiEngine.Terminate() - - // This SBI Go routine is trtiggered by periodic main loop and when data is recieved on REST interface. - go serveSBI(triggerSBI, sbiEngine, sdlEngine, rpeEngine, m) - - for { - if xapp.Config.GetString("nbi") == "httpGetter" { - data, err := nbiEngine.(*nbi.HttpGetter).FetchAllXApps(xapp.Config.GetString("xmurl")) - if err != nil { - xapp.Logger.Error("Cannot fetch xapp data due to: " + err.Error()) - } else if data != nil { - sdlEngine.WriteXApps(xapp.Config.GetString("rtfile"), data) - } - } - - sendRoutesToAll(sbiEngine, sdlEngine, rpeEngine) - - rtmgr.Rtmgr_ready = true - time.Sleep(INTERVAL * time.Second) - xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.") - } -} func SetupCloseHandler() { c := make(chan os.Signal, 2) @@ -155,23 +61,16 @@ func SetupCloseHandler() { func main() { - nbiEngine, sbiEngine, sdlEngine, rpeEngine, err := initRtmgr() - if err != nil { - xapp.Logger.Error(err.Error()) - os.Exit(1) - } - SetupCloseHandler() xapp.Logger.Info("Start " + SERVICENAME + " service") rtmgr.Eps = make(rtmgr.Endpoints) + rtmgr.Mtype = make(rtmgr.MessageTypeList) rtmgr.Rtmgr_ready = false - var m sync.Mutex - // RMR thread is starting port: 4560 c := nbi.NewControl() - go c.Run(sbiEngine, sdlEngine, rpeEngine, &m) + go c.Run() // Waiting for RMR to be ready time.Sleep(time.Duration(2) * time.Second) @@ -182,6 +81,6 @@ func main() { dummy_whid := int(xapp.Rmr.Openwh("localhost:4560")) xapp.Logger.Info("created dummy Wormhole ID for routingmanager and dummy_whid :%d", dummy_whid) - serve(nbiEngine, sbiEngine, sdlEngine, rpeEngine, &m) + nbi.Serve() os.Exit(0) } diff --git a/container-tag.yaml b/container-tag.yaml index 944d34a..bff5250 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -2,4 +2,4 @@ # By default this file is in the docker build directory, # but the location can configured in the JJB template. --- -tag: 0.6.1 +tag: 0.6.2 diff --git a/manifests/rtmgr/rtmgr-cfg.yaml b/manifests/rtmgr/rtmgr-cfg.yaml index 55aab19..471dca3 100644 --- a/manifests/rtmgr/rtmgr-cfg.yaml +++ b/manifests/rtmgr/rtmgr-cfg.yaml @@ -64,6 +64,25 @@ data: "protPort": "tcp:4560" "maxSize": 2072 "numWorkers": 1 + "RTFILE": + "/db/rt.json" + "NBIURL": + "http://localhost:3800" + } subscription: host: "127.0.0.1:8089" + + #xmurl: "http://service-ricplt-appmgr-http:8080/ric/v1/xapps" + #e2murl: "http://service-ricplt-e2mgr-http:3800/v1/e2t/list" + #rtfile: "/db/rt.json" + #CFGFILE: "/cfg/rtmgr-config.yaml" + #rpe: "rmrpush" + #s#bi: "rmrpush" + #s#biurl: "0.0.0.0" + #nbi: "httpRESTful" + #nbiurl: "http://service-ricplt-rtmgr-http:3800" + ##sdl: "file" + #local: + #host: ":8080" + diff --git a/pkg/nbi/control.go b/pkg/nbi/control.go index ccf4e57..5483aba 100644 --- a/pkg/nbi/control.go +++ b/pkg/nbi/control.go @@ -33,10 +33,20 @@ import ( "routing-manager/pkg/sdl" "strconv" "sync" + "time" + "os" ) -func NewControl() Control { +var m sync.Mutex + +var nbiEngine Engine +var sbiEngine sbi.Engine +var sdlEngine sdl.Engine +var rpeEngine rpe.Engine + +const INTERVAL time.Duration = 60 +func NewControl() Control { return Control{make(chan *xapp.RMRParams)} } @@ -44,8 +54,15 @@ type Control struct { rcChan chan *xapp.RMRParams } -func (c *Control) Run(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { - go c.controlLoop(sbiEngine, sdlEngine, rpeEngine, m) + +func (c *Control) Run() { + var err error + go c.controlLoop() + nbiEngine, sbiEngine, sdlEngine, rpeEngine, err = initRtmgr() + if err != nil { + xapp.Logger.Error(err.Error()) + os.Exit(1) + } xapp.Run(c) } @@ -54,7 +71,20 @@ func (c *Control) Consume(rp *xapp.RMRParams) (err error) { return } -func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { +func initRtmgr() (nbiEngine Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, err error) { + if nbiEngine, err = GetNbi(xapp.Config.GetString("nbi")); err == nil && nbiEngine != nil { + if sbiEngine, err = sbi.GetSbi(xapp.Config.GetString("sbi")); err == nil && sbiEngine != nil { + if sdlEngine, err = sdl.GetSdl(xapp.Config.GetString("sdl")); err == nil && sdlEngine != nil { + if rpeEngine, err = rpe.GetRpe(xapp.Config.GetString("rpe")); err == nil && rpeEngine != nil { + return nbiEngine, sbiEngine, sdlEngine, rpeEngine, nil + } + } + } + } + return nil, nil, nil, nil, err +} + +func (c *Control) controlLoop() { for { msg := <-c.rcChan xapp_msg := sbi.RMRParams{msg} @@ -64,7 +94,7 @@ func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEng xapp.Logger.Info("Update Route Table Request(RMR to RM), message discarded as routing manager is not ready") } else { xapp.Logger.Info("Update Route Table Request(RMR to RM)") - go c.handleUpdateToRoutingManagerRequest(msg, sbiEngine, sdlEngine, rpeEngine, m) + go c.handleUpdateToRoutingManagerRequest(msg) } case xapp.RICMessageTypes["RMRRM_TABLE_STATE"]: xapp.Logger.Info("state of table to route mgr %s,payload %s", xapp_msg.String(), msg.Payload) @@ -77,7 +107,7 @@ func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEng } } -func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { +func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams) { msg := sbi.RMRParams{params} @@ -105,3 +135,46 @@ func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams, sb return } } + +func sendRoutesToAll() (err error) { + + m.Lock() + data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile")) + m.Unlock() + if err != nil || data == nil { + return errors.New("Cannot get data from sdl interface due to: " + err.Error()) + } + sbiEngine.UpdateEndpoints(data) + policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data) + err = sbiEngine.DistributeAll(policies) + if err != nil { + return errors.New("Routing table cannot be published due to: " + err.Error()) + } + + return nil +} + +func Serve() { + + nbiErr := nbiEngine.Initialize(xapp.Config.GetString("xmurl"), xapp.Config.GetString("nbiurl"), xapp.Config.GetString("rtfile"), xapp.Config.GetString("cfgfile"), xapp.Config.GetString("e2murl"), sdlEngine, rpeEngine, &m) + if nbiErr != nil { + xapp.Logger.Error("Failed to initialize nbi due to: " + nbiErr.Error()) + return + } + + err := sbiEngine.Initialize(xapp.Config.GetString("sbiurl")) + if err != nil { + xapp.Logger.Info("Failed to open push socket due to: " + err.Error()) + return + } + defer nbiEngine.Terminate() + defer sbiEngine.Terminate() + + for { + sendRoutesToAll() + + rtmgr.Rtmgr_ready = true + time.Sleep(INTERVAL * time.Second) + xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.") + } +} diff --git a/pkg/nbi/httpgetter.go b/pkg/nbi/httpgetter.go index fca8367..7de6b96 100644 --- a/pkg/nbi/httpgetter.go +++ b/pkg/nbi/httpgetter.go @@ -78,7 +78,7 @@ func fetchAllXApps(xmurl string) (*[]rtmgr.XApp, error) { } func (g *HttpGetter) Initialize(xmurl string, nbiif string, fileName string, configfile string, e2murl string, - sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error { + sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) error { return nil } diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go index 98209e7..f49f3ae 100644 --- a/pkg/nbi/httprestful.go +++ b/pkg/nbi/httprestful.go @@ -56,39 +56,20 @@ import ( "time" ) -//var myClient = &http.Client{Timeout: 1 * time.Second} - type HttpRestful struct { Engine LaunchRest LaunchRestHandler - RecvXappCallbackData RecvXappCallbackDataHandler - RecvNewE2Tdata RecvNewE2TdataHandler - ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl RetrieveStartupData RetrieveStartupDataHandler } func NewHttpRestful() *HttpRestful { instance := new(HttpRestful) instance.LaunchRest = launchRest - instance.RecvXappCallbackData = recvXappCallbackData - instance.RecvNewE2Tdata = recvNewE2Tdata - instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl instance.RetrieveStartupData = retrieveStartupData return instance } -// ToDo: Use Range over channel. Read and return only the latest one. -func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) { - var xappData *models.XappCallbackData - // Drain the channel as we are only looking for the latest value until - // xapp manager sends all xapp data with every request. - length := len(dataChannel) - //xapp.Logger.Info(length) - for i := 0; i <= length; i++ { - xapp.Logger.Info("data received") - // If no data received from the REST, it blocks. - xappData = <-dataChannel - } +func recvXappCallbackData(xappData *models.XappCallbackData) (*[]rtmgr.XApp, error) { if nil != xappData { var xapps []rtmgr.XApp err := json.Unmarshal([]byte(xappData.XApps), &xapps) @@ -101,13 +82,10 @@ func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr return nil, nil } -func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, string, error) { - var e2tData *models.E2tData +func recvNewE2Tdata(e2tData *models.E2tData) (*rtmgr.E2TInstance, string, error) { var str string xapp.Logger.Info("data received") - e2tData = <-dataChannel - if nil != e2tData { e2tinst := rtmgr.E2TInstance{ @@ -146,7 +124,7 @@ func validateXappCallbackData(callbackData *models.XappCallbackData) error { return nil } -func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error { +func provideXappHandleHandlerImpl(data *models.XappCallbackData) error { if data != nil { xapp.Logger.Debug("Received callback data") } @@ -155,7 +133,20 @@ func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data * xapp.Logger.Warn("XApp callback data validation failed: " + err.Error()) return err } else { - datach <- data + appdata, err := recvXappCallbackData(data) + if err != nil { + xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error()) + } else if appdata != nil { + xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.") + alldata, err1 := httpGetXApps(xapp.Config.GetString("xmurl")) + if alldata != nil && err1 == nil { + m.Lock() + sdlEngine.WriteXApps(xapp.Config.GetString("rtfile"), alldata) + m.Unlock() + return sendRoutesToAll() + } + } + return nil } } @@ -223,18 +214,17 @@ func checkValidaE2TAddress(e2taddress string) bool { } -func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData, - data *models.XappSubscriptionData) error { +func provideXappSubscriptionHandleImpl(data *models.XappSubscriptionData) error { xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl") err := validateXappSubscriptionData(data) if err != nil { xapp.Logger.Error(err.Error()) return err } - subchan <- data - //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port))) + xapp.Logger.Debug("received XApp subscription data") + addSubscription(&rtmgr.Subs, data) xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps) - return nil + return sendRoutesToAll() } func subscriptionExists(data *models.XappSubscriptionData) bool { @@ -249,8 +239,7 @@ func subscriptionExists(data *models.XappSubscriptionData) bool { return present } -func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData, - data *models.XappSubscriptionData) error { +func deleteXappSubscriptionHandleImpl(data *models.XappSubscriptionData) error { xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl") err := validateXappSubscriptionData(data) if err != nil { @@ -264,11 +253,13 @@ func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscription return err } - subdelchan <- data - return nil + xapp.Logger.Debug("received XApp subscription delete data") + delSubscription(&rtmgr.Subs, data) + return sendRoutesToAll() + } -func updateXappSubscriptionHandleImpl(subupdatechan chan<- *rtmgr.XappList, data *models.XappList, subid uint16) error { +func updateXappSubscriptionHandleImpl(data *models.XappList, subid uint16) error { xapp.Logger.Debug("Invoked updateXappSubscriptionHandleImpl") var fqdnlist []rtmgr.FqDn @@ -289,20 +280,26 @@ func updateXappSubscriptionHandleImpl(subupdatechan chan<- *rtmgr.XappList, data return err } } - subupdatechan <- &xapplist - return nil + xapp.Logger.Debug("received XApp subscription Merge data") + updateSubscription(&xapplist) + return sendRoutesToAll() } -func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData, - data *models.E2tData) error { +func createNewE2tHandleHandlerImpl(data *models.E2tData) error { xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl") err := validateE2tData(data) if err != nil { xapp.Logger.Error(err.Error()) return err } - e2taddchan <- data - return nil + //e2taddchan <- data + e2data, meiddata, _ := recvNewE2Tdata(data) + xapp.Logger.Debug("received create New E2T data") + m.Lock() + sdlEngine.WriteNewE2TInstance(xapp.Config.GetString("rtfile"), e2data, meiddata) + m.Unlock() + return sendRoutesToAll() + } func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error { @@ -323,32 +320,37 @@ func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error { return nil } -func associateRanToE2THandlerImpl(assranchan chan<- models.RanE2tMap, - data models.RanE2tMap) error { +func associateRanToE2THandlerImpl(data models.RanE2tMap) error { xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl") err := validateE2TAddressRANListData(data) if err != nil { xapp.Logger.Warn(" Association of RAN to E2T Instance data validation failed: " + err.Error()) return err } - assranchan <- data - return nil + xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager") + m.Lock() + sdlEngine.WriteAssRANToE2TInstance(xapp.Config.GetString("rtfile"), data) + m.Unlock() + return sendRoutesToAll() + } -func disassociateRanToE2THandlerImpl(disassranchan chan<- models.RanE2tMap, - data models.RanE2tMap) error { +func disassociateRanToE2THandlerImpl(data models.RanE2tMap) error { xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl") err := validateE2TAddressRANListData(data) if err != nil { xapp.Logger.Warn(" Disassociation of RAN List from E2T Instance data validation failed: " + err.Error()) return err } - disassranchan <- data - return nil + xapp.Logger.Debug("received disassociate RANs from E2T instance") + m.Lock() + sdlEngine.WriteDisAssRANFromE2TInstance(xapp.Config.GetString("rtfile"), data) + m.Unlock() + return sendRoutesToAll() + } -func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData, - data *models.E2tDeleteData) error { +func deleteE2tHandleHandlerImpl(data *models.E2tDeleteData) error { xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl") err := validateDeleteE2tData(data) @@ -356,9 +358,11 @@ func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData, xapp.Logger.Error(err.Error()) return err } + m.Lock() + sdlEngine.WriteDeleteE2TInstance(xapp.Config.GetString("rtfile"), data) + m.Unlock() + return sendRoutesToAll() - e2tdelchan <- data - return nil } func dumpDebugData() (models.Debuginfo, error) { @@ -378,8 +382,7 @@ func dumpDebugData() (models.Debuginfo, error) { return response, err } -func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData, subupdatechan chan<- *rtmgr.XappList, - subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData, assranchan chan<- models.RanE2tMap, disassranchan chan<- models.RanE2tMap, e2tdelchan chan<- *models.E2tDeleteData) { +func launchRest(nbiif *string){ swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON) if err != nil { //log.Fatalln(err) @@ -405,7 +408,7 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc( func(params handle.ProvideXappHandleParams) middleware.Responder { xapp.Logger.Info("Data received on Http interface") - err := provideXappHandleHandlerImpl(datach, params.XappCallbackData) + err := provideXappHandleHandlerImpl(params.XappCallbackData) if err != nil { xapp.Logger.Error("Invalid XApp callback data: " + err.Error()) return handle.NewProvideXappHandleBadRequest() @@ -415,77 +418,67 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c }) api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc( func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder { - err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData) + err := provideXappSubscriptionHandleImpl(params.XappSubscriptionData) if err != nil { return handle.NewProvideXappSubscriptionHandleBadRequest() } else { - //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints - time.Sleep(1 * time.Second) return handle.NewGetHandlesOK() } }) api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc( func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder { - err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData) + err := deleteXappSubscriptionHandleImpl(params.XappSubscriptionData) if err != nil { return handle.NewDeleteXappSubscriptionHandleNoContent() } else { - //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints - time.Sleep(1 * time.Second) return handle.NewGetHandlesOK() } }) api.HandleUpdateXappSubscriptionHandleHandler = handle.UpdateXappSubscriptionHandleHandlerFunc( func(params handle.UpdateXappSubscriptionHandleParams) middleware.Responder { - err := updateXappSubscriptionHandleImpl(subupdatechan, ¶ms.XappList, params.SubscriptionID) + err := updateXappSubscriptionHandleImpl(¶ms.XappList, params.SubscriptionID) if err != nil { return handle.NewUpdateXappSubscriptionHandleBadRequest() } else { - //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints - time.Sleep(1 * time.Second) return handle.NewUpdateXappSubscriptionHandleCreated() } }) api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc( func(params handle.CreateNewE2tHandleParams) middleware.Responder { - err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData) + err := createNewE2tHandleHandlerImpl(params.E2tData) if err != nil { return handle.NewCreateNewE2tHandleBadRequest() } else { - time.Sleep(1 * time.Second) return handle.NewCreateNewE2tHandleCreated() } }) api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc( func(params handle.AssociateRanToE2tHandleParams) middleware.Responder { - err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList) + err := associateRanToE2THandlerImpl(params.RanE2tList) if err != nil { return handle.NewAssociateRanToE2tHandleBadRequest() } else { - time.Sleep(1 * time.Second) return handle.NewAssociateRanToE2tHandleCreated() } }) api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc( func(params handle.DissociateRanParams) middleware.Responder { - err := disassociateRanToE2THandlerImpl(disassranchan, params.DissociateList) + err := disassociateRanToE2THandlerImpl(params.DissociateList) if err != nil { return handle.NewDissociateRanBadRequest() } else { - time.Sleep(1 * time.Second) return handle.NewDissociateRanCreated() } }) api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc( func(params handle.DeleteE2tHandleParams) middleware.Responder { - err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData) + err := deleteE2tHandleHandlerImpl(params.E2tData) if err != nil { return handle.NewDeleteE2tHandleBadRequest() } else { - time.Sleep(1 * time.Second) return handle.NewDeleteE2tHandleCreated() } }) @@ -670,120 +663,14 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile } func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string, e2murl string, - sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error { + sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) error { err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, e2murl, sdlEngine) if err != nil { xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error()) return err } - - datach := make(chan *models.XappCallbackData, 10) - subschan := make(chan *models.XappSubscriptionData, 10) - subdelchan := make(chan *models.XappSubscriptionData, 10) - subupdatechan := make(chan *rtmgr.XappList, 10) - e2taddchan := make(chan *models.E2tData, 10) - associateranchan := make(chan models.RanE2tMap, 10) - disassociateranchan := make(chan models.RanE2tMap, 10) - e2tdelchan := make(chan *models.E2tDeleteData, 10) - xapp.Logger.Info("Launching Rest Http service") - go func() { - r.LaunchRest(&nbiif, datach, subschan, subupdatechan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan) - }() - - go func() { - for { - data, err := r.RecvXappCallbackData(datach) - if err != nil { - xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error()) - } else if data != nil { - xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.") - alldata, err1 := httpGetXApps(xmurl) - if alldata != nil && err1 == nil { - m.Lock() - sdlEngine.WriteXApps(fileName, alldata) - m.Unlock() - triggerSBI <- true - } - } - } - }() - - go func() { - for { - data := <-subschan - xapp.Logger.Debug("received XApp subscription data") - addSubscription(&rtmgr.Subs, data) - triggerSBI <- true - } - }() - - go func() { - for { - data := <-subdelchan - xapp.Logger.Debug("received XApp subscription delete data") - delSubscription(&rtmgr.Subs, data) - triggerSBI <- true - } - }() - - go func() { - for { - data := <-subupdatechan - xapp.Logger.Debug("received XApp subscription Merge data") - updateSubscription(data) - triggerSBI <- true - } - }() - - go func() { - for { - - data, meiddata, _ := r.RecvNewE2Tdata(e2taddchan) - if data != nil { - xapp.Logger.Debug("received create New E2T data") - m.Lock() - sdlEngine.WriteNewE2TInstance(fileName, data, meiddata) - m.Unlock() - triggerSBI <- true - } - } - }() - - go func() { - for { - data := <-associateranchan - xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager") - m.Lock() - sdlEngine.WriteAssRANToE2TInstance(fileName, data) - m.Unlock() - triggerSBI <- true - } - }() - go func() { - for { - - data := <-disassociateranchan - xapp.Logger.Debug("received disassociate RANs from E2T instance") - m.Lock() - sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data) - m.Unlock() - triggerSBI <- true - } - }() - - go func() { - for { - - data := <-e2tdelchan - xapp.Logger.Debug("received Delete E2T data") - if data != nil { - m.Lock() - sdlEngine.WriteDeleteE2TInstance(fileName, data) - m.Unlock() - triggerSBI <- true - } - } + r.LaunchRest(&nbiif) }() return nil diff --git a/pkg/nbi/httprestful_test.go b/pkg/nbi/httprestful_test.go index 5a9ae8c..bebc4f7 100644 --- a/pkg/nbi/httprestful_test.go +++ b/pkg/nbi/httprestful_test.go @@ -43,7 +43,6 @@ import ( "routing-manager/pkg/sdl" "routing-manager/pkg/stub" "testing" - "time" "sync" "github.com/go-openapi/swag" ) @@ -113,7 +112,6 @@ func TestValidateXappSubscriptionsData(t *testing.T) { Port: &p, SubscriptionID: swag.Int32(123456)} err = validateXappSubscriptionData(&data1) - t.Log(err) //Validate E2tData data2 := models.E2tData{ @@ -121,24 +119,31 @@ func TestValidateXappSubscriptionsData(t *testing.T) { } /*err = validateE2tData(&data2)*/ - e2tchannel := make(chan *models.E2tData, 10) - _ = createNewE2tHandleHandlerImpl(e2tchannel, &data2) - defer close(e2tchannel) + //e2tchannel := make(chan *models.E2tData, 10) + _ = createNewE2tHandleHandlerImpl(&data2) + //defer close(e2tchannel) //test case for provideXappSubscriptionHandleImp - datachannel := make(chan *models.XappSubscriptionData, 10) - _ = provideXappSubscriptionHandleImpl(datachannel, &data1) - defer close(datachannel) + //datachannel := make(chan *models.XappSubscriptionData, 10) + sdlEngine, _ = sdl.GetSdl("file") + _ = provideXappSubscriptionHandleImpl( &data1) + //defer close(datachannel) //test case for deleteXappSubscriptionHandleImpl - _ = deleteXappSubscriptionHandleImpl(datachannel, &data1) + _ = deleteXappSubscriptionHandleImpl(&data1) data3 := models.XappSubscriptionData{ Address: swag.String("10.55.55.5"), Port: &p, SubscriptionID: swag.Int32(123456)} //test case for deleteXappSubscriptionHandleImpl - _ = deleteXappSubscriptionHandleImpl(datachannel, &data3) + _ = deleteXappSubscriptionHandleImpl(&data3) + data4 := models.XappSubscriptionData{ + Address: swag.String("1.5.5.5"), + Port: &p, + SubscriptionID: swag.Int32(1236)} + _ = deleteXappSubscriptionHandleImpl(&data4) + } func TestValidateE2tDataEmpty(t *testing.T) { @@ -174,6 +179,8 @@ func TestValidateE2tDatavalid(t *testing.T) { err := validateE2tData(&data) t.Log(err) + _ = createNewE2tHandleHandlerImpl(&data) + } func TestValidateE2tDatavalidEndpointPresent(t *testing.T) { @@ -321,13 +328,12 @@ func TestValidateE2TAddressRANListData(t *testing.T) { func TestAssociateRanToE2THandlerImpl(t *testing.T) { - associateranchan := make(chan models.RanE2tMap, 10) data := models.RanE2tMap{ { E2TAddress: swag.String("10.101.01.1:8098"), }, } - err := associateRanToE2THandlerImpl(associateranchan, data) + err := associateRanToE2THandlerImpl( data) if (err != nil ) { t.Log(err) } @@ -345,13 +351,11 @@ func TestAssociateRanToE2THandlerImpl(t *testing.T) { E2TAddress: swag.String("10.101.01.1:8098"), }, } - err = associateRanToE2THandlerImpl(associateranchan, data) + err = associateRanToE2THandlerImpl(data) if (err != nil ) { t.Log(err) } - data1 := <-associateranchan - fmt.Println(data1) //################ Delete End Point dummy entry delete(rtmgr.Eps, uuid); //##################### @@ -359,14 +363,13 @@ func TestAssociateRanToE2THandlerImpl(t *testing.T) { func TestDisassociateRanToE2THandlerImpl(t *testing.T) { - disassranchan := make(chan models.RanE2tMap, 10) data := models.RanE2tMap{ { E2TAddress: swag.String("10.101.01.1:8098"), }, } - err := disassociateRanToE2THandlerImpl(disassranchan, data) + err := disassociateRanToE2THandlerImpl(data) if (err != nil ) { t.Log(err) } @@ -383,13 +386,11 @@ func TestDisassociateRanToE2THandlerImpl(t *testing.T) { E2TAddress: swag.String("10.101.01.1:8098"), }, } - err = disassociateRanToE2THandlerImpl(disassranchan, data) + err = disassociateRanToE2THandlerImpl(data) if (err != nil ) { t.Log(err) } - data1 := <-disassranchan - fmt.Println(data1) //################ Delete End Point dummy entry delete(rtmgr.Eps, uuid); //##################### @@ -397,11 +398,10 @@ func TestDisassociateRanToE2THandlerImpl(t *testing.T) { func TestDeleteE2tHandleHandlerImpl(t *testing.T) { - e2tdelchan := make(chan *models.E2tDeleteData, 10) data := models.E2tDeleteData{ E2TAddress: swag.String(""), } - err := deleteE2tHandleHandlerImpl(e2tdelchan, &data) + err := deleteE2tHandleHandlerImpl(&data) if (err != nil ) { t.Log(err) } @@ -417,13 +417,10 @@ func TestDeleteE2tHandleHandlerImpl(t *testing.T) { data = models.E2tDeleteData{ E2TAddress: swag.String("10.101.01.1:8098"), } - err = deleteE2tHandleHandlerImpl(e2tdelchan, &data) + err = deleteE2tHandleHandlerImpl(&data) if (err != nil ) { t.Log(err) } - data1 := <-e2tdelchan - - fmt.Println(data1) //################ Delete End Point dummy entry delete(rtmgr.Eps, uuid); //##################### @@ -466,47 +463,38 @@ func TestHttpInstance(t *testing.T) { err := httpinstance.Terminate() t.Log(err) - triggerSBI := make(chan bool) createMockPlatformComponents() //ts := createMockAppmgrWithData("127.0.0.1:3000", BasicXAppLists, nil) //ts.Start() //defer ts.Close() var m sync.Mutex - err = httpinstance.Initialize(XMURL, "httpgetter", "rt.json", "config.json", E2MURL, sdlEngine, rpeEngine, triggerSBI, &m) + err = httpinstance.Initialize(XMURL, "httpgetter", "rt.json", "config.json", E2MURL, sdlEngine, rpeEngine, &m) } -func TestXappCallbackDataChannelwithdata(t *testing.T) { +func TestXappCallbackWithData(t *testing.T) { data := models.XappCallbackData{ XApps: *swag.String("[]"), Version: *swag.Int64(1), Event: *swag.String("someevent"), ID: *swag.String("123456")} - datach := make(chan *models.XappCallbackData, 1) - go func() { _, _ = recvXappCallbackData(datach) }() - defer close(datach) - datach <- &data + _, _ = recvXappCallbackData(&data) } -func TestXappCallbackDataChannelNodata(t *testing.T) { - datach := make(chan *models.XappCallbackData, 1) - go func() { _, _ = recvXappCallbackData(datach) }() - defer close(datach) + +func TestXappCallbackNodata(t *testing.T) { + //data := *models.XappCallbackData + _, _ = recvXappCallbackData(nil) } -func TestE2TChannelwithData(t *testing.T) { - data2 := models.E2tData{ - E2TAddress: swag.String("1.2.3.4"), - RanNamelist: []string{"ran1","ran2"}, - } - dataChannel := make(chan *models.E2tData, 10) - go func() { _, _,_ = recvNewE2Tdata(dataChannel) }() - defer close(dataChannel) - dataChannel <- &data2 +func TestE2TwithData(t *testing.T) { + data2 := models.E2tData{ + E2TAddress: swag.String("1.2.3.4"), + RanNamelist: []string{"ran1","ran2"}, + } + _, _,_ = recvNewE2Tdata(&data2) } -func TestE2TChannelwithNoData(t *testing.T) { - dataChannel := make(chan *models.E2tData, 10) - go func() { _, _ ,_= recvNewE2Tdata(dataChannel) }() - defer close(dataChannel) +func TestE2TwithNoData(t *testing.T) { + _, _,_ = recvNewE2Tdata(nil) } func TestProvideXappSubscriptionHandleImpl(t *testing.T) { @@ -515,12 +503,7 @@ func TestProvideXappSubscriptionHandleImpl(t *testing.T) { Address: swag.String("10.0.0.0"), Port: &p, SubscriptionID: swag.Int32(1234)} - datachannel := make(chan *models.XappSubscriptionData, 10) - go func() { _ = provideXappSubscriptionHandleImpl(datachannel, &data) }() - defer close(datachannel) - datachannel <- &data - - //subdel test + _ = provideXappSubscriptionHandleImpl(&data) } func createMockAppmgrWithData(url string, g []byte, p []byte, t []byte) *httptest.Server { @@ -579,58 +562,18 @@ func createMockPlatformComponents() { _ = ioutil.WriteFile(filename, file, 644) } -func TestRecvXappCallbackData(t *testing.T) { - data := models.XappCallbackData{ - XApps: *swag.String("[]"), - Version: *swag.Int64(1), - Event: *swag.String("any"), - ID: *swag.String("123456"), - } - - ch := make(chan *models.XappCallbackData) - defer close(ch) - httpRestful := NewHttpRestful() - go func() { ch <- &data }() - time.Sleep(1 * time.Second) - t.Log(string(len(ch))) - xappList, err := httpRestful.RecvXappCallbackData(ch) - if err != nil { - t.Error("Receive failed: " + err.Error()) - } else { - if xappList == nil { - t.Error("Expected an XApp notification list") - } else { - t.Log("whatever") - } - } -} - func TestProvideXappHandleHandlerImpl(t *testing.T) { - datach := make(chan *models.XappCallbackData, 10) - defer close(datach) data := models.XappCallbackData{ XApps: *swag.String("[]"), Version: *swag.Int64(1), Event: *swag.String("someevent"), ID: *swag.String("123456")} - var httpRestful, _ = GetNbi("httpRESTful") - err := httpRestful.(*HttpRestful).ProvideXappHandleHandlerImpl(datach, &data) - if err != nil { - t.Error("Error occured: " + err.Error()) - } else { - recv := <-datach - if recv == nil { - t.Error("Something gone wrong: " + err.Error()) - } else { - if recv != &data { - t.Error("Malformed data on channel") - } - } - } + err := provideXappHandleHandlerImpl( &data) //Empty XappCallbackdata data1 := models.XappCallbackData{} - err = httpRestful.(*HttpRestful).ProvideXappHandleHandlerImpl(datach, &data1) + err = provideXappHandleHandlerImpl(&data1) + t.Log(err) } func TestValidateXappCallbackData(t *testing.T) { @@ -738,3 +681,45 @@ func TestInvalidarguments(t *testing.T) { _ = PostSubReq("\n","nbifinterface") _ = PostSubReq("xmurl","\n") } + +func TestInitEngine(t *testing.T) { + initRtmgr() +} + +func TestUpdateXappSubscription(t *testing.T) { + ep := make(map[string]*rtmgr.Endpoint) + ep["dummy"] = &rtmgr.Endpoint{Uuid: "10.0.0.1:0", Name: "E2TERM", XAppType: "app1", Ip: "10.1.1.1", Port: 1234, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: true} + + rtmgr.Eps = ep + + + p := uint16(1234) + xapp := models.XappElement{ + Address: swag.String("10.1.1.1"), + Port: &p, + } + + var b models.XappList + b = append(b,&xapp) + _ = updateXappSubscriptionHandleImpl(&b, 10) + + //Test case when subscriptions already exist + data := models.XappSubscriptionData{ + Address: swag.String("10.0.0.0"), + Port: &p, + SubscriptionID: swag.Int32(12345)} + + rtmgr.Subs = *stub.ValidSubscriptions + + subscriptionExists(&data) + addSubscription(&rtmgr.Subs, &data) + _ = updateXappSubscriptionHandleImpl(&b, 10) + + +} + +func TestDumpDebugdata(t *testing.T) { + _,_ = dumpDebugData() +} + + diff --git a/pkg/nbi/types.go b/pkg/nbi/types.go index f538ac4..722289e 100644 --- a/pkg/nbi/types.go +++ b/pkg/nbi/types.go @@ -40,7 +40,7 @@ import ( type FetchAllXAppsHandler func(string) (*[]rtmgr.XApp, error) type RecvXappCallbackDataHandler func(<-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) type RecvNewE2TdataHandler func(<-chan *models.E2tData) (*rtmgr.E2TInstance, string, error) -type LaunchRestHandler func(*string, chan<- *models.XappCallbackData, chan<- *models.XappSubscriptionData, chan<- *rtmgr.XappList, chan<- *models.XappSubscriptionData, chan<- *models.E2tData, chan<- models.RanE2tMap, chan<- models.RanE2tMap, chan<- *models.E2tDeleteData) +type LaunchRestHandler func(*string) type ProvideXappHandleHandlerImpl func(chan<- *models.XappCallbackData, *models.XappCallbackData) error type RetrieveStartupDataHandler func(string, string, string, string, string, sdl.Engine) error @@ -53,6 +53,6 @@ type EngineConfig struct { } type Engine interface { - Initialize(string, string, string, string, string, sdl.Engine, rpe.Engine, chan<- bool, *sync.Mutex) error + Initialize(string, string, string, string, string, sdl.Engine, rpe.Engine, *sync.Mutex) error Terminate() error } diff --git a/pkg/rpe/rmr_test.go b/pkg/rpe/rmr_test.go index 950cd99..fe61356 100644 --- a/pkg/rpe/rmr_test.go +++ b/pkg/rpe/rmr_test.go @@ -22,7 +22,7 @@ ================================================================================== */ /* - Mnemonic: nngpub_test.go + Mnemonic: rmrpub_test.go Abstract: Date: 25 April 2019 */ diff --git a/pkg/rpe/rpe.go b/pkg/rpe/rpe.go index b080bcb..39c2bb1 100644 --- a/pkg/rpe/rpe.go +++ b/pkg/rpe/rpe.go @@ -115,7 +115,8 @@ func (r *Rpe) addRoute(messageType string, tx *rtmgr.Endpoint, rx *rtmgr.Endpoin if rx != nil { rxList = []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}} } - messageId := strconv.Itoa(xapp.RICMessageTypes[messageType]) + //messageId := strconv.Itoa(xapp.RICMessageTypes[messageType]) + messageId := rtmgr.Mtype[messageType] route := rtmgr.RouteTableEntry{ MessageType: messageId, TxList: txList, @@ -143,7 +144,8 @@ func (r *Rpe) addRoute_rx_list(messageType string, tx *rtmgr.Endpoint, rx []rtmg } } - messageId := strconv.Itoa(xapp.RICMessageTypes[messageType]) + //messageId := strconv.Itoa(xapp.RICMessageTypes[messageType]) + messageId := rtmgr.Mtype[messageType] route := rtmgr.RouteTableEntry{ MessageType: messageId, TxList: txList, @@ -250,8 +252,6 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr. sendEp = subManEp case "E2MAN": sendEp = e2ManEp - //case "UEMAN": - // sendEp = ueManEp case "RSM": sendEp = rsmEp case "A1MEDIATOR": @@ -300,11 +300,6 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable xapp.Logger.Error("Platform component not found: %v", "E2 Manager") xapp.Logger.Debug("Endpoints: %v", endPointList) } - /*ueManEp := getEndpointByName(&endPointList, "UEMAN") - if ueManEp == nil { - xapp.Logger.Error("Platform component not found: %v", "UE Manger") - xapp.Logger.Debug("Endpoints: %v", endPointList) - }*/ rsmEp := getEndpointByName(&endPointList, "RSM") if rsmEp == nil { xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager") diff --git a/pkg/rtmgr/rtmgr.go b/pkg/rtmgr/rtmgr.go index 06d0f15..b636e12 100644 --- a/pkg/rtmgr/rtmgr.go +++ b/pkg/rtmgr/rtmgr.go @@ -35,6 +35,7 @@ import ( "github.com/ghodss/yaml" "io/ioutil" "os" + "strings" ) var ( @@ -122,12 +123,14 @@ var ( Eps Endpoints Subs SubscriptionList PrsCfg *PlatformRoutes + Mtype MessageTypeList ) func GetPlatformComponents(configfile string) (*PlatformComponents, error) { xapp.Logger.Debug("Invoked rtmgr.GetPlatformComponents(" + configfile + ")") var rcfg ConfigRtmgr var rtroutes RtmgrRoutes + var mtypes MessageTypeIdentifier yamlFile, err := os.Open(configfile) if err != nil { return nil, errors.New("cannot open the file due to: " + err.Error()) @@ -147,6 +150,16 @@ func GetPlatformComponents(configfile string) (*PlatformComponents, error) { } PrsCfg = &(rtroutes.Prs) + err = json.Unmarshal(jsonByteValue,&mtypes) + if err != nil { + return nil, errors.New("cannot parse data due to: " + err.Error()) + } else { + xapp.Logger.Debug("Messgaetypes = %v", mtypes) + for _,m := range mtypes.Mit { + splitstr := strings.Split(m,"=") + Mtype[splitstr[0]] = splitstr[1] + } + } err = json.Unmarshal(jsonByteValue, &rcfg) if err != nil { return nil, errors.New("cannot parse data due to: " + err.Error()) diff --git a/pkg/rtmgr/types.go b/pkg/rtmgr/types.go index 2846173..138c2b5 100644 --- a/pkg/rtmgr/types.go +++ b/pkg/rtmgr/types.go @@ -39,6 +39,8 @@ type Endpoints map[string]*Endpoint type SubscriptionList []Subscription +type MessageTypeList map[string]string + //TODO: uuid is not a real UUID but a string of "ip:port" // this should be changed to real UUID later on which should come from xApp Manager // petszila type Endpoint struct { @@ -102,6 +104,12 @@ type ConfigRtmgr struct { Pcs PlatformComponents `json:"PlatformComponents"` } + +type MessageTypeIdentifier struct { + Mit []string `json:"messagetypes"` +} + + type RicComponents struct { XApps []XApp E2Ts map [string]E2TInstance diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go index 74e077b..1f0e0e6 100644 --- a/pkg/sbi/nngpush.go +++ b/pkg/sbi/nngpush.go @@ -21,13 +21,18 @@ ================================================================================== */ /* - Mnemonic: nngpipe.go - Abstract: mangos (NNG) Pipeline SBI implementation + Mnemonic: rmrpipe.go + Abstract: mangos (RMR) Pipeline SBI implementation Date: 12 March 2019 */ package sbi +/* +#include +*/ +import "C" + import ( "bytes" "crypto/md5" @@ -35,15 +40,23 @@ import ( "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "routing-manager/pkg/rtmgr" "strconv" - //"time" + "strings" "fmt" ) -type NngPush struct { +var rmrcallid = 1 +var rmrdynamiccallid = 201 + +type RmrPush struct { Sbi rcChan chan *xapp.RMRParams } +type EPStatus struct { + endpoint string + status bool +} + type RMRParams struct { *xapp.RMRParams } @@ -55,23 +68,23 @@ func (params *RMRParams) String() string { return b.String() } -func NewNngPush() *NngPush { - instance := new(NngPush) +func NewRmrPush() *RmrPush { + instance := new(RmrPush) return instance } -func (c *NngPush) Initialize(ip string) error { +func (c *RmrPush) Initialize(ip string) error { return nil } -func (c *NngPush) Terminate() error { +func (c *RmrPush) Terminate() error { return nil } -func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error { +func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error { xapp.Logger.Debug("Invoked sbi.AddEndpoint") - endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber) + endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber) ep.Whid = int(xapp.Rmr.Openwh(endpoint)) if ep.Whid < 0 { return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid)) @@ -82,7 +95,7 @@ func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error { return nil } -func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error { +func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error { xapp.Logger.Debug("Invoked sbi. DeleteEndpoint") xapp.Logger.Debug("args: %v", *ep) @@ -90,65 +103,105 @@ func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error { return nil } -func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) { +func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) { c.updateEndpoints(rcs, c) } -func (c *NngPush) DistributeAll(policies *[]string) error { +func (c *RmrPush) DistributeAll(policies *[]string) error { xapp.Logger.Debug("Invoked: sbi.DistributeAll") xapp.Logger.Debug("args: %v", *policies) - for _, ep := range rtmgr.Eps { + /*for _, ep := range rtmgr.Eps { go c.send(ep, policies) + }*/ + channel := make(chan EPStatus) + + if rmrcallid == 200 { + rmrcallid = 1 } + for _, ep := range rtmgr.Eps { + go c.send_sync(ep, policies, channel, rmrcallid) + } + rmrcallid++ + + count := 0 + result := make([]EPStatus, len(rtmgr.Eps)) + for i, _ := range result { + result[i] = <-channel + if result[i].status == true { + count++ + } else { + xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint) + } + } + + if count < len(rtmgr.Eps) { + return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps))) + } + + return nil } -func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) { - xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid) +func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) { + xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid) - var policy = []byte{} - cumulative_policy := 0 - count := 0 - maxrecord := xapp.Config.GetInt("maxrecord") - if maxrecord == 0 { - maxrecord = 10 - } + ret := c.send_data(ep, policies, call_id) - for _, pe := range *policies { - b := []byte(pe) - for j := 0; j < len(b); j++ { - policy = append(policy, b[j]) - } - count++ - cumulative_policy++ - if count == maxrecord || cumulative_policy == len(*policies) { - params := &RMRParams{&xapp.RMRParams{}} - params.Mtype = 20 - params.PayloadLen = len(policy) - params.Payload = []byte(policy) - params.Mbuf = nil - params.Whid = ep.Whid - xapp.Rmr.SendMsg(params.RMRParams) - count = 0 - policy = nil - xapp.Logger.Debug("Sent message with payload len = %d to %s", params.PayloadLen, ep.Uuid) - } - } + channel <- EPStatus{ep.Uuid, ret} - xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")") } -func (c *NngPush) CreateEndpoint(payload string) *rtmgr.Endpoint { +func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool { + xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id)) + var state int + var retstr string + + var policy = []byte{} + + for _, pe := range *policies { + b:= []byte(pe) + for j:=0; j