From: wahidw Date: Wed, 5 Feb 2020 10:01:12 +0000 (+0000) Subject: Handling of subscription merge and inclusion of RMR lib from xapp-framework X-Git-Tag: 0.4.15~9 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=cd7867c8f527f46fd8702b0b8d6b380a8e134bea;p=ric-plt%2Frtmgr.git Handling of subscription merge and inclusion of RMR lib from xapp-framework Change-Id: I51f28bf758e542629263fcc83c1e9c13b2b26f3e Signed-off-by: wahidw --- diff --git a/RELNOTES b/RELNOTES index c9f5423..e78a029 100644 --- a/RELNOTES +++ b/RELNOTES @@ -1,3 +1,6 @@ +### v0.4.11 +* Added code for subscription merge and added RMR from xapp-framework + ### v0.4.10 * Temporary Fix for R3 (E2M->E2T issue) - retrying when is_Ready flag in socket handle is false diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go index ceb28e8..bd17d4f 100644 --- a/cmd/rtmgr.go +++ b/cmd/rtmgr.go @@ -145,6 +145,9 @@ func main() { var m sync.Mutex + c := sbi.NewControl() + go c.Run() + serve(nbiEngine, sbiEngine, sdlEngine, rpeEngine, &m) os.Exit(0) } diff --git a/container-tag.yaml b/container-tag.yaml index 42ff158..f8f7249 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -2,4 +2,4 @@ # By default this file is in the docker build directory, # but the location can configured in the JJB template. --- -tag: 0.4.10 +tag: 0.4.11 diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go index 1d5e1fa..ac56aa0 100644 --- a/pkg/nbi/httprestful.go +++ b/pkg/nbi/httprestful.go @@ -38,8 +38,8 @@ import ( "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "github.com/go-openapi/loads" "github.com/go-openapi/runtime/middleware" - "net/url" "net" + "net/url" "os" "routing-manager/pkg/models" "routing-manager/pkg/restapi" @@ -49,9 +49,9 @@ import ( "routing-manager/pkg/rtmgr" "routing-manager/pkg/sdl" "strconv" - "time" - "sync" "strings" + "sync" + "time" ) //var myClient = &http.Client{Timeout: 1 * time.Second} @@ -60,7 +60,7 @@ type HttpRestful struct { Engine LaunchRest LaunchRestHandler RecvXappCallbackData RecvXappCallbackDataHandler - RecvNewE2Tdata RecvNewE2TdataHandler + RecvNewE2Tdata RecvNewE2TdataHandler ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl RetrieveStartupData RetrieveStartupDataHandler } @@ -69,7 +69,7 @@ func NewHttpRestful() *HttpRestful { instance := new(HttpRestful) instance.LaunchRest = launchRest instance.RecvXappCallbackData = recvXappCallbackData - instance.RecvNewE2Tdata = recvNewE2Tdata + instance.RecvNewE2Tdata = recvNewE2Tdata instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl instance.RetrieveStartupData = retrieveStartupData return instance @@ -99,37 +99,37 @@ func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr return nil, nil } -func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance,string,error) { - var e2tData *models.E2tData +func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, string, error) { + var e2tData *models.E2tData var str string - xapp.Logger.Info("data received") + xapp.Logger.Info("data received") - e2tData = <-dataChannel + e2tData = <-dataChannel - if nil != e2tData { + if nil != e2tData { - e2tinst := rtmgr.E2TInstance { - Ranlist : make([]string, len(e2tData.RanNamelist)), + e2tinst := rtmgr.E2TInstance{ + Ranlist: make([]string, len(e2tData.RanNamelist)), + } + + e2tinst.Fqdn = *e2tData.E2TAddress + e2tinst.Name = "E2TERMINST" + copy(e2tinst.Ranlist, e2tData.RanNamelist) + if len(e2tData.RanNamelist) > 0 { + var meidar string + for _, meid := range e2tData.RanNamelist { + meidar += meid + " " } + str = "mme_ar|" + *e2tData.E2TAddress + "|" + strings.TrimSuffix(meidar, " ") + } + return &e2tinst, str, nil - e2tinst.Fqdn = *e2tData.E2TAddress - e2tinst.Name = "E2TERMINST" - copy(e2tinst.Ranlist, e2tData.RanNamelist) - if (len(e2tData.RanNamelist) > 0) { - var meidar string - for _, meid := range e2tData.RanNamelist { - meidar += meid + " " - } - str = "mme_ar|" + *e2tData.E2TAddress + "|" + strings.TrimSuffix(meidar," ") + } else { + xapp.Logger.Info("No data") } - return &e2tinst,str,nil - - } else { - xapp.Logger.Info("No data") - } - xapp.Logger.Debug("Nothing received on the Http interface") - return nil,str,nil + xapp.Logger.Debug("Nothing received on the Http interface") + return nil, str, nil } func validateXappCallbackData(callbackData *models.XappCallbackData) error { @@ -172,21 +172,21 @@ func validateXappSubscriptionData(data *models.XappSubscriptionData) error { func validateE2tData(data *models.E2tData) error { e2taddress_key := *data.E2TAddress - if (e2taddress_key == "") { - return fmt.Errorf("E2TAddress is empty!!!") - } + if e2taddress_key == "" { + return fmt.Errorf("E2TAddress is empty!!!") + } stringSlice := strings.Split(e2taddress_key, ":") - if (len(stringSlice) == 1) { - return fmt.Errorf("E2T E2TAddress is not a proper format like ip:port, %v", e2taddress_key ) + if len(stringSlice) == 1 { + return fmt.Errorf("E2T E2TAddress is not a proper format like ip:port, %v", e2taddress_key) } _, err := net.LookupIP(stringSlice[0]) if err != nil { return fmt.Errorf("E2T E2TAddress DNS look up failed, E2TAddress: %v", stringSlice[0]) - } + } if checkValidaE2TAddress(e2taddress_key) { - return fmt.Errorf("E2TAddress already exist!!!, E2TAddress: %v",e2taddress_key) + return fmt.Errorf("E2TAddress already exist!!!, E2TAddress: %v", e2taddress_key) } return nil @@ -194,21 +194,20 @@ func validateE2tData(data *models.E2tData) error { func validateDeleteE2tData(data *models.E2tDeleteData) error { - if (*data.E2TAddress == "") { - return fmt.Errorf("E2TAddress is empty!!!") - } + if *data.E2TAddress == "" { + return fmt.Errorf("E2TAddress is empty!!!") + } for _, element := range data.RanAssocList { e2taddress_key := *element.E2TAddress stringSlice := strings.Split(e2taddress_key, ":") - if (len(stringSlice) == 1) { + if len(stringSlice) == 1 { return fmt.Errorf("E2T Delete - RanAssocList E2TAddress is not a proper format like ip:port, %v", e2taddress_key) } - if !checkValidaE2TAddress(e2taddress_key) { - return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v",e2taddress_key) + return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v", e2taddress_key) } } @@ -267,16 +266,41 @@ func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscription return nil } +func updateXappSubscriptionHandleImpl(subupdatechan chan<- *rtmgr.XappList, data *models.XappList, subid uint16) error { + xapp.Logger.Debug("Invoked updateXappSubscriptionHandleImpl") + + var fqdnlist []rtmgr.FqDn + for _, item := range *data { + fqdnlist = append(fqdnlist, rtmgr.FqDn(*item)) + } + xapplist := rtmgr.XappList{SubscriptionID: subid, FqdnList: fqdnlist} + var subdata models.XappSubscriptionData + var id int32 + id = int32(subid) + subdata.SubscriptionID = &id + for _, items := range fqdnlist { + subdata.Address = items.Address + subdata.Port = items.Port + err := validateXappSubscriptionData(&subdata) + if err != nil { + xapp.Logger.Error(err.Error()) + return err + } + } + subupdatechan <- &xapplist + return nil +} + func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData, - data *models.E2tData) error { - xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl") - err := validateE2tData(data) - if err != nil { - xapp.Logger.Error(err.Error()) - return err - } - e2taddchan <- data - return nil + data *models.E2tData) error { + xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl") + err := validateE2tData(data) + if err != nil { + xapp.Logger.Error(err.Error()) + return err + } + e2taddchan <- data + return nil } func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error { @@ -290,7 +314,7 @@ func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error { e2taddress_key := *element.E2TAddress if !checkValidaE2TAddress(e2taddress_key) { - return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v",e2taddress_key) + return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v", e2taddress_key) } } @@ -298,44 +322,44 @@ func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error { } func associateRanToE2THandlerImpl(assranchan chan<- models.RanE2tMap, - data models.RanE2tMap) error { - xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl") + data models.RanE2tMap) error { + xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl") err := validateE2TAddressRANListData(data) if err != nil { xapp.Logger.Warn(" Association of RAN to E2T Instance data validation failed: " + err.Error()) return err } assranchan <- data - return nil + return nil } func disassociateRanToE2THandlerImpl(disassranchan chan<- models.RanE2tMap, - data models.RanE2tMap) error { - xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl") + data models.RanE2tMap) error { + xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl") err := validateE2TAddressRANListData(data) if err != nil { xapp.Logger.Warn(" Disassociation of RAN List from E2T Instance data validation failed: " + err.Error()) return err } disassranchan <- data - return nil + return nil } func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData, - data *models.E2tDeleteData) error { - xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl") + data *models.E2tDeleteData) error { + xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl") - err := validateDeleteE2tData(data) - if err != nil { - xapp.Logger.Error(err.Error()) - return err - } + err := validateDeleteE2tData(data) + if err != nil { + xapp.Logger.Error(err.Error()) + return err + } - e2tdelchan <- data - return nil + e2tdelchan <- data + return nil } -func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData, +func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData, subupdatechan chan<- *rtmgr.XappList, subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData, assranchan chan<- models.RanE2tMap, disassranchan chan<- models.RanE2tMap, e2tdelchan chan<- *models.E2tDeleteData) { swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON) if err != nil { @@ -392,49 +416,60 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c return handle.NewGetHandlesOK() } }) - api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc( - func(params handle.CreateNewE2tHandleParams) middleware.Responder { - err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData) - if err != nil { - return handle.NewCreateNewE2tHandleBadRequest() - } else { + api.HandleUpdateXappSubscriptionHandleHandler = handle.UpdateXappSubscriptionHandleHandlerFunc( + func(params handle.UpdateXappSubscriptionHandleParams) middleware.Responder { + err := updateXappSubscriptionHandleImpl(subupdatechan, ¶ms.XappList, params.SubscriptionID) + if err != nil { + return handle.NewUpdateXappSubscriptionHandleBadRequest() + } else { + //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints + time.Sleep(1 * time.Second) + return handle.NewUpdateXappSubscriptionHandleCreated() + } + }) + api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc( + func(params handle.CreateNewE2tHandleParams) middleware.Responder { + err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData) + if err != nil { + return handle.NewCreateNewE2tHandleBadRequest() + } else { time.Sleep(1 * time.Second) - return handle.NewCreateNewE2tHandleCreated() - } - }) + return handle.NewCreateNewE2tHandleCreated() + } + }) - api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc( + api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc( func(params handle.AssociateRanToE2tHandleParams) middleware.Responder { - err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList) + err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList) if err != nil { - return handle.NewAssociateRanToE2tHandleBadRequest() - } else { + return handle.NewAssociateRanToE2tHandleBadRequest() + } else { time.Sleep(1 * time.Second) - return handle.NewAssociateRanToE2tHandleCreated() - } - }) + return handle.NewAssociateRanToE2tHandleCreated() + } + }) - api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc( - func(params handle.DissociateRanParams) middleware.Responder { + api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc( + func(params handle.DissociateRanParams) middleware.Responder { err := disassociateRanToE2THandlerImpl(disassranchan, params.DissociateList) if err != nil { - return handle.NewDissociateRanBadRequest() - } else { + return handle.NewDissociateRanBadRequest() + } else { time.Sleep(1 * time.Second) - return handle.NewDissociateRanCreated() - } - }) - - api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc( - func(params handle.DeleteE2tHandleParams) middleware.Responder { - err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData) - if err != nil { - return handle.NewDeleteE2tHandleBadRequest() - } else { + return handle.NewDissociateRanCreated() + } + }) + + api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc( + func(params handle.DeleteE2tHandleParams) middleware.Responder { + err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData) + if err != nil { + return handle.NewDeleteE2tHandleBadRequest() + } else { time.Sleep(1 * time.Second) - return handle.NewDeleteE2tHandleCreated() - } - }) + return handle.NewDeleteE2tHandleCreated() + } + }) // start to serve API xapp.Logger.Info("Starting the HTTP Rest service") if err := server.Serve(); err != nil { @@ -479,7 +514,7 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile } xapp.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.") // Combine the xapps data and platform data before writing to the SDL - ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: make(map[string]rtmgr.E2TInstance)} + ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: make(map[string]rtmgr.E2TInstance)} writeErr := sdlEngine.WriteAll(fileName, ricData) if writeErr != nil { xapp.Logger.Error(writeErr.Error()) @@ -510,13 +545,14 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co datach := make(chan *models.XappCallbackData, 10) subschan := make(chan *models.XappSubscriptionData, 10) subdelchan := make(chan *models.XappSubscriptionData, 10) + subupdatechan := make(chan *rtmgr.XappList, 10) e2taddchan := make(chan *models.E2tData, 10) associateranchan := make(chan models.RanE2tMap, 10) disassociateranchan := make(chan models.RanE2tMap, 10) e2tdelchan := make(chan *models.E2tDeleteData, 10) xapp.Logger.Info("Launching Rest Http service") go func() { - r.LaunchRest(&nbiif, datach, subschan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan) + r.LaunchRest(&nbiif, datach, subschan, subupdatechan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan) }() go func() { @@ -555,56 +591,65 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co } }() - go func() { - for { - xapp.Logger.Debug("received create New E2T data") + go func() { + for { + data := <-subupdatechan + xapp.Logger.Debug("received XApp subscription Merge data") + updateSubscription(data) + triggerSBI <- true + } + }() + + go func() { + for { - data, meiddata,_ := r.RecvNewE2Tdata(e2taddchan) - if data != nil { + data, meiddata, _ := r.RecvNewE2Tdata(e2taddchan) + if data != nil { + xapp.Logger.Debug("received create New E2T data") m.Lock() - sdlEngine.WriteNewE2TInstance(fileName, data,meiddata) + sdlEngine.WriteNewE2TInstance(fileName, data, meiddata) m.Unlock() - triggerSBI <- true - } - } - }() + triggerSBI <- true + } + } + }() - go func() { - for { + go func() { + for { data := <-associateranchan - xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager") + xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager") m.Lock() - sdlEngine.WriteAssRANToE2TInstance(fileName, data) + sdlEngine.WriteAssRANToE2TInstance(fileName, data) m.Unlock() - triggerSBI <- true - } - }() + triggerSBI <- true + } + }() - go func() { - for { + go func() { + for { data := <-disassociateranchan - xapp.Logger.Debug("received disassociate RANs from E2T instance") + xapp.Logger.Debug("received disassociate RANs from E2T instance") m.Lock() - sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data) + sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data) m.Unlock() - triggerSBI <- true - } - }() + triggerSBI <- true + } + }() - go func() { - for { - xapp.Logger.Debug("received Delete E2T data") + go func() { + for { data := <-e2tdelchan - if data != nil { + xapp.Logger.Debug("received Delete E2T data") + if data != nil { m.Lock() - sdlEngine.WriteDeleteE2TInstance(fileName, data) + sdlEngine.WriteDeleteE2TInstance(fileName, data) m.Unlock() - triggerSBI <- true - } - } - }() + triggerSBI <- true + } + } + }() return nil } @@ -614,6 +659,7 @@ func (r *HttpRestful) Terminate() error { } func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool { + xapp.Logger.Debug("Adding the subscription into the subscriptions list") var b = false sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port} for _, elem := range *subs { @@ -647,3 +693,39 @@ func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubsc } return present } + +func updateSubscription(data *rtmgr.XappList) { + + var subdata models.XappSubscriptionData + var id int32 + var matchingsubid, deletecount uint8 + id = int32(data.SubscriptionID) + subdata.SubscriptionID = &id + for _, subs := range rtmgr.Subs { + if int32(data.SubscriptionID) == subs.SubID { + matchingsubid++ + } + } + + for deletecount < matchingsubid { + for _, subs := range rtmgr.Subs { + if int32(data.SubscriptionID) == subs.SubID { + subdata.SubscriptionID = &subs.SubID + subdata.Address = &subs.Fqdn + subdata.Port = &subs.Port + xapp.Logger.Debug("Deletion Subscription List has %v", subdata) + delSubscription(&rtmgr.Subs, &subdata) + break + } + } + deletecount++ + } + + for _, items := range data.FqdnList { + subdata.Address = items.Address + subdata.Port = items.Port + xapp.Logger.Debug("Adding Subscription List has %v", subdata) + addSubscription(&rtmgr.Subs, &subdata) + } + +} diff --git a/pkg/nbi/types.go b/pkg/nbi/types.go index 1acf09c..80e7fd2 100644 --- a/pkg/nbi/types.go +++ b/pkg/nbi/types.go @@ -40,7 +40,7 @@ import ( type FetchAllXAppsHandler func(string) (*[]rtmgr.XApp, error) type RecvXappCallbackDataHandler func(<-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) type RecvNewE2TdataHandler func(<-chan *models.E2tData) (*rtmgr.E2TInstance, string, error) -type LaunchRestHandler func(*string, chan<- *models.XappCallbackData, chan<- *models.XappSubscriptionData, chan<- *models.XappSubscriptionData, chan<- *models.E2tData, chan<- models.RanE2tMap, chan<- models.RanE2tMap, chan<- *models.E2tDeleteData) +type LaunchRestHandler func(*string, chan<- *models.XappCallbackData, chan<- *models.XappSubscriptionData, chan<- *rtmgr.XappList, chan<- *models.XappSubscriptionData, chan<- *models.E2tData, chan<- models.RanE2tMap, chan<- models.RanE2tMap, chan<- *models.E2tDeleteData) type ProvideXappHandleHandlerImpl func(chan<- *models.XappCallbackData, *models.XappCallbackData) error type RetrieveStartupDataHandler func(string, string, string, string, sdl.Engine) error diff --git a/pkg/rpe/rpe.go b/pkg/rpe/rpe.go index 8ee9b52..1c006b4 100644 --- a/pkg/rpe/rpe.go +++ b/pkg/rpe/rpe.go @@ -152,7 +152,7 @@ func (r *Rpe) addRoute_rx_list(messageType string, tx *rtmgr.Endpoint, rx []rtmg // xapp.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId) } -func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { +func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { xapp.Logger.Debug("rpe.generateXappRoutes invoked") xapp.Logger.Debug("Endpoint: %v, xAppType: %v", xAppEp.Name, xAppEp.XAppType) if xAppEp.XAppType != sbi.PlatformType && (len(xAppEp.TxMessages) > 0 || len(xAppEp.RxMessages) > 0) { @@ -179,7 +179,7 @@ func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoin } -func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { +func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { xapp.Logger.Debug("rpe.addSubscriptionRoutes invoked") subscriptionList := &rtmgr.Subs for _, subscription := range *subscriptionList { @@ -196,14 +196,14 @@ func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, e2TermE r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID, "") r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID, "") //E2 Termination -> xApp - r.addRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID, "") - r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID, "") - r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID, "") + r.addRoute("RIC_INDICATION", nil, xAppEp, routeTable, subscription.SubID, "") + r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, subscription.SubID, "") + r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, subscription.SubID, "") } } } -func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, ueManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { +func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { xapp.Logger.Debug("rpe.generatePlatformRoutes invoked") //Platform Routes --- Subscription Routes //Subscription Manager -> E2 Termination @@ -215,8 +215,8 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr. sendEp = subManEp case "E2MAN": sendEp = e2ManEp - case "UEMAN": - sendEp = ueManEp + //case "UEMAN": + // sendEp = ueManEp case "RSM": sendEp = rsmEp case "A1MEDIATOR": @@ -227,8 +227,8 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr. Ep = subManEp case "E2MAN": Ep = e2ManEp - case "UEMAN": - Ep = ueManEp + //case "UEMAN": + // Ep = ueManEp case "RSM": Ep = rsmEp case "A1MEDIATOR": @@ -248,11 +248,11 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable xapp.Logger.Debug("rpe.generateRouteTable invoked") xapp.Logger.Debug("Endpoint List: %v", endPointList) routeTable := &rtmgr.RouteTable{} - e2TermEp := getEndpointByName(&endPointList, "E2TERM") + /*e2TermEp := getEndpointByName(&endPointList, "E2TERM") if e2TermEp == nil { xapp.Logger.Error("Platform component not found: %v", "E2 Termination") xapp.Logger.Debug("Endpoints: %v", endPointList) - } + }*/ subManEp := getEndpointByName(&endPointList, "SUBMAN") if subManEp == nil { xapp.Logger.Error("Platform component not found: %v", "Subscription Manager") @@ -263,11 +263,11 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable xapp.Logger.Error("Platform component not found: %v", "E2 Manager") xapp.Logger.Debug("Endpoints: %v", endPointList) } - ueManEp := getEndpointByName(&endPointList, "UEMAN") + /*ueManEp := getEndpointByName(&endPointList, "UEMAN") if ueManEp == nil { xapp.Logger.Error("Platform component not found: %v", "UE Manger") xapp.Logger.Debug("Endpoints: %v", endPointList) - } + }*/ rsmEp := getEndpointByName(&endPointList, "RSM") if rsmEp == nil { xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager") @@ -284,13 +284,13 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable xapp.Logger.Error("Platform component not found: %v", "E2 Termination List") xapp.Logger.Debug("Endpoints: %v", endPointList) } - r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, ueManEp, rsmEp, A1MediatorEp, routeTable) + r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, rsmEp, A1MediatorEp, routeTable) for _, endPoint := range endPointList { xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType) if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) { - r.generateXappRoutes(endPoint, e2TermEp, subManEp, routeTable) - r.generateSubscriptionRoutes(endPoint, e2TermEp, subManEp, routeTable) + r.generateXappRoutes(endPoint, subManEp, routeTable) + r.generateSubscriptionRoutes(endPoint, subManEp, routeTable) } } return routeTable diff --git a/pkg/rtmgr/types.go b/pkg/rtmgr/types.go index 91a8aeb..b9c4cd6 100644 --- a/pkg/rtmgr/types.go +++ b/pkg/rtmgr/types.go @@ -105,8 +105,8 @@ type RicComponents struct { type Subscription struct { SubID int32 - Fqdn string - Port uint16 + Fqdn string + Port uint16 } type PlatformRoutes []struct { @@ -121,4 +121,12 @@ type RtmgrRoutes struct { Prs PlatformRoutes `json:"PlatformRoutes"` } +type FqDn struct { + Address *string + Port *uint16 +} +type XappList struct { + SubscriptionID uint16 + FqdnList []FqDn +} diff --git a/pkg/sbi/control.go b/pkg/sbi/control.go new file mode 100644 index 0000000..67d8eca --- /dev/null +++ b/pkg/sbi/control.go @@ -0,0 +1,67 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + This source code is part of the near-RT RIC (RAN Intelligent Controller) + platform project (RICP). + +================================================================================== +*/ +package sbi + +import "C" + +import ( + "errors" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "strconv" +) + + +func NewControl() Control { + + return Control{make(chan *xapp.RMRParams)} +} + + +type Control struct { + rcChan chan *xapp.RMRParams +} + + +func (c *Control) Run() { + go c.controlLoop() + xapp.Run(c) +} + +func (c *Control) Consume(rp *xapp.RMRParams) (err error) { + c.rcChan <- rp + return +} + +func (c *Control) controlLoop() { + for { + msg := <-c.rcChan + switch msg.Mtype { + case xapp.RICMessageTypes["RIC_SUB_REQ"]: + xapp.Logger.Info("Message handling when RMR instance queries for Routes") + default: + err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded") + xapp.Logger.Error("Unknown message type: %v", err) + } + } +} + diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go index ac61c0b..1404319 100644 --- a/pkg/sbi/nngpush.go +++ b/pkg/sbi/nngpush.go @@ -28,6 +28,20 @@ package sbi +/* +#include +#include +#include +#include +#include +#include + + +#cgo CFLAGS: -I../ +#cgo LDFLAGS: -lrmr_nng -lnng +*/ +import "C" + import ( "errors" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" @@ -36,12 +50,14 @@ import ( _ "nanomsg.org/go/mangos/v2/transport/all" "routing-manager/pkg/rtmgr" "strconv" - "time" ) + + type NngPush struct { Sbi NewSocket CreateNewNngSocketHandler + rcChan chan *xapp.RMRParams } func NewNngPush() *NngPush { @@ -130,40 +146,14 @@ func (c *NngPush) dial(ep *rtmgr.Endpoint) error { return nil } -/* func (c *NngPush) DistributeAll(policies *[]string) error { xapp.Logger.Debug("Invoked: sbi.DistributeAll") xapp.Logger.Debug("args: %v", *policies) for _, ep := range rtmgr.Eps { - if ep.IsReady { - go c.send(ep, policies) - } else { - xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready") - } - } - } - return nil -} - -*/ - -/* - Temporary solution for R3 - E2M -> E2T issue -*/ -func (c *NngPush) DistributeAll(policies *[]string) error { - xapp.Logger.Debug("Invoked: sbi.DistributeAll") - xapp.Logger.Debug("args: %v", *policies) - for _, ep := range rtmgr.Eps { - i := 1 - for i< 5 { - if ep.IsReady { - go c.send(ep, policies) - break - } else { - xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i)) - time.Sleep(10 * time.Millisecond) - i++ - } + if ep.IsReady { + go c.send(ep, policies) + } else { + xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready") } } return nil