X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fnbi%2Fhttprestful.go;h=5e90bfac41d1742674b73f2b95314bc8f6122813;hb=749099bc00ec6cad5da19846e65bd5b4bd9b8de4;hp=96062885cf1bab3fc8b311a5f3829f7cdbd8bed2;hpb=761934a3d9f7a1426d1e14e34fb9a4c16599a237;p=ric-plt%2Frtmgr.git diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go index 9606288..5e90bfa 100644 --- a/pkg/nbi/httprestful.go +++ b/pkg/nbi/httprestful.go @@ -35,6 +35,9 @@ import ( "encoding/json" "errors" "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "github.com/go-openapi/loads" + "github.com/go-openapi/runtime/middleware" "net/url" "os" "routing-manager/pkg/models" @@ -46,9 +49,6 @@ import ( "routing-manager/pkg/sdl" "strconv" "time" - - "github.com/go-openapi/loads" - "github.com/go-openapi/runtime/middleware" ) //var myClient = &http.Client{Timeout: 1 * time.Second} @@ -57,6 +57,8 @@ type HttpRestful struct { Engine LaunchRest LaunchRestHandler RecvXappCallbackData RecvXappCallbackDataHandler + RecvNewE2Tdata RecvNewE2TdataHandler + ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl RetrieveStartupData RetrieveStartupDataHandler } @@ -65,6 +67,7 @@ func NewHttpRestful() *HttpRestful { instance := new(HttpRestful) instance.LaunchRest = launchRest instance.RecvXappCallbackData = recvXappCallbackData + instance.RecvNewE2Tdata = recvNewE2Tdata instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl instance.RetrieveStartupData = retrieveStartupData return instance @@ -76,9 +79,9 @@ func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr // Drain the channel as we are only looking for the latest value until // xapp manager sends all xapp data with every request. length := len(dataChannel) - //rtmgr.Logger.Info(length) + //xapp.Logger.Info(length) for i := 0; i <= length; i++ { - rtmgr.Logger.Info("data received") + xapp.Logger.Info("data received") // If no data received from the REST, it blocks. xappData = <-dataChannel } @@ -87,13 +90,35 @@ func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr err := json.Unmarshal([]byte(xappData.XApps), &xapps) return &xapps, err } else { - rtmgr.Logger.Info("No data") + xapp.Logger.Info("No data") } - rtmgr.Logger.Debug("Nothing received on the Http interface") + xapp.Logger.Debug("Nothing received on the Http interface") return nil, nil } +func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, error) { + var e2tData *models.E2tData + xapp.Logger.Info("data received") + + e2tData = <-dataChannel + + if nil != e2tData { + var e2tinst rtmgr.E2TInstance + e2tinst.Fqdn = *e2tData.E2TAddress + e2tinst.Name = "E2TERMINST" + return &e2tinst,nil + } else { + xapp.Logger.Info("No data") + } + + xapp.Logger.Debug("Nothing received on the Http interface") + return nil, nil +} + + + + func validateXappCallbackData(callbackData *models.XappCallbackData) error { if len(callbackData.XApps) == 0 { return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps) @@ -108,11 +133,11 @@ func validateXappCallbackData(callbackData *models.XappCallbackData) error { func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error { if data != nil { - rtmgr.Logger.Debug("Received callback data") + xapp.Logger.Debug("Received callback data") } err := validateXappCallbackData(data) if err != nil { - rtmgr.Logger.Warn("XApp callback data validation failed: " + err.Error()) + xapp.Logger.Warn("XApp callback data validation failed: " + err.Error()) return err } else { datach <- data @@ -131,17 +156,32 @@ func validateXappSubscriptionData(data *models.XappSubscriptionData) error { return err } +func validateE2tData(data *models.E2tData) error { + var err = fmt.Errorf("E2T E2TAddress is not proper: %v", *data.E2TAddress) +/* for _, ep := range rtmgr.Eps { + if ep.Ip == *data.Address && ep.Port == *data.Port { + err = nil + break + } + }*/ + + if (*data.E2TAddress != "") { + err = nil + } + return err +} + func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData, data *models.XappSubscriptionData) error { - rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl") + xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl") err := validateXappSubscriptionData(data) if err != nil { - rtmgr.Logger.Error(err.Error()) + xapp.Logger.Error(err.Error()) return err } subchan <- data //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port))) - rtmgr.Logger.Debug("Endpoints: %v", rtmgr.Eps) + xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps) return nil } @@ -159,15 +199,15 @@ func subscriptionExists(data *models.XappSubscriptionData) bool { func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData, data *models.XappSubscriptionData) error { - rtmgr.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl") + xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl") err := validateXappSubscriptionData(data) if err != nil { - rtmgr.Logger.Error(err.Error()) + xapp.Logger.Error(err.Error()) return err } if !subscriptionExists(data) { - rtmgr.Logger.Warn("subscription not found: %d", *data.SubscriptionID) + xapp.Logger.Warn("subscription not found: %d", *data.SubscriptionID) err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID) return err } @@ -176,17 +216,30 @@ func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscription return nil } +func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData, + data *models.E2tData) error { + xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl") + err := validateE2tData(data) + if err != nil { + xapp.Logger.Error(err.Error()) + return err + } + + e2taddchan <- data + return nil +} + func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData, - subdelchan chan<- *models.XappSubscriptionData) { + subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData) { swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON) if err != nil { //log.Fatalln(err) - rtmgr.Logger.Error(err.Error()) + xapp.Logger.Error(err.Error()) os.Exit(1) } nbiUrl, err := url.Parse(*nbiif) if err != nil { - rtmgr.Logger.Error(err.Error()) + xapp.Logger.Error(err.Error()) os.Exit(1) } api := operations.NewRoutingManagerAPI(swaggerSpec) @@ -195,17 +248,17 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c server.Port, err = strconv.Atoi(nbiUrl.Port()) if err != nil { - rtmgr.Logger.Error("Invalid NBI RestAPI port") + xapp.Logger.Error("Invalid NBI RestAPI port") os.Exit(1) } server.Host = "0.0.0.0" // set handlers api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc( func(params handle.ProvideXappHandleParams) middleware.Responder { - rtmgr.Logger.Info("Data received on Http interface") + xapp.Logger.Info("Data received on Http interface") err := provideXappHandleHandlerImpl(datach, params.XappCallbackData) if err != nil { - rtmgr.Logger.Error("Invalid XApp callback data: " + err.Error()) + xapp.Logger.Error("Invalid XApp callback data: " + err.Error()) return handle.NewProvideXappHandleBadRequest() } else { return handle.NewGetHandlesOK() @@ -217,6 +270,8 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c if err != nil { return handle.NewProvideXappSubscriptionHandleBadRequest() } else { + //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints + time.Sleep(1 * time.Second) return handle.NewGetHandlesOK() } }) @@ -226,18 +281,31 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c if err != nil { return handle.NewDeleteXappSubscriptionHandleNoContent() } else { + //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints + time.Sleep(1 * time.Second) return handle.NewGetHandlesOK() } }) + api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc( + func(params handle.CreateNewE2tHandleParams) middleware.Responder { + err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData) + if err != nil { + return handle.NewCreateNewE2tHandleBadRequest() + } else { + time.Sleep(1 * time.Second) + return handle.NewCreateNewE2tHandleCreated() + } + }) + // start to serve API - rtmgr.Logger.Info("Starting the HTTP Rest service") + xapp.Logger.Info("Starting the HTTP Rest service") if err := server.Serve(); err != nil { - rtmgr.Logger.Error(err.Error()) + xapp.Logger.Error(err.Error()) } } func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) { - rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl) + xapp.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl) r, err := myClient.Get(xmurl) if err != nil { return nil, err @@ -245,17 +313,17 @@ func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) { defer r.Body.Close() if r.StatusCode == 200 { - rtmgr.Logger.Debug("http client raw response: %v", r) + xapp.Logger.Debug("http client raw response: %v", r) var xapps []rtmgr.XApp err = json.NewDecoder(r.Body).Decode(&xapps) if err != nil { - rtmgr.Logger.Warn("Json decode failed: " + err.Error()) + xapp.Logger.Warn("Json decode failed: " + err.Error()) } - rtmgr.Logger.Info("HTTP GET: OK") - rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps) + xapp.Logger.Info("HTTP GET: OK") + xapp.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps) return &xapps, err } - rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode) + xapp.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode) return nil, nil } @@ -268,15 +336,15 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile if xappData != nil && err == nil { pcData, confErr := rtmgr.GetPlatformComponents(configfile) if confErr != nil { - rtmgr.Logger.Error(confErr.Error()) + xapp.Logger.Error(confErr.Error()) return confErr } - rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.") + xapp.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.") // Combine the xapps data and platform data before writing to the SDL - ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData} + ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: make(map[string]rtmgr.E2TInstance)} writeErr := sdlEngine.WriteAll(fileName, ricData) if writeErr != nil { - rtmgr.Logger.Error(writeErr.Error()) + xapp.Logger.Error(writeErr.Error()) } // post subscription req to appmgr readErr = PostSubReq(xmurl, nbiif) @@ -286,7 +354,7 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile } else if err == nil { readErr = errors.New("unexpected HTTP status code") } else { - rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error()) + xapp.Logger.Warn("cannot get xapp data due to: " + err.Error()) readErr = err } } @@ -297,26 +365,31 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error { err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine) if err != nil { - rtmgr.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error()) + xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error()) return err } datach := make(chan *models.XappCallbackData, 10) subschan := make(chan *models.XappSubscriptionData, 10) subdelchan := make(chan *models.XappSubscriptionData, 10) - rtmgr.Logger.Info("Launching Rest Http service") + e2taddchan := make(chan *models.E2tData, 10) + xapp.Logger.Info("Launching Rest Http service") go func() { - r.LaunchRest(&nbiif, datach, subschan, subdelchan) + r.LaunchRest(&nbiif, datach, subschan, subdelchan, e2taddchan) }() go func() { for { data, err := r.RecvXappCallbackData(datach) if err != nil { - rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error()) + xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error()) } else if data != nil { - sdlEngine.WriteXApps(fileName, data) - triggerSBI <- true + xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.") + alldata, err1 := httpGetXApps(xmurl) + if alldata != nil && err1 == nil { + sdlEngine.WriteXApps(fileName, alldata) + triggerSBI <- true + } } } }() @@ -324,7 +397,7 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co go func() { for { data := <-subschan - rtmgr.Logger.Debug("received XApp subscription data") + xapp.Logger.Debug("received XApp subscription data") addSubscription(&rtmgr.Subs, data) triggerSBI <- true } @@ -333,12 +406,26 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co go func() { for { data := <-subdelchan - rtmgr.Logger.Debug("received XApp subscription delete data") + xapp.Logger.Debug("received XApp subscription delete data") delSubscription(&rtmgr.Subs, data) triggerSBI <- true } }() + go func() { + for { + xapp.Logger.Debug("received create New E2T data") + + data, err := r.RecvNewE2Tdata(e2taddchan) + if err != nil { + xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error()) + } else if data != nil { + sdlEngine.WriteNewE2TInstance(fileName, data) + triggerSBI <- true + } + } + }() + return nil } @@ -351,7 +438,7 @@ func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubsc sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port} for _, elem := range *subs { if elem == sub { - rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem) + xapp.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem) b = true } } @@ -362,7 +449,7 @@ func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubsc } func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool { - rtmgr.Logger.Debug("Deleteing the subscription from the subscriptions list") + xapp.Logger.Debug("Deleteing the subscription from the subscriptions list") var present = false sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port} for i, elem := range *subs { @@ -376,7 +463,7 @@ func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubsc } } if present == false { - rtmgr.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData) + xapp.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData) } return present }