2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
19 This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 platform project (RICP).
22 ==================================================================================
25 Mnemonic: httprestful.go
26 Abstract: HTTP Restful API NBI implementation
27 Based on Swagger generated code
33 //noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
38 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
39 "github.com/go-openapi/loads"
40 "github.com/go-openapi/runtime/middleware"
43 "routing-manager/pkg/models"
44 "routing-manager/pkg/restapi"
45 "routing-manager/pkg/restapi/operations"
46 "routing-manager/pkg/restapi/operations/handle"
47 "routing-manager/pkg/rpe"
48 "routing-manager/pkg/rtmgr"
49 "routing-manager/pkg/sdl"
56 //var myClient = &http.Client{Timeout: 1 * time.Second}
// HttpRestful bundles the function-valued handlers that make up the HTTP
// REST northbound interface (NBI). NewHttpRestful fills every field with the
// package-level implementation; presumably the indirection exists so tests
// can substitute individual handlers - confirm.
58 type HttpRestful struct {
// LaunchRest starts the REST API server.
60 LaunchRest LaunchRestHandler
// RecvXappCallbackData reads xApp callback data from a channel.
61 RecvXappCallbackData RecvXappCallbackDataHandler
// RecvNewE2Tdata reads data about a newly created E2T instance from a channel.
62 RecvNewE2Tdata RecvNewE2TdataHandler
// ProvideXappHandleHandlerImpl validates xApp callback data received over REST.
63 ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
// RetrieveStartupData fetches the initial xApp and platform data at start-up.
64 RetrieveStartupData RetrieveStartupDataHandler
// NewHttpRestful allocates an HttpRestful and wires each handler field to
// the default package-level implementation of that handler.
67 func NewHttpRestful() *HttpRestful {
68 instance := new(HttpRestful)
69 instance.LaunchRest = launchRest
70 instance.RecvXappCallbackData = recvXappCallbackData
71 instance.RecvNewE2Tdata = recvNewE2Tdata
72 instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
73 instance.RetrieveStartupData = retrieveStartupData
// recvXappCallbackData reads xApp callback data from dataChannel and decodes
// the XApps JSON payload of the last item read into a slice of rtmgr.XApp.
//
// The receive loop runs len(dataChannel)+1 times (note the "<="): it consumes
// everything that was queued when the function was entered and then performs
// one more receive, which blocks until another item arrives when the channel
// was empty or has not been refilled in the meantime.
77 // ToDo: Use Range over channel. Read and return only the latest one.
78 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
79 var xappData *models.XappCallbackData
80 // Drain the channel as we are only looking for the latest value until
81 // xapp manager sends all xapp data with every request.
82 length := len(dataChannel)
83 //xapp.Logger.Info(length)
84 for i := 0; i <= length; i++ {
// NOTE(review): this message is logged before the receive below completes,
// so it is emitted even while the call is still waiting for data.
85 xapp.Logger.Info("data received")
86 // If no data received from the REST, it blocks.
87 xappData = <-dataChannel
// Decode the JSON-encoded xApp list carried in the XApps field.
90 var xapps []rtmgr.XApp
91 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
94 xapp.Logger.Info("No data")
97 xapp.Logger.Debug("Nothing received on the Http interface")
// recvNewE2Tdata performs one blocking receive on dataChannel and converts
// the received E2tData into an rtmgr.E2TInstance: Fqdn is taken from
// E2TAddress, Name is the fixed string "E2TERMINST", and Ranlist is a copy of
// RanNamelist.
101 func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, error) {
102 var e2tData *models.E2tData
// NOTE(review): logged before the blocking receive below, not after it.
103 xapp.Logger.Info("data received")
105 e2tData = <-dataChannel
// Pre-size Ranlist so the copy below transfers every RAN name.
109 e2tinst := rtmgr.E2TInstance {
110 Ranlist : make([]string, len(e2tData.RanNamelist)),
// E2TAddress is a pointer field and is dereferenced here.
113 e2tinst.Fqdn = *e2tData.E2TAddress
114 e2tinst.Name = "E2TERMINST"
115 copy(e2tinst.Ranlist, e2tData.RanNamelist)
120 xapp.Logger.Info("No data")
123 xapp.Logger.Debug("Nothing received on the Http interface")
// validateXappCallbackData checks the XApps field of callbackData. It returns
// an error when the field is empty or when it cannot be unmarshalled as a
// JSON list of rtmgr.XApp.
127 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
128 if len(callbackData.XApps) == 0 {
129 return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
// Trial decode only: the decoded slice is used purely to prove the payload parses.
131 var xapps []rtmgr.XApp
132 err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
134 return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
// provideXappHandleHandlerImpl handles xApp callback data received over REST:
// it validates data with validateXappCallbackData and logs a warning when the
// validation fails.
// NOTE(review): presumably valid data is forwarded on datach and the
// validation error is returned to the caller - confirm.
139 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
141 xapp.Logger.Debug("Received callback data")
143 err := validateXappCallbackData(data)
145 xapp.Logger.Warn("XApp callback data validation failed: " + err.Error())
// validateXappSubscriptionData checks the address/port pair in data against
// the known endpoints in rtmgr.Eps, comparing each endpoint's Ip and Port.
// The error starts out as "XApp instance not found"; presumably it is
// cleared when a matching endpoint is found - confirm.
//
// data.Address and data.Port are dereferenced without a nil check, so callers
// must pass data with both pointers set.
153 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
154 var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
155 for _, ep := range rtmgr.Eps {
156 if ep.Ip == *data.Address && ep.Port == *data.Port {
// validateE2tData validates the E2TAddress of a new E2T instance. It returns
// an error when the address is empty, when it contains no ":" separator, or
// when checkValidaE2TAddress reports that the address is already known.
//
// data.E2TAddress is dereferenced without a nil check.
164 func validateE2tData(data *models.E2tData) error {
166 e2taddress_key := *data.E2TAddress
167 if (e2taddress_key == "") {
168 return fmt.Errorf("E2TAddress is empty!!!")
// A single element after the split means there was no ":" in the address.
// This only checks for the separator, not that both ip and port are present.
170 stringSlice := strings.Split(e2taddress_key, ":")
171 if (len(stringSlice) == 1) {
172 return fmt.Errorf("E2T E2TAddress is not a proper format like ip:port, %v", e2taddress_key )
// Creating an instance whose address is already registered is rejected.
175 if checkValidaE2TAddress(e2taddress_key) {
176 return fmt.Errorf("E2TAddress already exist!!!, E2TAddress: %v",e2taddress_key)
// validateDeleteE2tData validates an E2T delete request. It rejects an empty
// top-level E2TAddress, and for every entry of RanAssocList it rejects an
// E2TAddress that has no ":" separator or that checkValidaE2TAddress does
// not know.
//
// Both data.E2TAddress and each element's E2TAddress are dereferenced
// without a nil check.
182 func validateDeleteE2tData(data *models.E2tDeleteData) error {
184 if (*data.E2TAddress == "") {
185 return fmt.Errorf("E2TAddress is empty!!!")
188 for _, element := range data.RanAssocList {
189 e2taddress_key := *element.E2TAddress
190 stringSlice := strings.Split(e2taddress_key, ":")
// A single element after the split means there was no ":" in the address.
192 if (len(stringSlice) == 1) {
193 return fmt.Errorf("E2T Delete - RanAssocList E2TAddress is not a proper format like ip:port, %v", e2taddress_key)
// Every associated address must already be registered.
197 if !checkValidaE2TAddress(e2taddress_key) {
198 return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v",e2taddress_key)
// checkValidaE2TAddress looks up e2taddress as a key in rtmgr.Eps. Callers in
// this file treat a true result as "the address is already registered".
205 func checkValidaE2TAddress(e2taddress string) bool {
207 _, exist := rtmgr.Eps[e2taddress]
// provideXappSubscriptionHandleImpl handles an "add subscription" request:
// it validates data with validateXappSubscriptionData, logs the validation
// error if there is one, and debug-logs the current endpoint table.
// NOTE(review): presumably valid data is forwarded on subchan and the
// validation error is returned to the caller - confirm.
212 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
213 data *models.XappSubscriptionData) error {
214 xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
215 err := validateXappSubscriptionData(data)
217 xapp.Logger.Error(err.Error())
221 //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
222 xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
// subscriptionExists builds an rtmgr.Subscription from the subscription ID,
// address and port in data and iterates over rtmgr.Subs; presumably it
// reports whether an equal entry is present - confirm.
//
// data.SubscriptionID, data.Address and data.Port are dereferenced without a
// nil check.
226 func subscriptionExists(data *models.XappSubscriptionData) bool {
228 sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
229 for _, elem := range rtmgr.Subs {
// deleteXappSubscriptionHandleImpl handles a "delete subscription" request.
// It validates data with validateXappSubscriptionData (logging any error),
// then checks subscriptionExists; when the subscription is unknown it logs a
// warning and builds a "subscription not found" error.
// NOTE(review): presumably valid data is forwarded on subdelchan and the
// errors built here are returned to the caller - confirm.
238 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
239 data *models.XappSubscriptionData) error {
240 xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
241 err := validateXappSubscriptionData(data)
243 xapp.Logger.Error(err.Error())
247 if !subscriptionExists(data) {
248 xapp.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
249 err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
// createNewE2tHandleHandlerImpl handles a "create new E2T instance" request:
// it validates data with validateE2tData and logs the validation error if
// there is one.
// NOTE(review): presumably valid data is forwarded on e2taddchan and the
// validation error is returned to the caller - confirm.
257 func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData,
258 data *models.E2tData) error {
259 xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl")
260 err := validateE2tData(data)
262 xapp.Logger.Error(err.Error())
// validateE2TAddressRANListData validates every entry of a RAN-to-E2T
// mapping list. An entry is rejected when its E2TAddress is empty or when
// checkValidaE2TAddress does not know the address.
//
// Each element's E2TAddress is dereferenced without a nil check.
269 func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
271 xapp.Logger.Debug("Invoked.validateE2TAddressRANListData : %v", assRanE2tData)
273 for _, element := range assRanE2tData {
274 if *element.E2TAddress == "" {
275 return fmt.Errorf("E2T Instance - E2TAddress is empty!!!")
// The referenced E2T instance must already be registered.
278 e2taddress_key := *element.E2TAddress
279 if !checkValidaE2TAddress(e2taddress_key) {
280 return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v",e2taddress_key)
// associateRanToE2THandlerImpl handles an "associate RANs to E2T instance"
// request: it validates data with validateE2TAddressRANListData and logs a
// warning when the validation fails.
// NOTE(review): presumably valid data is forwarded on assranchan and the
// validation error is returned to the caller - confirm.
287 func associateRanToE2THandlerImpl(assranchan chan<- models.RanE2tMap,
288 data models.RanE2tMap) error {
289 xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl")
290 err := validateE2TAddressRANListData(data)
292 xapp.Logger.Warn(" Association of RAN to E2T Instance data validation failed: " + err.Error())
// disassociateRanToE2THandlerImpl handles a "disassociate RANs from E2T
// instance" request: it validates data with validateE2TAddressRANListData,
// logs a warning when the validation fails, and sends data on disassranchan
// for further processing.
// NOTE(review): presumably the send happens only for valid data and the
// validation error is returned otherwise - confirm.
299 func disassociateRanToE2THandlerImpl(disassranchan chan<- models.RanE2tMap,
300 data models.RanE2tMap) error {
301 xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl")
302 err := validateE2TAddressRANListData(data)
304 xapp.Logger.Warn(" Disassociation of RAN List from E2T Instance data validation failed: " + err.Error())
307 disassranchan <- data
// deleteE2tHandleHandlerImpl handles a "delete E2T instance" request: it
// validates data with validateDeleteE2tData and logs the validation error if
// there is one.
// NOTE(review): presumably valid data is forwarded on e2tdelchan and the
// validation error is returned to the caller - confirm.
311 func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData,
312 data *models.E2tDeleteData) error {
313 xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl")
315 err := validateDeleteE2tData(data)
317 xapp.Logger.Error(err.Error())
// launchRest builds the Swagger-generated routing-manager REST API, registers
// one handler per operation and serves it.
//
// nbiif is the NBI URL; only its port is used, the server listens on
// 0.0.0.0. Each remaining parameter is the send side of a channel that is
// handed to the matching *Impl function of this file.
//
// Every handler follows the same shape: call the *Impl function, answer with
// an error responder when it fails, otherwise reply with the success
// responder. All handlers except the xApp callback one sleep for one second
// before replying successfully.
325 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
326 subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData, assranchan chan<- models.RanE2tMap, disassranchan chan<- models.RanE2tMap, e2tdelchan chan<- *models.E2tDeleteData) {
// Load the Swagger specification embedded in the generated restapi package.
327 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
330 xapp.Logger.Error(err.Error())
// Parse the NBI interface URL to extract the listening port further down.
333 nbiUrl, err := url.Parse(*nbiif)
335 xapp.Logger.Error(err.Error())
338 api := operations.NewRoutingManagerAPI(swaggerSpec)
339 server := restapi.NewServer(api)
// Shut the server down when this function returns.
340 defer server.Shutdown()
342 server.Port, err = strconv.Atoi(nbiUrl.Port())
344 xapp.Logger.Error("Invalid NBI RestAPI port")
// Listen on all interfaces regardless of the host part of nbiif.
347 server.Host = "0.0.0.0"
// xApp callback data: 400 on invalid data.
// NOTE(review): success is answered with the GetHandles OK responder.
349 api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
350 func(params handle.ProvideXappHandleParams) middleware.Responder {
351 xapp.Logger.Info("Data received on Http interface")
352 err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
354 xapp.Logger.Error("Invalid XApp callback data: " + err.Error())
355 return handle.NewProvideXappHandleBadRequest()
357 return handle.NewGetHandlesOK()
// Add xApp subscription: 400 on invalid data.
360 api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
361 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
362 err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
364 return handle.NewProvideXappSubscriptionHandleBadRequest()
366 //Delay the response as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
367 time.Sleep(1 * time.Second)
368 return handle.NewGetHandlesOK()
// Delete xApp subscription: a failure is answered with the NoContent responder.
371 api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
372 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
373 err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
375 return handle.NewDeleteXappSubscriptionHandleNoContent()
377 //Delay the response as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
378 time.Sleep(1 * time.Second)
379 return handle.NewGetHandlesOK()
// Create new E2T instance: 400 on invalid data, Created on success.
382 api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc(
383 func(params handle.CreateNewE2tHandleParams) middleware.Responder {
384 err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData)
386 return handle.NewCreateNewE2tHandleBadRequest()
388 time.Sleep(1 * time.Second)
389 return handle.NewCreateNewE2tHandleCreated()
// Associate RANs to an E2T instance: 400 on invalid data, Created on success.
393 api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc(
394 func(params handle.AssociateRanToE2tHandleParams) middleware.Responder {
395 err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList)
397 return handle.NewAssociateRanToE2tHandleBadRequest()
399 time.Sleep(1 * time.Second)
400 return handle.NewAssociateRanToE2tHandleCreated()
// Dissociate RANs from an E2T instance: 400 on invalid data, Created on success.
404 api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc(
405 func(params handle.DissociateRanParams) middleware.Responder {
406 err := disassociateRanToE2THandlerImpl(disassranchan, params.DissociateList)
408 return handle.NewDissociateRanBadRequest()
410 time.Sleep(1 * time.Second)
411 return handle.NewDissociateRanCreated()
// Delete an E2T instance: 400 on invalid data, Created on success.
415 api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc(
416 func(params handle.DeleteE2tHandleParams) middleware.Responder {
417 err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData)
419 return handle.NewDeleteE2tHandleBadRequest()
421 time.Sleep(1 * time.Second)
422 return handle.NewDeleteE2tHandleCreated()
425 // start to serve API
426 xapp.Logger.Info("Starting the HTTP Rest service")
427 if err := server.Serve(); err != nil {
428 xapp.Logger.Error(err.Error())
// httpGetXApps issues an HTTP GET to xmurl using the package-level myClient
// and, when the response status is 200, decodes the JSON body into a slice
// of rtmgr.XApp. A decode failure is logged as a warning; any other status
// code is logged as unexpected.
// NOTE(review): retrieveStartupData treats a nil result with a nil error as
// "unexpected HTTP status code", so the non-200 path presumably returns
// (nil, nil) - confirm.
432 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
433 xapp.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
434 r, err := myClient.Get(xmurl)
440 if r.StatusCode == 200 {
441 xapp.Logger.Debug("http client raw response: %v", r)
// Stream-decode the response body straight into the xApp list.
442 var xapps []rtmgr.XApp
443 err = json.NewDecoder(r.Body).Decode(&xapps)
445 xapp.Logger.Warn("Json decode failed: " + err.Error())
447 xapp.Logger.Info("HTTP GET: OK")
448 xapp.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
451 xapp.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
// retrieveStartupData fetches the initial xApp list from the xApp manager at
// xmurl, combines it with the platform components read from configfile,
// writes the result to fileName through sdlEngine, and then posts a
// subscription request via PostSubReq(xmurl, nbiif).
//
// The fetch is attempted up to maxRetries times, with a two-second sleep
// before every attempt (including the first one).
455 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
458 for i := 1; i <= maxRetries; i++ {
459 time.Sleep(2 * time.Second)
460 xappData, err := httpGetXApps(xmurl)
461 if xappData != nil && err == nil {
// xApp data arrived: load the platform component configuration as well.
462 pcData, confErr := rtmgr.GetPlatformComponents(configfile)
464 xapp.Logger.Error(confErr.Error())
467 xapp.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
468 // Combine the xapps data and platform data before writing to the SDL
469 ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: make(map[string]rtmgr.E2TInstance)}
470 writeErr := sdlEngine.WriteAll(fileName, ricData)
// A failed write is logged here.
472 xapp.Logger.Error(writeErr.Error())
474 // post subscription req to appmgr
475 readErr = PostSubReq(xmurl, nbiif)
// No data and no error: httpGetXApps did not get a usable response.
479 } else if err == nil {
480 readErr = errors.New("unexpected HTTP status code")
482 xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
// Initialize brings up the HTTP REST NBI. It first retrieves the start-up
// data through r.RetrieveStartupData, then creates one buffered channel
// (capacity 10) per REST operation, launches the REST service with the send
// sides of those channels, and processes the items arriving on them by
// updating rtmgr.Subs or writing to fileName through sdlEngine.
// NOTE(review): how rpeEngine, triggerSBI and m take part in the processing
// is not documented here - confirm against the full function body.
489 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
490 sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error {
491 err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
493 xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
// One channel per REST operation, handed to the REST handlers by LaunchRest.
497 datach := make(chan *models.XappCallbackData, 10)
498 subschan := make(chan *models.XappSubscriptionData, 10)
499 subdelchan := make(chan *models.XappSubscriptionData, 10)
500 e2taddchan := make(chan *models.E2tData, 10)
501 associateranchan := make(chan models.RanE2tMap, 10)
502 disassociateranchan := make(chan models.RanE2tMap, 10)
503 e2tdelchan := make(chan *models.E2tDeleteData, 10)
504 xapp.Logger.Info("Launching Rest Http service")
506 r.LaunchRest(&nbiif, datach, subschan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan)
// xApp callback data: on receipt, re-read the complete xApp list from the
// xApp manager and store that list rather than the callback payload.
511 data, err := r.RecvXappCallbackData(datach)
513 xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
514 } else if data != nil {
515 xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
516 alldata, err1 := httpGetXApps(xmurl)
517 if alldata != nil && err1 == nil {
519 sdlEngine.WriteXApps(fileName, alldata)
// Subscription added: record it in the in-memory subscription list.
530 xapp.Logger.Debug("received XApp subscription data")
531 addSubscription(&rtmgr.Subs, data)
// Subscription deleted: remove it from the in-memory subscription list.
539 xapp.Logger.Debug("received XApp subscription delete data")
540 delSubscription(&rtmgr.Subs, data)
// New E2T instance: the error result of RecvNewE2Tdata is discarded.
547 xapp.Logger.Debug("received create New E2T data")
549 data, _ := r.RecvNewE2Tdata(e2taddchan)
552 sdlEngine.WriteNewE2TInstance(fileName, data)
// RAN association: persist the RAN-to-E2T mapping.
561 data := <-associateranchan
562 xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
564 sdlEngine.WriteAssRANToE2TInstance(fileName, data)
// RAN disassociation: persist the removal of the mapping.
573 data := <-disassociateranchan
574 xapp.Logger.Debug("received disassociate RANs from E2T instance")
576 sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data)
// E2T instance deletion: persist the removal of the instance.
584 xapp.Logger.Debug("received Delete E2T data")
589 sdlEngine.WriteDeleteE2TInstance(fileName, data)
// Terminate is the teardown counterpart of Initialize.
// NOTE(review): confirm whether any REST server or channel cleanup is
// expected to happen here.
599 func (r *HttpRestful) Terminate() error {
// addSubscription builds an rtmgr.Subscription from xappSubData, scans *subs
// for it (logging a warning when the subscription is already present) and
// appends it to *subs.
// NOTE(review): presumably the append is skipped for duplicates and the
// returned bool reports whether the subscription was already present -
// confirm.
//
// xappSubData.SubscriptionID, Address and Port are dereferenced without a
// nil check.
603 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
605 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
606 for _, elem := range *subs {
608 xapp.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
613 *subs = append(*subs, sub)
// delSubscription builds an rtmgr.Subscription from xappSubData and removes
// it from *subs. Removal swaps the element with the last one and truncates
// the list by one, so the order of *subs is not preserved. A warning is
// logged when the subscription is not in the list.
// NOTE(review): presumably the returned bool reports whether the
// subscription was found - confirm.
//
// xappSubData.SubscriptionID, Address and Port are dereferenced without a
// nil check.
618 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
619 xapp.Logger.Debug("Deleteing the subscription from the subscriptions list")
621 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
622 for i, elem := range *subs {
625 // Since the order of the list is not important, we are swapping the last element
626 // with the matching element and replacing the list with list(n-1) elements.
627 (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
628 *subs = (*subs)[:len(*subs)-1]
632 if present == false {
633 xapp.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)