2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
19 This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 platform project (RICP).
22 ==================================================================================
25 Mnemonic: httprestful.go
26 Abstract: HTTP Restful API NBI implementation
27 Based on Swagger generated code
33 //noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
38 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
39 "github.com/go-openapi/loads"
40 "github.com/go-openapi/runtime/middleware"
44 "routing-manager/pkg/models"
45 "routing-manager/pkg/restapi"
46 "routing-manager/pkg/restapi/operations"
47 "routing-manager/pkg/restapi/operations/handle"
48 "routing-manager/pkg/rpe"
49 "routing-manager/pkg/rtmgr"
50 "routing-manager/pkg/sdl"
57 //var myClient = &http.Client{Timeout: 1 * time.Second}
59 type HttpRestful struct {
61 LaunchRest LaunchRestHandler
62 RecvXappCallbackData RecvXappCallbackDataHandler
63 RecvNewE2Tdata RecvNewE2TdataHandler
64 ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
65 RetrieveStartupData RetrieveStartupDataHandler
68 func NewHttpRestful() *HttpRestful {
69 instance := new(HttpRestful)
70 instance.LaunchRest = launchRest
71 instance.RecvXappCallbackData = recvXappCallbackData
72 instance.RecvNewE2Tdata = recvNewE2Tdata
73 instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
74 instance.RetrieveStartupData = retrieveStartupData
78 // ToDo: Use Range over channel. Read and return only the latest one.
79 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
80 var xappData *models.XappCallbackData
81 // Drain the channel as we are only looking for the latest value until
82 // xapp manager sends all xapp data with every request.
83 length := len(dataChannel)
84 //xapp.Logger.Info(length)
85 for i := 0; i <= length; i++ {
86 xapp.Logger.Info("data received")
87 // If no data received from the REST, it blocks.
88 xappData = <-dataChannel
91 var xapps []rtmgr.XApp
92 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
95 xapp.Logger.Info("No data")
98 xapp.Logger.Debug("Nothing received on the Http interface")
102 func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, error) {
103 var e2tData *models.E2tData
104 xapp.Logger.Info("data received")
106 e2tData = <-dataChannel
110 e2tinst := rtmgr.E2TInstance {
111 Ranlist : make([]string, len(e2tData.RanNamelist)),
114 e2tinst.Fqdn = *e2tData.E2TAddress
115 e2tinst.Name = "E2TERMINST"
116 copy(e2tinst.Ranlist, e2tData.RanNamelist)
121 xapp.Logger.Info("No data")
124 xapp.Logger.Debug("Nothing received on the Http interface")
128 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
129 if len(callbackData.XApps) == 0 {
130 return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
132 var xapps []rtmgr.XApp
133 err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
135 return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
140 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
142 xapp.Logger.Debug("Received callback data")
144 err := validateXappCallbackData(data)
146 xapp.Logger.Warn("XApp callback data validation failed: " + err.Error())
154 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
155 var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
156 for _, ep := range rtmgr.Eps {
157 if ep.Ip == *data.Address && ep.Port == *data.Port {
165 func validateE2tData(data *models.E2tData) error {
167 e2taddress_key := *data.E2TAddress
168 if (e2taddress_key == "") {
169 return fmt.Errorf("E2TAddress is empty!!!")
171 stringSlice := strings.Split(e2taddress_key, ":")
172 if (len(stringSlice) == 1) {
173 return fmt.Errorf("E2T E2TAddress is not a proper format like ip:port, %v", e2taddress_key )
176 _, err := net.LookupIP(stringSlice[0])
178 return fmt.Errorf("E2T E2TAddress DNS look up failed, E2TAddress: %v", stringSlice[0])
181 if checkValidaE2TAddress(e2taddress_key) {
182 return fmt.Errorf("E2TAddress already exist!!!, E2TAddress: %v",e2taddress_key)
188 func validateDeleteE2tData(data *models.E2tDeleteData) error {
190 if (*data.E2TAddress == "") {
191 return fmt.Errorf("E2TAddress is empty!!!")
194 for _, element := range data.RanAssocList {
195 e2taddress_key := *element.E2TAddress
196 stringSlice := strings.Split(e2taddress_key, ":")
198 if (len(stringSlice) == 1) {
199 return fmt.Errorf("E2T Delete - RanAssocList E2TAddress is not a proper format like ip:port, %v", e2taddress_key)
203 if !checkValidaE2TAddress(e2taddress_key) {
204 return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v",e2taddress_key)
211 func checkValidaE2TAddress(e2taddress string) bool {
213 _, exist := rtmgr.Eps[e2taddress]
218 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
219 data *models.XappSubscriptionData) error {
220 xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
221 err := validateXappSubscriptionData(data)
223 xapp.Logger.Error(err.Error())
227 //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
228 xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
232 func subscriptionExists(data *models.XappSubscriptionData) bool {
234 sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
235 for _, elem := range rtmgr.Subs {
244 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
245 data *models.XappSubscriptionData) error {
246 xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
247 err := validateXappSubscriptionData(data)
249 xapp.Logger.Error(err.Error())
253 if !subscriptionExists(data) {
254 xapp.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
255 err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
263 func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData,
264 data *models.E2tData) error {
265 xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl")
266 err := validateE2tData(data)
268 xapp.Logger.Error(err.Error())
275 func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
277 xapp.Logger.Debug("Invoked.validateE2TAddressRANListData : %v", assRanE2tData)
279 for _, element := range assRanE2tData {
280 if *element.E2TAddress == "" {
281 return fmt.Errorf("E2T Instance - E2TAddress is empty!!!")
284 e2taddress_key := *element.E2TAddress
285 if !checkValidaE2TAddress(e2taddress_key) {
286 return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v",e2taddress_key)
293 func associateRanToE2THandlerImpl(assranchan chan<- models.RanE2tMap,
294 data models.RanE2tMap) error {
295 xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl")
296 err := validateE2TAddressRANListData(data)
298 xapp.Logger.Warn(" Association of RAN to E2T Instance data validation failed: " + err.Error())
305 func disassociateRanToE2THandlerImpl(disassranchan chan<- models.RanE2tMap,
306 data models.RanE2tMap) error {
307 xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl")
308 err := validateE2TAddressRANListData(data)
310 xapp.Logger.Warn(" Disassociation of RAN List from E2T Instance data validation failed: " + err.Error())
313 disassranchan <- data
317 func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData,
318 data *models.E2tDeleteData) error {
319 xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl")
321 err := validateDeleteE2tData(data)
323 xapp.Logger.Error(err.Error())
331 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
332 subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData, assranchan chan<- models.RanE2tMap, disassranchan chan<- models.RanE2tMap, e2tdelchan chan<- *models.E2tDeleteData) {
333 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
336 xapp.Logger.Error(err.Error())
339 nbiUrl, err := url.Parse(*nbiif)
341 xapp.Logger.Error(err.Error())
344 api := operations.NewRoutingManagerAPI(swaggerSpec)
345 server := restapi.NewServer(api)
346 defer server.Shutdown()
348 server.Port, err = strconv.Atoi(nbiUrl.Port())
350 xapp.Logger.Error("Invalid NBI RestAPI port")
353 server.Host = "0.0.0.0"
355 api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
356 func(params handle.ProvideXappHandleParams) middleware.Responder {
357 xapp.Logger.Info("Data received on Http interface")
358 err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
360 xapp.Logger.Error("Invalid XApp callback data: " + err.Error())
361 return handle.NewProvideXappHandleBadRequest()
363 return handle.NewGetHandlesOK()
366 api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
367 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
368 err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
370 return handle.NewProvideXappSubscriptionHandleBadRequest()
372 //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
373 time.Sleep(1 * time.Second)
374 return handle.NewGetHandlesOK()
377 api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
378 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
379 err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
381 return handle.NewDeleteXappSubscriptionHandleNoContent()
383 //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
384 time.Sleep(1 * time.Second)
385 return handle.NewGetHandlesOK()
388 api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc(
389 func(params handle.CreateNewE2tHandleParams) middleware.Responder {
390 err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData)
392 return handle.NewCreateNewE2tHandleBadRequest()
394 time.Sleep(1 * time.Second)
395 return handle.NewCreateNewE2tHandleCreated()
399 api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc(
400 func(params handle.AssociateRanToE2tHandleParams) middleware.Responder {
401 err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList)
403 return handle.NewAssociateRanToE2tHandleBadRequest()
405 time.Sleep(1 * time.Second)
406 return handle.NewAssociateRanToE2tHandleCreated()
410 api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc(
411 func(params handle.DissociateRanParams) middleware.Responder {
412 err := disassociateRanToE2THandlerImpl(disassranchan, params.DissociateList)
414 return handle.NewDissociateRanBadRequest()
416 time.Sleep(1 * time.Second)
417 return handle.NewDissociateRanCreated()
421 api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc(
422 func(params handle.DeleteE2tHandleParams) middleware.Responder {
423 err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData)
425 return handle.NewDeleteE2tHandleBadRequest()
427 time.Sleep(1 * time.Second)
428 return handle.NewDeleteE2tHandleCreated()
431 // start to serve API
432 xapp.Logger.Info("Starting the HTTP Rest service")
433 if err := server.Serve(); err != nil {
434 xapp.Logger.Error(err.Error())
438 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
439 xapp.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
440 r, err := myClient.Get(xmurl)
446 if r.StatusCode == 200 {
447 xapp.Logger.Debug("http client raw response: %v", r)
448 var xapps []rtmgr.XApp
449 err = json.NewDecoder(r.Body).Decode(&xapps)
451 xapp.Logger.Warn("Json decode failed: " + err.Error())
453 xapp.Logger.Info("HTTP GET: OK")
454 xapp.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
457 xapp.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
461 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
464 for i := 1; i <= maxRetries; i++ {
465 time.Sleep(2 * time.Second)
466 xappData, err := httpGetXApps(xmurl)
467 if xappData != nil && err == nil {
468 pcData, confErr := rtmgr.GetPlatformComponents(configfile)
470 xapp.Logger.Error(confErr.Error())
473 xapp.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
474 // Combine the xapps data and platform data before writing to the SDL
475 ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: make(map[string]rtmgr.E2TInstance)}
476 writeErr := sdlEngine.WriteAll(fileName, ricData)
478 xapp.Logger.Error(writeErr.Error())
480 // post subscription req to appmgr
481 readErr = PostSubReq(xmurl, nbiif)
485 } else if err == nil {
486 readErr = errors.New("unexpected HTTP status code")
488 xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
495 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
496 sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error {
497 err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
499 xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
503 datach := make(chan *models.XappCallbackData, 10)
504 subschan := make(chan *models.XappSubscriptionData, 10)
505 subdelchan := make(chan *models.XappSubscriptionData, 10)
506 e2taddchan := make(chan *models.E2tData, 10)
507 associateranchan := make(chan models.RanE2tMap, 10)
508 disassociateranchan := make(chan models.RanE2tMap, 10)
509 e2tdelchan := make(chan *models.E2tDeleteData, 10)
510 xapp.Logger.Info("Launching Rest Http service")
512 r.LaunchRest(&nbiif, datach, subschan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan)
517 data, err := r.RecvXappCallbackData(datach)
519 xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
520 } else if data != nil {
521 xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
522 alldata, err1 := httpGetXApps(xmurl)
523 if alldata != nil && err1 == nil {
525 sdlEngine.WriteXApps(fileName, alldata)
536 xapp.Logger.Debug("received XApp subscription data")
537 addSubscription(&rtmgr.Subs, data)
545 xapp.Logger.Debug("received XApp subscription delete data")
546 delSubscription(&rtmgr.Subs, data)
553 xapp.Logger.Debug("received create New E2T data")
555 data, _ := r.RecvNewE2Tdata(e2taddchan)
558 sdlEngine.WriteNewE2TInstance(fileName, data)
567 data := <-associateranchan
568 xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
570 sdlEngine.WriteAssRANToE2TInstance(fileName, data)
579 data := <-disassociateranchan
580 xapp.Logger.Debug("received disassociate RANs from E2T instance")
582 sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data)
590 xapp.Logger.Debug("received Delete E2T data")
595 sdlEngine.WriteDeleteE2TInstance(fileName, data)
605 func (r *HttpRestful) Terminate() error {
609 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
611 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
612 for _, elem := range *subs {
614 xapp.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
619 *subs = append(*subs, sub)
624 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
625 xapp.Logger.Debug("Deleteing the subscription from the subscriptions list")
627 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
628 for i, elem := range *subs {
631 // Since the order of the list is not important, we are swapping the last element
632 // with the matching element and replacing the list with list(n-1) elements.
633 (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
634 *subs = (*subs)[:len(*subs)-1]
638 if present == false {
639 xapp.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)