2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
19 This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 platform project (RICP).
22 ==================================================================================
25 Mnemonic: httprestful.go
26 Abstract: HTTP Restful API NBI implementation
27 Based on Swagger generated code
33 //noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
38 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
39 "github.com/go-openapi/loads"
40 "github.com/go-openapi/runtime/middleware"
44 "routing-manager/pkg/models"
45 "routing-manager/pkg/restapi"
46 "routing-manager/pkg/restapi/operations"
47 "routing-manager/pkg/restapi/operations/debug"
48 "routing-manager/pkg/restapi/operations/handle"
49 "routing-manager/pkg/rpe"
50 "routing-manager/pkg/rtmgr"
51 "routing-manager/pkg/sdl"
58 //var myClient = &http.Client{Timeout: 1 * time.Second}
60 type HttpRestful struct {
62 LaunchRest LaunchRestHandler
63 RecvXappCallbackData RecvXappCallbackDataHandler
64 RecvNewE2Tdata RecvNewE2TdataHandler
65 ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
66 RetrieveStartupData RetrieveStartupDataHandler
69 func NewHttpRestful() *HttpRestful {
70 instance := new(HttpRestful)
71 instance.LaunchRest = launchRest
72 instance.RecvXappCallbackData = recvXappCallbackData
73 instance.RecvNewE2Tdata = recvNewE2Tdata
74 instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
75 instance.RetrieveStartupData = retrieveStartupData
79 // ToDo: Use Range over channel. Read and return only the latest one.
80 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
81 var xappData *models.XappCallbackData
82 // Drain the channel as we are only looking for the latest value until
83 // xapp manager sends all xapp data with every request.
84 length := len(dataChannel)
85 //xapp.Logger.Info(length)
86 for i := 0; i <= length; i++ {
87 xapp.Logger.Info("data received")
88 // If no data received from the REST, it blocks.
89 xappData = <-dataChannel
92 var xapps []rtmgr.XApp
93 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
96 xapp.Logger.Info("No data")
99 xapp.Logger.Debug("Nothing received on the Http interface")
103 func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, string, error) {
104 var e2tData *models.E2tData
106 xapp.Logger.Info("data received")
108 e2tData = <-dataChannel
112 e2tinst := rtmgr.E2TInstance{
113 Ranlist: make([]string, len(e2tData.RanNamelist)),
116 e2tinst.Fqdn = *e2tData.E2TAddress
117 e2tinst.Name = "E2TERMINST"
118 copy(e2tinst.Ranlist, e2tData.RanNamelist)
119 if len(e2tData.RanNamelist) > 0 {
121 for _, meid := range e2tData.RanNamelist {
124 str = "mme_ar|" + *e2tData.E2TAddress + "|" + strings.TrimSuffix(meidar, " ")
126 return &e2tinst, str, nil
129 xapp.Logger.Info("No data")
132 xapp.Logger.Debug("Nothing received on the Http interface")
136 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
137 if len(callbackData.XApps) == 0 {
138 return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
140 var xapps []rtmgr.XApp
141 err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
143 return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
148 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
150 xapp.Logger.Debug("Received callback data")
152 err := validateXappCallbackData(data)
154 xapp.Logger.Warn("XApp callback data validation failed: " + err.Error())
162 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
163 var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
164 for _, ep := range rtmgr.Eps {
165 if ep.Ip == *data.Address && ep.Port == *data.Port {
173 func validateE2tData(data *models.E2tData) error {
175 e2taddress_key := *data.E2TAddress
176 if e2taddress_key == "" {
177 return fmt.Errorf("E2TAddress is empty!!!")
179 stringSlice := strings.Split(e2taddress_key, ":")
180 if len(stringSlice) == 1 {
181 return fmt.Errorf("E2T E2TAddress is not a proper format like ip:port, %v", e2taddress_key)
184 _, err := net.LookupIP(stringSlice[0])
186 return fmt.Errorf("E2T E2TAddress DNS look up failed, E2TAddress: %v", stringSlice[0])
189 if checkValidaE2TAddress(e2taddress_key) {
190 return fmt.Errorf("E2TAddress already exist!!!, E2TAddress: %v", e2taddress_key)
196 func validateDeleteE2tData(data *models.E2tDeleteData) error {
198 if *data.E2TAddress == "" {
199 return fmt.Errorf("E2TAddress is empty!!!")
202 for _, element := range data.RanAssocList {
203 e2taddress_key := *element.E2TAddress
204 stringSlice := strings.Split(e2taddress_key, ":")
206 if len(stringSlice) == 1 {
207 return fmt.Errorf("E2T Delete - RanAssocList E2TAddress is not a proper format like ip:port, %v", e2taddress_key)
210 if !checkValidaE2TAddress(e2taddress_key) {
211 return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v", e2taddress_key)
218 func checkValidaE2TAddress(e2taddress string) bool {
220 _, exist := rtmgr.Eps[e2taddress]
225 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
226 data *models.XappSubscriptionData) error {
227 xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
228 err := validateXappSubscriptionData(data)
230 xapp.Logger.Error(err.Error())
234 //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
235 xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
239 func subscriptionExists(data *models.XappSubscriptionData) bool {
241 sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
242 for _, elem := range rtmgr.Subs {
251 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
252 data *models.XappSubscriptionData) error {
253 xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
254 err := validateXappSubscriptionData(data)
256 xapp.Logger.Error(err.Error())
260 if !subscriptionExists(data) {
261 xapp.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
262 err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
270 func updateXappSubscriptionHandleImpl(subupdatechan chan<- *rtmgr.XappList, data *models.XappList, subid uint16) error {
271 xapp.Logger.Debug("Invoked updateXappSubscriptionHandleImpl")
273 var fqdnlist []rtmgr.FqDn
274 for _, item := range *data {
275 fqdnlist = append(fqdnlist, rtmgr.FqDn(*item))
277 xapplist := rtmgr.XappList{SubscriptionID: subid, FqdnList: fqdnlist}
278 var subdata models.XappSubscriptionData
281 subdata.SubscriptionID = &id
282 for _, items := range fqdnlist {
283 subdata.Address = items.Address
284 subdata.Port = items.Port
285 err := validateXappSubscriptionData(&subdata)
287 xapp.Logger.Error(err.Error())
291 subupdatechan <- &xapplist
295 func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData,
296 data *models.E2tData) error {
297 xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl")
298 err := validateE2tData(data)
300 xapp.Logger.Error(err.Error())
307 func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
309 xapp.Logger.Debug("Invoked.validateE2TAddressRANListData : %v", assRanE2tData)
311 for _, element := range assRanE2tData {
312 if *element.E2TAddress == "" {
313 return fmt.Errorf("E2T Instance - E2TAddress is empty!!!")
316 e2taddress_key := *element.E2TAddress
317 if !checkValidaE2TAddress(e2taddress_key) {
318 return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v", e2taddress_key)
325 func associateRanToE2THandlerImpl(assranchan chan<- models.RanE2tMap,
326 data models.RanE2tMap) error {
327 xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl")
328 err := validateE2TAddressRANListData(data)
330 xapp.Logger.Warn(" Association of RAN to E2T Instance data validation failed: " + err.Error())
337 func disassociateRanToE2THandlerImpl(disassranchan chan<- models.RanE2tMap,
338 data models.RanE2tMap) error {
339 xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl")
340 err := validateE2TAddressRANListData(data)
342 xapp.Logger.Warn(" Disassociation of RAN List from E2T Instance data validation failed: " + err.Error())
345 disassranchan <- data
349 func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData,
350 data *models.E2tDeleteData) error {
351 xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl")
353 err := validateDeleteE2tData(data)
355 xapp.Logger.Error(err.Error())
363 func dumpDebugData() (models.Debuginfo, error) {
364 var response models.Debuginfo
365 sdlEngine, _ := sdl.GetSdl("file")
366 rpeEngine, _ := rpe.GetRpe("rmrpush")
367 data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
368 if err != nil || data == nil {
369 xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error())
372 response.RouteTable = *rpeEngine.GeneratePolicies(rtmgr.Eps, data)
374 prettyJSON, err := json.MarshalIndent(data, "", "")
375 response.RouteConfigs = string(prettyJSON)
380 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData, subupdatechan chan<- *rtmgr.XappList,
381 subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData, assranchan chan<- models.RanE2tMap, disassranchan chan<- models.RanE2tMap, e2tdelchan chan<- *models.E2tDeleteData) {
382 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
385 xapp.Logger.Error(err.Error())
388 nbiUrl, err := url.Parse(*nbiif)
390 xapp.Logger.Error(err.Error())
393 api := operations.NewRoutingManagerAPI(swaggerSpec)
394 server := restapi.NewServer(api)
395 defer server.Shutdown()
397 server.Port, err = strconv.Atoi(nbiUrl.Port())
399 xapp.Logger.Error("Invalid NBI RestAPI port")
402 server.Host = "0.0.0.0"
404 api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
405 func(params handle.ProvideXappHandleParams) middleware.Responder {
406 xapp.Logger.Info("Data received on Http interface")
407 err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
409 xapp.Logger.Error("Invalid XApp callback data: " + err.Error())
410 return handle.NewProvideXappHandleBadRequest()
412 return handle.NewGetHandlesOK()
415 api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
416 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
417 err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
419 return handle.NewProvideXappSubscriptionHandleBadRequest()
421 //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
422 time.Sleep(1 * time.Second)
423 return handle.NewGetHandlesOK()
426 api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
427 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
428 err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
430 return handle.NewDeleteXappSubscriptionHandleNoContent()
432 //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
433 time.Sleep(1 * time.Second)
434 return handle.NewGetHandlesOK()
437 api.HandleUpdateXappSubscriptionHandleHandler = handle.UpdateXappSubscriptionHandleHandlerFunc(
438 func(params handle.UpdateXappSubscriptionHandleParams) middleware.Responder {
439 err := updateXappSubscriptionHandleImpl(subupdatechan, ¶ms.XappList, params.SubscriptionID)
441 return handle.NewUpdateXappSubscriptionHandleBadRequest()
443 //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
444 time.Sleep(1 * time.Second)
445 return handle.NewUpdateXappSubscriptionHandleCreated()
448 api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc(
449 func(params handle.CreateNewE2tHandleParams) middleware.Responder {
450 err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData)
452 return handle.NewCreateNewE2tHandleBadRequest()
454 time.Sleep(1 * time.Second)
455 return handle.NewCreateNewE2tHandleCreated()
459 api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc(
460 func(params handle.AssociateRanToE2tHandleParams) middleware.Responder {
461 err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList)
463 return handle.NewAssociateRanToE2tHandleBadRequest()
465 time.Sleep(1 * time.Second)
466 return handle.NewAssociateRanToE2tHandleCreated()
470 api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc(
471 func(params handle.DissociateRanParams) middleware.Responder {
472 err := disassociateRanToE2THandlerImpl(disassranchan, params.DissociateList)
474 return handle.NewDissociateRanBadRequest()
476 time.Sleep(1 * time.Second)
477 return handle.NewDissociateRanCreated()
481 api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc(
482 func(params handle.DeleteE2tHandleParams) middleware.Responder {
483 err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData)
485 return handle.NewDeleteE2tHandleBadRequest()
487 time.Sleep(1 * time.Second)
488 return handle.NewDeleteE2tHandleCreated()
491 api.DebugGetDebuginfoHandler = debug.GetDebuginfoHandlerFunc(
492 func(params debug.GetDebuginfoParams) middleware.Responder {
493 response, err := dumpDebugData()
495 return debug.NewGetDebuginfoCreated()
497 return debug.NewGetDebuginfoOK().WithPayload(&response)
500 // start to serve API
501 xapp.Logger.Info("Starting the HTTP Rest service")
502 if err := server.Serve(); err != nil {
503 xapp.Logger.Error(err.Error())
507 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
508 xapp.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
509 r, err := myClient.Get(xmurl)
515 if r.StatusCode == 200 {
516 xapp.Logger.Debug("http client raw response: %v", r)
517 var xapps []rtmgr.XApp
518 err = json.NewDecoder(r.Body).Decode(&xapps)
520 xapp.Logger.Warn("Json decode failed: " + err.Error())
522 xapp.Logger.Info("HTTP GET: OK")
523 xapp.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
526 xapp.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
530 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
533 for i := 1; i <= maxRetries; i++ {
534 time.Sleep(2 * time.Second)
535 xappData, err := httpGetXApps(xmurl)
536 if xappData != nil && err == nil {
537 pcData, confErr := rtmgr.GetPlatformComponents(configfile)
539 xapp.Logger.Error(confErr.Error())
542 xapp.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
543 // Combine the xapps data and platform data before writing to the SDL
544 ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: make(map[string]rtmgr.E2TInstance)}
545 writeErr := sdlEngine.WriteAll(fileName, ricData)
547 xapp.Logger.Error(writeErr.Error())
549 // post subscription req to appmgr
550 readErr = PostSubReq(xmurl, nbiif)
554 } else if err == nil {
555 readErr = errors.New("unexpected HTTP status code")
557 xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
564 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
565 sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error {
566 err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
568 xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
572 datach := make(chan *models.XappCallbackData, 10)
573 subschan := make(chan *models.XappSubscriptionData, 10)
574 subdelchan := make(chan *models.XappSubscriptionData, 10)
575 subupdatechan := make(chan *rtmgr.XappList, 10)
576 e2taddchan := make(chan *models.E2tData, 10)
577 associateranchan := make(chan models.RanE2tMap, 10)
578 disassociateranchan := make(chan models.RanE2tMap, 10)
579 e2tdelchan := make(chan *models.E2tDeleteData, 10)
580 xapp.Logger.Info("Launching Rest Http service")
582 r.LaunchRest(&nbiif, datach, subschan, subupdatechan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan)
587 data, err := r.RecvXappCallbackData(datach)
589 xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
590 } else if data != nil {
591 xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
592 alldata, err1 := httpGetXApps(xmurl)
593 if alldata != nil && err1 == nil {
595 sdlEngine.WriteXApps(fileName, alldata)
606 xapp.Logger.Debug("received XApp subscription data")
607 addSubscription(&rtmgr.Subs, data)
615 xapp.Logger.Debug("received XApp subscription delete data")
616 delSubscription(&rtmgr.Subs, data)
623 data := <-subupdatechan
624 xapp.Logger.Debug("received XApp subscription Merge data")
625 updateSubscription(data)
633 data, meiddata, _ := r.RecvNewE2Tdata(e2taddchan)
635 xapp.Logger.Debug("received create New E2T data")
637 sdlEngine.WriteNewE2TInstance(fileName, data, meiddata)
646 data := <-associateranchan
647 xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
649 sdlEngine.WriteAssRANToE2TInstance(fileName, data)
658 data := <-disassociateranchan
659 xapp.Logger.Debug("received disassociate RANs from E2T instance")
661 sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data)
671 xapp.Logger.Debug("received Delete E2T data")
674 sdlEngine.WriteDeleteE2TInstance(fileName, data)
684 func (r *HttpRestful) Terminate() error {
688 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
689 xapp.Logger.Debug("Adding the subscription into the subscriptions list")
691 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
692 for _, elem := range *subs {
694 xapp.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
699 *subs = append(*subs, sub)
704 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
705 xapp.Logger.Debug("Deleteing the subscription from the subscriptions list")
707 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
708 for i, elem := range *subs {
711 // Since the order of the list is not important, we are swapping the last element
712 // with the matching element and replacing the list with list(n-1) elements.
713 (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
714 *subs = (*subs)[:len(*subs)-1]
718 if present == false {
719 xapp.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)
724 func updateSubscription(data *rtmgr.XappList) {
726 var subdata models.XappSubscriptionData
728 var matchingsubid, deletecount uint8
729 id = int32(data.SubscriptionID)
730 subdata.SubscriptionID = &id
731 for _, subs := range rtmgr.Subs {
732 if int32(data.SubscriptionID) == subs.SubID {
737 for deletecount < matchingsubid {
738 for _, subs := range rtmgr.Subs {
739 if int32(data.SubscriptionID) == subs.SubID {
740 subdata.SubscriptionID = &subs.SubID
741 subdata.Address = &subs.Fqdn
742 subdata.Port = &subs.Port
743 xapp.Logger.Debug("Deletion Subscription List has %v", subdata)
744 delSubscription(&rtmgr.Subs, &subdata)
751 for _, items := range data.FqdnList {
752 subdata.Address = items.Address
753 subdata.Port = items.Port
754 xapp.Logger.Debug("Adding Subscription List has %v", subdata)
755 addSubscription(&rtmgr.Subs, &subdata)