2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
19 This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 platform project (RICP).
22 ==================================================================================
25 Mnemonic: httprestful.go
26 Abstract: HTTP Restful API NBI implementation
27 Based on Swagger generated code
33 //noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
38 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
39 "github.com/go-openapi/loads"
40 "github.com/go-openapi/runtime/middleware"
44 "routing-manager/pkg/models"
45 "routing-manager/pkg/restapi"
46 "routing-manager/pkg/restapi/operations"
47 "routing-manager/pkg/restapi/operations/handle"
48 "routing-manager/pkg/rpe"
49 "routing-manager/pkg/rtmgr"
50 "routing-manager/pkg/sdl"
57 //var myClient = &http.Client{Timeout: 1 * time.Second}
// HttpRestful is the HTTP REST implementation of the routing manager's
// northbound interface (NBI). Each field shown holds a function, and
// NewHttpRestful assigns this file's default implementation to every one.
// NOTE(review): this listing skips source lines (60 and 66 onwards), so the
// struct may declare more members than are shown here — confirm against the
// full file.
59 type HttpRestful struct {
// LaunchRest configures and serves the REST API.
61 LaunchRest LaunchRestHandler
// RecvXappCallbackData reads xApp callback data from a channel and decodes it.
62 RecvXappCallbackData RecvXappCallbackDataHandler
// RecvNewE2Tdata reads a new E2T registration from a channel.
63 RecvNewE2Tdata RecvNewE2TdataHandler
// ProvideXappHandleHandlerImpl validates xApp callback data received over REST.
64 ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
// RetrieveStartupData fetches the initial data when the engine is initialized.
65 RetrieveStartupData RetrieveStartupDataHandler
68 func NewHttpRestful() *HttpRestful {
69 instance := new(HttpRestful)
70 instance.LaunchRest = launchRest
71 instance.RecvXappCallbackData = recvXappCallbackData
72 instance.RecvNewE2Tdata = recvNewE2Tdata
73 instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
74 instance.RetrieveStartupData = retrieveStartupData
// recvXappCallbackData receives xApp callback data from dataChannel and
// JSON-decodes the XApps payload of the last item read into a slice of
// rtmgr.XApp.
// NOTE(review): the nil check around the decode and every return statement are
// not visible in this listing; the comments below describe only the lines that
// are shown.
78 // ToDo: Use Range over channel. Read and return only the latest one.
79 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
80 var xappData *models.XappCallbackData
81 // Drain the channel as we are only looking for the latest value until
82 // xapp manager sends all xapp data with every request.
// Snapshot of how many items are buffered right now.
83 length := len(dataChannel)
84 //xapp.Logger.Info(length)
// The bound is inclusive, so the loop performs length+1 receives: one for
// every buffered item plus one more. Each receive overwrites xappData, so only
// the last value read is kept.
85 for i := 0; i <= length; i++ {
// Logged before the receive on the following lines, not after it.
86 xapp.Logger.Info("data received")
87 // If no data received from the REST, it blocks.
88 xappData = <-dataChannel
// XApps is converted to bytes and decoded as a JSON array of rtmgr.XApp.
91 var xapps []rtmgr.XApp
92 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
95 xapp.Logger.Info("No data")
98 xapp.Logger.Debug("Nothing received on the Http interface")
// recvNewE2Tdata reads one *models.E2tData from dataChannel and builds an
// rtmgr.E2TInstance from it: Fqdn is the E2T address, Name is the fixed string
// "E2TERMINST" and Ranlist is a copy of RanNamelist. The string result has the
// form "mme_ar|<E2T address>|<meidar>" and is assembled when RanNamelist is
// non-empty.
// NOTE(review): the lines that declare str, build meidar and guard against a
// nil message are not visible in this listing. meidar is presumably the RAN
// names separated by single spaces; if so, strings.Join(e2tData.RanNamelist, " ")
// could replace the loop and the TrimSuffix — confirm against the full file.
102 func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance,string,error) {
103 var e2tData *models.E2tData
// Logged before the receive below, i.e. before any data has actually arrived.
105 xapp.Logger.Info("data received")
// Blocks until a value is sent on the channel (or the channel is closed).
107 e2tData = <-dataChannel
// Ranlist is sized up front so that copy below can fill it.
111 e2tinst := rtmgr.E2TInstance {
112 Ranlist : make([]string, len(e2tData.RanNamelist)),
115 e2tinst.Fqdn = *e2tData.E2TAddress
116 e2tinst.Name = "E2TERMINST"
// The instance gets its own copy of the RAN names rather than sharing the
// request's slice.
117 copy(e2tinst.Ranlist, e2tData.RanNamelist)
118 if (len(e2tData.RanNamelist) > 0) {
120 for _, meid := range e2tData.RanNamelist {
// TrimSuffix removes a single trailing space from meidar.
123 str = "mme_ar|" + *e2tData.E2TAddress + "|" + strings.TrimSuffix(meidar," ")
125 return &e2tinst,str,nil
128 xapp.Logger.Info("No data")
131 xapp.Logger.Debug("Nothing received on the Http interface")
135 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
136 if len(callbackData.XApps) == 0 {
137 return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
139 var xapps []rtmgr.XApp
140 err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
142 return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
// provideXappHandleHandlerImpl handles xApp callback data received over REST:
// it validates data with validateXappCallbackData and logs a warning when the
// validation fails.
// NOTE(review): the condition around the first debug log, the return
// statements and any send on datach are not visible in this listing;
// presumably valid data is forwarded on datach — confirm.
147 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
149 xapp.Logger.Debug("Received callback data")
151 err := validateXappCallbackData(data)
153 xapp.Logger.Warn("XApp callback data validation failed: " + err.Error())
161 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
162 var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
163 for _, ep := range rtmgr.Eps {
164 if ep.Ip == *data.Address && ep.Port == *data.Port {
172 func validateE2tData(data *models.E2tData) error {
174 e2taddress_key := *data.E2TAddress
175 if (e2taddress_key == "") {
176 return fmt.Errorf("E2TAddress is empty!!!")
178 stringSlice := strings.Split(e2taddress_key, ":")
179 if (len(stringSlice) == 1) {
180 return fmt.Errorf("E2T E2TAddress is not a proper format like ip:port, %v", e2taddress_key )
183 _, err := net.LookupIP(stringSlice[0])
185 return fmt.Errorf("E2T E2TAddress DNS look up failed, E2TAddress: %v", stringSlice[0])
188 if checkValidaE2TAddress(e2taddress_key) {
189 return fmt.Errorf("E2TAddress already exist!!!, E2TAddress: %v",e2taddress_key)
195 func validateDeleteE2tData(data *models.E2tDeleteData) error {
197 if (*data.E2TAddress == "") {
198 return fmt.Errorf("E2TAddress is empty!!!")
201 for _, element := range data.RanAssocList {
202 e2taddress_key := *element.E2TAddress
203 stringSlice := strings.Split(e2taddress_key, ":")
205 if (len(stringSlice) == 1) {
206 return fmt.Errorf("E2T Delete - RanAssocList E2TAddress is not a proper format like ip:port, %v", e2taddress_key)
210 if !checkValidaE2TAddress(e2taddress_key) {
211 return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v",e2taddress_key)
218 func checkValidaE2TAddress(e2taddress string) bool {
220 _, exist := rtmgr.Eps[e2taddress]
// provideXappSubscriptionHandleImpl handles an "add subscription" request: it
// validates data with validateXappSubscriptionData and logs the validation
// error, if any.
// NOTE(review): the return statements and any send on subchan are not visible
// in this listing; presumably validated data is forwarded on subchan — confirm.
225 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
226 data *models.XappSubscriptionData) error {
227 xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
228 err := validateXappSubscriptionData(data)
230 xapp.Logger.Error(err.Error())
234 //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
// Logs the currently known endpoints (rtmgr.Eps).
235 xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
// subscriptionExists builds an rtmgr.Subscription from the subscription ID,
// address and port in data and walks rtmgr.Subs with it.
// NOTE(review): the comparison inside the loop and the return statement are
// not visible in this listing; presumably it returns true when an equal
// subscription is found — confirm.
239 func subscriptionExists(data *models.XappSubscriptionData) bool {
// All three pointer fields are dereferenced here.
241 sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
242 for _, elem := range rtmgr.Subs {
// deleteXappSubscriptionHandleImpl handles a "delete subscription" request.
// It validates data with validateXappSubscriptionData, logging the error on
// failure, and then checks with subscriptionExists that the subscription is
// known; an unknown subscription is logged and turned into a
// "subscription not found" error.
// NOTE(review): the return statements and any send on subdelchan are not
// visible in this listing; presumably accepted data is forwarded on
// subdelchan — confirm.
251 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
252 data *models.XappSubscriptionData) error {
253 xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
254 err := validateXappSubscriptionData(data)
256 xapp.Logger.Error(err.Error())
260 if !subscriptionExists(data) {
261 xapp.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
// This err is a new variable that shadows the one declared above.
262 err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
// createNewE2tHandleHandlerImpl handles a "create new E2T" request: it
// validates data with validateE2tData and logs the validation error, if any.
// NOTE(review): the return statements and any send on e2taddchan are not
// visible in this listing; presumably validated data is forwarded on
// e2taddchan — confirm.
270 func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData,
271 data *models.E2tData) error {
272 xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl")
273 err := validateE2tData(data)
275 xapp.Logger.Error(err.Error())
282 func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
284 xapp.Logger.Debug("Invoked.validateE2TAddressRANListData : %v", assRanE2tData)
286 for _, element := range assRanE2tData {
287 if *element.E2TAddress == "" {
288 return fmt.Errorf("E2T Instance - E2TAddress is empty!!!")
291 e2taddress_key := *element.E2TAddress
292 if !checkValidaE2TAddress(e2taddress_key) {
293 return fmt.Errorf("E2TAddress doesn't exist!!!, E2TAddress: %v",e2taddress_key)
// associateRanToE2THandlerImpl handles a request that associates RANs with E2T
// instances: it validates data with validateE2TAddressRANListData and logs a
// warning when the validation fails.
// NOTE(review): the return statements and any send on assranchan are not
// visible in this listing; presumably validated data is forwarded on
// assranchan — confirm.
300 func associateRanToE2THandlerImpl(assranchan chan<- models.RanE2tMap,
301 data models.RanE2tMap) error {
302 xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl")
303 err := validateE2TAddressRANListData(data)
305 xapp.Logger.Warn(" Association of RAN to E2T Instance data validation failed: " + err.Error())
312 func disassociateRanToE2THandlerImpl(disassranchan chan<- models.RanE2tMap,
313 data models.RanE2tMap) error {
314 xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl")
315 err := validateE2TAddressRANListData(data)
317 xapp.Logger.Warn(" Disassociation of RAN List from E2T Instance data validation failed: " + err.Error())
320 disassranchan <- data
// deleteE2tHandleHandlerImpl handles a "delete E2T" request: it validates data
// with validateDeleteE2tData and logs the validation error, if any.
// NOTE(review): the return statements and any send on e2tdelchan are not
// visible in this listing; presumably validated data is forwarded on
// e2tdelchan — confirm.
324 func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData,
325 data *models.E2tDeleteData) error {
326 xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl")
328 err := validateDeleteE2tData(data)
330 xapp.Logger.Error(err.Error())
// launchRest builds the Swagger-generated routing manager API, registers a
// handler for every REST operation and then serves HTTP on 0.0.0.0 at the port
// taken from the NBI URL *nbiif.
// Each handler passes its request payload, together with the channel supplied
// for that operation, to the corresponding ...Impl function of this file and
// turns the outcome into an HTTP responder.
// NOTE(review): the error-handling conditions and whatever follows each logged
// error (return or exit) are not visible in this listing.
338 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
339 subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData, assranchan chan<- models.RanE2tMap, disassranchan chan<- models.RanE2tMap, e2tdelchan chan<- *models.E2tDeleteData) {
// Load the API specification embedded in the generated restapi package.
340 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
343 xapp.Logger.Error(err.Error())
// Only the port of the NBI URL is used below; the host is fixed to 0.0.0.0.
346 nbiUrl, err := url.Parse(*nbiif)
348 xapp.Logger.Error(err.Error())
351 api := operations.NewRoutingManagerAPI(swaggerSpec)
352 server := restapi.NewServer(api)
// Runs when launchRest returns, i.e. after Serve below has returned.
353 defer server.Shutdown()
355 server.Port, err = strconv.Atoi(nbiUrl.Port())
357 xapp.Logger.Error("Invalid NBI RestAPI port")
360 server.Host = "0.0.0.0"
// xApp callback data pushed by the xApp manager.
// NOTE(review): success is answered with the GetHandles OK responder rather
// than a responder of this operation — confirm that this is intended.
362 api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
363 func(params handle.ProvideXappHandleParams) middleware.Responder {
364 xapp.Logger.Info("Data received on Http interface")
365 err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
367 xapp.Logger.Error("Invalid XApp callback data: " + err.Error())
368 return handle.NewProvideXappHandleBadRequest()
370 return handle.NewGetHandlesOK()
// Add an xApp subscription.
373 api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
374 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
375 err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
377 return handle.NewProvideXappSubscriptionHandleBadRequest()
379 //Delay the response as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
380 time.Sleep(1 * time.Second)
381 return handle.NewGetHandlesOK()
// Delete an xApp subscription.
// NOTE(review): the failure path answers with a NoContent responder — confirm
// that this is the intended status for an invalid or unknown subscription.
384 api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
385 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
386 err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
388 return handle.NewDeleteXappSubscriptionHandleNoContent()
390 //Delay the response as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
391 time.Sleep(1 * time.Second)
392 return handle.NewGetHandlesOK()
// Register a new E2T instance. As with the handlers below, the response is
// delayed by a fixed one second.
395 api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc(
396 func(params handle.CreateNewE2tHandleParams) middleware.Responder {
397 err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData)
399 return handle.NewCreateNewE2tHandleBadRequest()
401 time.Sleep(1 * time.Second)
402 return handle.NewCreateNewE2tHandleCreated()
// Associate RANs with E2T instances.
406 api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc(
407 func(params handle.AssociateRanToE2tHandleParams) middleware.Responder {
408 err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList)
410 return handle.NewAssociateRanToE2tHandleBadRequest()
412 time.Sleep(1 * time.Second)
413 return handle.NewAssociateRanToE2tHandleCreated()
// Disassociate RANs from E2T instances.
417 api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc(
418 func(params handle.DissociateRanParams) middleware.Responder {
419 err := disassociateRanToE2THandlerImpl(disassranchan, params.DissociateList)
421 return handle.NewDissociateRanBadRequest()
423 time.Sleep(1 * time.Second)
424 return handle.NewDissociateRanCreated()
// Delete an E2T instance.
428 api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc(
429 func(params handle.DeleteE2tHandleParams) middleware.Responder {
430 err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData)
432 return handle.NewDeleteE2tHandleBadRequest()
434 time.Sleep(1 * time.Second)
435 return handle.NewDeleteE2tHandleCreated()
438 // start to serve API
439 xapp.Logger.Info("Starting the HTTP Rest service")
440 if err := server.Serve(); err != nil {
441 xapp.Logger.Error(err.Error())
// httpGetXApps issues an HTTP GET to xmurl through myClient (declared outside
// this block) and, when the status code is 200, JSON-decodes the response body
// into a slice of rtmgr.XApp. A decode failure and any other status code are
// logged as warnings.
// NOTE(review): the error handling after Get, the closing of r.Body and all
// return statements are not visible in this listing — confirm that the body is
// closed on every path.
445 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
446 xapp.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
447 r, err := myClient.Get(xmurl)
453 if r.StatusCode == 200 {
454 xapp.Logger.Debug("http client raw response: %v", r)
455 var xapps []rtmgr.XApp
// Decodes straight from the response stream.
456 err = json.NewDecoder(r.Body).Decode(&xapps)
458 xapp.Logger.Warn("Json decode failed: " + err.Error())
460 xapp.Logger.Info("HTTP GET: OK")
461 xapp.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
464 xapp.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
// retrieveStartupData fetches the initial xApp list from xmurl with
// httpGetXApps, trying up to maxRetries times and sleeping two seconds before
// every attempt, the first one included. Once xApp data is received it loads
// the platform components from configfile, writes both to SDL with
// sdlEngine.WriteAll under fileName, and posts a subscription request with
// PostSubReq(xmurl, nbiif).
// NOTE(review): the declarations of maxRetries and readErr, the conditions
// around the logged errors and every return or break are not visible in this
// listing.
468 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
471 for i := 1; i <= maxRetries; i++ {
472 time.Sleep(2 * time.Second)
473 xappData, err := httpGetXApps(xmurl)
474 if xappData != nil && err == nil {
475 pcData, confErr := rtmgr.GetPlatformComponents(configfile)
477 xapp.Logger.Error(confErr.Error())
480 xapp.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
481 // Combine the xapps data and platform data before writing to the SDL
// E2Ts starts out as an empty map.
482 ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: make(map[string]rtmgr.E2TInstance)}
483 writeErr := sdlEngine.WriteAll(fileName, ricData)
485 xapp.Logger.Error(writeErr.Error())
487 // post subscription req to appmgr
488 readErr = PostSubReq(xmurl, nbiif)
// Reached when httpGetXApps returned neither data nor an error.
492 } else if err == nil {
493 readErr = errors.New("unexpected HTTP status code")
495 xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
// Initialize starts the REST northbound interface. It first retrieves the
// startup data through r.RetrieveStartupData, then creates one buffered
// channel per REST operation, launches the REST service with r.LaunchRest and
// processes what arrives on those channels by updating rtmgr.Subs or writing
// to SDL through sdlEngine.
// NOTE(review): the goroutine and loop scaffolding, several channel receives,
// and every use of rpeEngine, triggerSBI and m are not visible in this
// listing; the comments below describe only the lines that are shown.
502 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
503 sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error {
504 err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
506 xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
// Every channel buffers up to 10 pending requests.
510 datach := make(chan *models.XappCallbackData, 10)
511 subschan := make(chan *models.XappSubscriptionData, 10)
512 subdelchan := make(chan *models.XappSubscriptionData, 10)
513 e2taddchan := make(chan *models.E2tData, 10)
514 associateranchan := make(chan models.RanE2tMap, 10)
515 disassociateranchan := make(chan models.RanE2tMap, 10)
516 e2tdelchan := make(chan *models.E2tDeleteData, 10)
517 xapp.Logger.Info("Launching Rest Http service")
519 r.LaunchRest(&nbiif, datach, subschan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan)
// xApp callback: the decoded callback data only acts as a trigger; the full
// xApp list is fetched again from xmurl and that list is written to SDL.
524 data, err := r.RecvXappCallbackData(datach)
526 xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
527 } else if data != nil {
528 xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
529 alldata, err1 := httpGetXApps(xmurl)
530 if alldata != nil && err1 == nil {
532 sdlEngine.WriteXApps(fileName, alldata)
// Subscription added: recorded in the rtmgr.Subs list.
543 xapp.Logger.Debug("received XApp subscription data")
544 addSubscription(&rtmgr.Subs, data)
// Subscription deleted: removed from the rtmgr.Subs list.
552 xapp.Logger.Debug("received XApp subscription delete data")
553 delSubscription(&rtmgr.Subs, data)
// New E2T instance: the third return value (the error) is discarded.
560 xapp.Logger.Debug("received create New E2T data")
562 data, meiddata,_ := r.RecvNewE2Tdata(e2taddchan)
565 sdlEngine.WriteNewE2TInstance(fileName, data,meiddata)
// RAN association.
574 data := <-associateranchan
575 xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
577 sdlEngine.WriteAssRANToE2TInstance(fileName, data)
// RAN disassociation.
586 data := <-disassociateranchan
587 xapp.Logger.Debug("received disassociate RANs from E2T instance")
589 sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data)
// E2T instance deletion.
597 xapp.Logger.Debug("received Delete E2T data")
602 sdlEngine.WriteDeleteE2TInstance(fileName, data)
// Terminate is presumably the shutdown counterpart of Initialize.
// NOTE(review): its body is not visible in this listing — confirm what, if
// anything, it releases.
612 func (r *HttpRestful) Terminate() error {
// addSubscription builds an rtmgr.Subscription from xappSubData, walks *subs
// with it and can append it to the list; an already present subscription is
// logged as a warning.
// NOTE(review): the comparison inside the loop, the condition guarding the
// append and the returned value are not visible in this listing — confirm
// what the bool result means before relying on it.
616 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
// All three pointer fields are dereferenced here.
618 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
619 for _, elem := range *subs {
621 xapp.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
// The caller's list is updated in place through the pointer.
626 *subs = append(*subs, sub)
631 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
632 xapp.Logger.Debug("Deleteing the subscription from the subscriptions list")
634 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
635 for i, elem := range *subs {
638 // Since the order of the list is not important, we are swapping the last element
639 // with the matching element and replacing the list with list(n-1) elements.
640 (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
641 *subs = (*subs)[:len(*subs)-1]
645 if present == false {
646 xapp.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)