X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=pkg%2Fnbi%2Fhttprestful.go;h=25be854501839d3e236a7c2010130a20e4d0b6ff;hb=b177846d915b7a70c97a74ca73c26d0964fd6a49;hp=db8129ea7f44338519195ff7e5246d7abdd557d4;hpb=a8596ec6db91b8a45a1a21421a726b9b05ce7d48;p=ric-plt%2Frtmgr.git diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go index db8129e..25be854 100644 --- a/pkg/nbi/httprestful.go +++ b/pkg/nbi/httprestful.go @@ -36,18 +36,23 @@ import ( "errors" "fmt" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + xfmodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "github.com/go-openapi/loads" "github.com/go-openapi/runtime/middleware" + "net" "net/url" "os" "routing-manager/pkg/models" "routing-manager/pkg/restapi" "routing-manager/pkg/restapi/operations" + "routing-manager/pkg/restapi/operations/debug" "routing-manager/pkg/restapi/operations/handle" "routing-manager/pkg/rpe" "routing-manager/pkg/rtmgr" "routing-manager/pkg/sdl" "strconv" + "strings" + "sync" "time" ) @@ -57,6 +62,7 @@ type HttpRestful struct { Engine LaunchRest LaunchRestHandler RecvXappCallbackData RecvXappCallbackDataHandler + RecvNewE2Tdata RecvNewE2TdataHandler ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl RetrieveStartupData RetrieveStartupDataHandler } @@ -65,6 +71,7 @@ func NewHttpRestful() *HttpRestful { instance := new(HttpRestful) instance.LaunchRest = launchRest instance.RecvXappCallbackData = recvXappCallbackData + instance.RecvNewE2Tdata = recvNewE2Tdata instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl instance.RetrieveStartupData = retrieveStartupData return instance @@ -94,6 +101,39 @@ func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr return nil, nil } +func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, string, error) { + var e2tData *models.E2tData + var str string + xapp.Logger.Info("data received") + + e2tData = <-dataChannel + + if nil != e2tData { + + e2tinst := rtmgr.E2TInstance{ + Ranlist: make([]string, len(e2tData.RanNamelist)), + } + + e2tinst.Fqdn = *e2tData.E2TAddress + e2tinst.Name = "E2TERMINST" + copy(e2tinst.Ranlist, e2tData.RanNamelist) + if len(e2tData.RanNamelist) > 0 { + var meidar string + for _, meid := range e2tData.RanNamelist { + meidar += meid + " " + } + str = "mme_ar|" + *e2tData.E2TAddress + "|" + strings.TrimSuffix(meidar, " ") + } + return &e2tinst, str, nil + + } else { + xapp.Logger.Info("No data") + } + + xapp.Logger.Debug("Nothing received on the Http interface") + return nil, str, nil +} + func validateXappCallbackData(callbackData *models.XappCallbackData) error { if len(callbackData.XApps) == 0 { return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps) @@ -131,6 +171,58 @@ func validateXappSubscriptionData(data *models.XappSubscriptionData) error { return err } +func validateE2tData(data *models.E2tData) error { + + e2taddress_key := *data.E2TAddress + if e2taddress_key == "" { + return fmt.Errorf("E2TAddress is empty!!!") + } + stringSlice := strings.Split(e2taddress_key, ":") + if len(stringSlice) == 1 { + return fmt.Errorf("E2T E2TAddress is not a proper format like ip:port, %v", e2taddress_key) + } + + _, err := net.LookupIP(stringSlice[0]) + if err != nil { + return fmt.Errorf("E2T E2TAddress DNS look up failed, E2TAddress: %v", stringSlice[0]) + } + + if checkValidaE2TAddress(e2taddress_key) { + return fmt.Errorf("E2TAddress already exist!!!, E2TAddress: %v", e2taddress_key) + } + + return nil +} + +func validateDeleteE2tData(data *models.E2tDeleteData) error { + + if *data.E2TAddress == "" { + return fmt.Errorf("E2TAddress is empty!!!") + } + + for _, element := range data.RanAssocList { + e2taddress_key := *element.E2TAddress + stringSlice := strings.Split(e2taddress_key, ":") + + if len(stringSlice) == 1 { + return fmt.Errorf("E2T Delete - RanAssocList E2TAddress is not a proper format like ip:port, %v", e2taddress_key) + } + + if !checkValidaE2TAddress(e2taddress_key) { + return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v", e2taddress_key) + } + + } + return nil +} + +func checkValidaE2TAddress(e2taddress string) bool { + + _, exist := rtmgr.Eps[e2taddress] + return exist + +} + func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData, data *models.XappSubscriptionData) error { xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl") @@ -176,8 +268,118 @@ func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscription return nil } -func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData, - subdelchan chan<- *models.XappSubscriptionData) { +func updateXappSubscriptionHandleImpl(subupdatechan chan<- *rtmgr.XappList, data *models.XappList, subid uint16) error { + xapp.Logger.Debug("Invoked updateXappSubscriptionHandleImpl") + + var fqdnlist []rtmgr.FqDn + for _, item := range *data { + fqdnlist = append(fqdnlist, rtmgr.FqDn(*item)) + } + xapplist := rtmgr.XappList{SubscriptionID: subid, FqdnList: fqdnlist} + var subdata models.XappSubscriptionData + var id int32 + id = int32(subid) + subdata.SubscriptionID = &id + for _, items := range fqdnlist { + subdata.Address = items.Address + subdata.Port = items.Port + err := validateXappSubscriptionData(&subdata) + if err != nil { + xapp.Logger.Error(err.Error()) + return err + } + } + subupdatechan <- &xapplist + return nil +} + +func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData, + data *models.E2tData) error { + xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl") + err := validateE2tData(data) + if err != nil { + xapp.Logger.Error(err.Error()) + return err + } + e2taddchan <- data + return nil +} + +func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error { + + xapp.Logger.Debug("Invoked.validateE2TAddressRANListData : %v", assRanE2tData) + + for _, element := range assRanE2tData { + if *element.E2TAddress == "" { + return fmt.Errorf("E2T Instance - E2TAddress is empty!!!") + } + + e2taddress_key := *element.E2TAddress + if !checkValidaE2TAddress(e2taddress_key) { + return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v", e2taddress_key) + } + + } + return nil +} + +func associateRanToE2THandlerImpl(assranchan chan<- models.RanE2tMap, + data models.RanE2tMap) error { + xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl") + err := validateE2TAddressRANListData(data) + if err != nil { + xapp.Logger.Warn(" Association of RAN to E2T Instance data validation failed: " + err.Error()) + return err + } + assranchan <- data + return nil +} + +func disassociateRanToE2THandlerImpl(disassranchan chan<- models.RanE2tMap, + data models.RanE2tMap) error { + xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl") + err := validateE2TAddressRANListData(data) + if err != nil { + xapp.Logger.Warn(" Disassociation of RAN List from E2T Instance data validation failed: " + err.Error()) + return err + } + disassranchan <- data + return nil +} + +func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData, + data *models.E2tDeleteData) error { + xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl") + + err := validateDeleteE2tData(data) + if err != nil { + xapp.Logger.Error(err.Error()) + return err + } + + e2tdelchan <- data + return nil +} + +func dumpDebugData() (models.Debuginfo, error) { + var response models.Debuginfo + sdlEngine, _ := sdl.GetSdl("file") + rpeEngine, _ := rpe.GetRpe("rmrpush") + data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile")) + if err != nil || data == nil { + xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error()) + return response, err + } + response.RouteTable = *rpeEngine.GeneratePolicies(rtmgr.Eps, data) + + prettyJSON, err := json.MarshalIndent(data, "", "") + response.RouteConfigs = string(prettyJSON) + + return response, err +} + +func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData, subupdatechan chan<- *rtmgr.XappList, + subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData, assranchan chan<- models.RanE2tMap, disassranchan chan<- models.RanE2tMap, e2tdelchan chan<- *models.E2tDeleteData) { swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON) if err != nil { //log.Fatalln(err) @@ -233,6 +435,69 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c return handle.NewGetHandlesOK() } }) + api.HandleUpdateXappSubscriptionHandleHandler = handle.UpdateXappSubscriptionHandleHandlerFunc( + func(params handle.UpdateXappSubscriptionHandleParams) middleware.Responder { + err := updateXappSubscriptionHandleImpl(subupdatechan, ¶ms.XappList, params.SubscriptionID) + if err != nil { + return handle.NewUpdateXappSubscriptionHandleBadRequest() + } else { + //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints + time.Sleep(1 * time.Second) + return handle.NewUpdateXappSubscriptionHandleCreated() + } + }) + api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc( + func(params handle.CreateNewE2tHandleParams) middleware.Responder { + err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData) + if err != nil { + return handle.NewCreateNewE2tHandleBadRequest() + } else { + time.Sleep(1 * time.Second) + return handle.NewCreateNewE2tHandleCreated() + } + }) + + api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc( + func(params handle.AssociateRanToE2tHandleParams) middleware.Responder { + err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList) + if err != nil { + return handle.NewAssociateRanToE2tHandleBadRequest() + } else { + time.Sleep(1 * time.Second) + return handle.NewAssociateRanToE2tHandleCreated() + } + }) + + api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc( + func(params handle.DissociateRanParams) middleware.Responder { + err := disassociateRanToE2THandlerImpl(disassranchan, params.DissociateList) + if err != nil { + return handle.NewDissociateRanBadRequest() + } else { + time.Sleep(1 * time.Second) + return handle.NewDissociateRanCreated() + } + }) + + api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc( + func(params handle.DeleteE2tHandleParams) middleware.Responder { + err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData) + if err != nil { + return handle.NewDeleteE2tHandleBadRequest() + } else { + time.Sleep(1 * time.Second) + return handle.NewDeleteE2tHandleCreated() + } + }) + api.DebugGetDebuginfoHandler = debug.GetDebuginfoHandlerFunc( + func(params debug.GetDebuginfoParams) middleware.Responder { + response, err := dumpDebugData() + if err != nil { + return debug.NewGetDebuginfoCreated() + } else { + return debug.NewGetDebuginfoOK().WithPayload(&response) + } + }) // start to serve API xapp.Logger.Info("Starting the HTTP Rest service") if err := server.Serve(); err != nil { @@ -263,43 +528,150 @@ func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) { return nil, nil } -func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error { - var readErr error - var maxRetries = 10 - for i := 1; i <= maxRetries; i++ { - time.Sleep(2 * time.Second) - xappData, err := httpGetXApps(xmurl) - if xappData != nil && err == nil { - pcData, confErr := rtmgr.GetPlatformComponents(configfile) - if confErr != nil { - xapp.Logger.Error(confErr.Error()) - return confErr - } - xapp.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.") - // Combine the xapps data and platform data before writing to the SDL - ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData} - writeErr := sdlEngine.WriteAll(fileName, ricData) - if writeErr != nil { - xapp.Logger.Error(writeErr.Error()) - } - // post subscription req to appmgr - readErr = PostSubReq(xmurl, nbiif) - if readErr == nil { - return nil +func httpGetE2TList(e2murl string) (*[]rtmgr.E2tIdentity, error) { + xapp.Logger.Info("Invoked httprestful.httpGetE2TList: " + e2murl) + r, err := myClient.Get(e2murl) + if err != nil { + return nil, err + } + defer r.Body.Close() + + if r.StatusCode == 200 { + xapp.Logger.Debug("http client raw response: %v", r) + var E2Tlist []rtmgr.E2tIdentity + err = json.NewDecoder(r.Body).Decode(&E2Tlist) + if err != nil { + xapp.Logger.Warn("Json decode failed: " + err.Error()) + } + xapp.Logger.Info("HTTP GET: OK") + xapp.Logger.Debug("httprestful.httpGetXApps returns: %v", E2Tlist) + return &E2Tlist, err + } + xapp.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode) + return nil, nil +} + +func PopulateE2TMap(e2tDataList *[]rtmgr.E2tIdentity, e2ts map[string]rtmgr.E2TInstance, meids []string) { + xapp.Logger.Info("Invoked httprestful.PopulateE2TMap ") + + for _, e2tData := range *e2tDataList { + var str string + + e2tinst := rtmgr.E2TInstance{ + Ranlist: make([]string, len(e2tData.Rannames)), + } + + e2tinst.Fqdn = e2tData.E2taddress + e2tinst.Name = "E2TERMINST" + copy(e2tinst.Ranlist, e2tData.Rannames) + + if len(e2tData.Rannames) > 0 { + var meidar string + for _, meid := range e2tData.Rannames { + meidar += meid + " " } + str += "mme_ar|" + e2tData.E2taddress + "|" + strings.TrimSuffix(meidar, " ") + } + + e2ts[e2tinst.Fqdn] = e2tinst + meids = append(meids,str) + } +} + +func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, e2murl string, sdlEngine sdl.Engine) error { + xapp.Logger.Info("Invoked retrieveStartupData ") + var readErr error + var err error + var maxRetries = 10 + var xappData *[]rtmgr.XApp + xappData = new([]rtmgr.XApp) + xapp.Logger.Info("Trying to fetch XApps data from XAPP manager") + for i := 1; i <= maxRetries; i++ { + time.Sleep(2 * time.Second) + + readErr = nil + xappData, err = httpGetXApps(xmurl) + if xappData != nil && err == nil { + break + } else if err == nil { + readErr = errors.New("unexpected HTTP status code") + } else { + xapp.Logger.Warn("cannot get xapp data due to: " + err.Error()) + readErr = err + } + } + + if ( readErr != nil) { + return readErr + } + + var meids []string + e2ts := make(map[string]rtmgr.E2TInstance) + xapp.Logger.Info("Trying to fetch E2T data from E2manager") + for i := 1; i <= maxRetries; i++ { + + readErr = nil + e2tDataList, err := httpGetE2TList(e2murl) + if e2tDataList != nil && err == nil { + PopulateE2TMap(e2tDataList, e2ts, meids[:]) + break } else if err == nil { readErr = errors.New("unexpected HTTP status code") } else { - xapp.Logger.Warn("cannot get xapp data due to: " + err.Error()) + xapp.Logger.Warn("cannot get E2T data from E2M due to: " + err.Error()) readErr = err } + time.Sleep(2 * time.Second) + } + + if ( readErr != nil) { + return readErr } + + pcData, confErr := rtmgr.GetPlatformComponents(configfile) + if confErr != nil { + xapp.Logger.Error(confErr.Error()) + return confErr + } + xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.") + // Combine the xapps data and platform data before writing to the SDL + ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: e2ts, MeidMap: meids} + writeErr := sdlEngine.WriteAll(fileName, ricData) + if writeErr != nil { + xapp.Logger.Error(writeErr.Error()) + } + + xapp.Logger.Info("Trying to fetch Subscriptions data from Subscription manager") + for i := 1; i <= maxRetries; i++ { + readErr = nil + sub_list, err := xapp.Subscription.QuerySubscriptions() + + if sub_list != nil && err == nil { + PopulateSubscription(sub_list) + break + } else { + readErr = err + xapp.Logger.Warn("cannot get xapp data due to: " + readErr.Error()) + } + time.Sleep(2 * time.Second) + } + + if (readErr != nil) { + return readErr + } + + // post subscription req to appmgr + readErr = PostSubReq(xmurl, nbiif) + if readErr == nil { + return nil + } + return readErr } -func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string, - sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error { - err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine) +func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string, e2murl string, + sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error { + err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, e2murl, sdlEngine) if err != nil { xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error()) return err @@ -308,9 +680,14 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co datach := make(chan *models.XappCallbackData, 10) subschan := make(chan *models.XappSubscriptionData, 10) subdelchan := make(chan *models.XappSubscriptionData, 10) + subupdatechan := make(chan *rtmgr.XappList, 10) + e2taddchan := make(chan *models.E2tData, 10) + associateranchan := make(chan models.RanE2tMap, 10) + disassociateranchan := make(chan models.RanE2tMap, 10) + e2tdelchan := make(chan *models.E2tDeleteData, 10) xapp.Logger.Info("Launching Rest Http service") go func() { - r.LaunchRest(&nbiif, datach, subschan, subdelchan) + r.LaunchRest(&nbiif, datach, subschan, subupdatechan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan) }() go func() { @@ -322,7 +699,9 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.") alldata, err1 := httpGetXApps(xmurl) if alldata != nil && err1 == nil { + m.Lock() sdlEngine.WriteXApps(fileName, alldata) + m.Unlock() triggerSBI <- true } } @@ -347,6 +726,66 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co } }() + go func() { + for { + data := <-subupdatechan + xapp.Logger.Debug("received XApp subscription Merge data") + updateSubscription(data) + triggerSBI <- true + } + }() + + go func() { + for { + + data, meiddata, _ := r.RecvNewE2Tdata(e2taddchan) + if data != nil { + xapp.Logger.Debug("received create New E2T data") + m.Lock() + sdlEngine.WriteNewE2TInstance(fileName, data, meiddata) + m.Unlock() + triggerSBI <- true + } + } + }() + + go func() { + for { + data := <-associateranchan + xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager") + m.Lock() + sdlEngine.WriteAssRANToE2TInstance(fileName, data) + m.Unlock() + triggerSBI <- true + } + }() + + go func() { + for { + + data := <-disassociateranchan + xapp.Logger.Debug("received disassociate RANs from E2T instance") + m.Lock() + sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data) + m.Unlock() + triggerSBI <- true + } + }() + + go func() { + for { + + data := <-e2tdelchan + xapp.Logger.Debug("received Delete E2T data") + if data != nil { + m.Lock() + sdlEngine.WriteDeleteE2TInstance(fileName, data) + m.Unlock() + triggerSBI <- true + } + } + }() + return nil } @@ -355,6 +794,7 @@ func (r *HttpRestful) Terminate() error { } func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool { + xapp.Logger.Debug("Adding the subscription into the subscriptions list") var b = false sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port} for _, elem := range *subs { @@ -388,3 +828,57 @@ func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubsc } return present } + +func updateSubscription(data *rtmgr.XappList) { + + var subdata models.XappSubscriptionData + var id int32 + var matchingsubid, deletecount uint8 + id = int32(data.SubscriptionID) + subdata.SubscriptionID = &id + for _, subs := range rtmgr.Subs { + if int32(data.SubscriptionID) == subs.SubID { + matchingsubid++ + } + } + + for deletecount < matchingsubid { + for _, subs := range rtmgr.Subs { + if int32(data.SubscriptionID) == subs.SubID { + subdata.SubscriptionID = &subs.SubID + subdata.Address = &subs.Fqdn + subdata.Port = &subs.Port + xapp.Logger.Debug("Deletion Subscription List has %v", subdata) + delSubscription(&rtmgr.Subs, &subdata) + break + } + } + deletecount++ + } + + for _, items := range data.FqdnList { + subdata.Address = items.Address + subdata.Port = items.Port + xapp.Logger.Debug("Adding Subscription List has %v", subdata) + addSubscription(&rtmgr.Subs, &subdata) + } + +} + +func PopulateSubscription(sub_list xfmodel.SubscriptionList) { + for _, sub_row := range sub_list { + var subdata models.XappSubscriptionData + id := int32(sub_row.SubscriptionID) + subdata.SubscriptionID = &id + for _, ep := range sub_row.Endpoint { + + stringSlice := strings.Split(ep, ":") + subdata.Address = &stringSlice[0] + intportval, _ := strconv.Atoi( stringSlice[1]) + value := uint16(intportval) + subdata.Port = &value + xapp.Logger.Debug("Adding Subscription List has Address :%v, port :%v, SubscriptionID :%v ", subdata.Address, subdata.Address, subdata.SubscriptionID) + addSubscription(&rtmgr.Subs, &subdata) + } + } +}