2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
19 This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 platform project (RICP).
22 ==================================================================================
25 Mnemonic: httprestful.go
26 Abstract: HTTP Restful API NBI implementation
27 Based on Swagger generated code
33 //noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
38 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
39 "github.com/go-openapi/loads"
40 "github.com/go-openapi/runtime/middleware"
43 "routing-manager/pkg/models"
44 "routing-manager/pkg/restapi"
45 "routing-manager/pkg/restapi/operations"
46 "routing-manager/pkg/restapi/operations/handle"
47 "routing-manager/pkg/rpe"
48 "routing-manager/pkg/rtmgr"
49 "routing-manager/pkg/sdl"
54 //var myClient = &http.Client{Timeout: 1 * time.Second}
// HttpRestful is the HTTP REST implementation of the NBI. The fields visible
// here hold the functions that do the actual work; NewHttpRestful points them
// at the package-private implementations in this file, so they can be
// replaced individually.
// NOTE(review): at least one member line and the closing brace of this struct
// are not visible in this view - confirm the full member list.
56 type HttpRestful struct {
// LaunchRest starts the REST server (set to launchRest by NewHttpRestful).
58 LaunchRest LaunchRestHandler
// RecvXappCallbackData reads xApp callback data from a channel
// (set to recvXappCallbackData by NewHttpRestful).
59 RecvXappCallbackData RecvXappCallbackDataHandler
// ProvideXappHandleHandlerImpl processes callback data received over REST
// (set to provideXappHandleHandlerImpl by NewHttpRestful).
60 ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
// RetrieveStartupData fetches the initial data at start-up
// (set to retrieveStartupData by NewHttpRestful).
61 RetrieveStartupData RetrieveStartupDataHandler
// NewHttpRestful allocates an HttpRestful and assigns each handler field the
// package-private implementation defined in this file: launchRest,
// recvXappCallbackData, provideXappHandleHandlerImpl and retrieveStartupData.
// NOTE(review): the return statement is not visible in this view; presumably
// the populated instance is returned - confirm.
64 func NewHttpRestful() *HttpRestful {
65 instance := new(HttpRestful)
66 instance.LaunchRest = launchRest
67 instance.RecvXappCallbackData = recvXappCallbackData
68 instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
69 instance.RetrieveStartupData = retrieveStartupData
// recvXappCallbackData receives xApp callback data from dataChannel and
// decodes the XApps JSON payload of the last item received into a slice of
// rtmgr.XApp.
//
// The receive loop runs len(dataChannel)+1 times (the bound is <=): it takes
// everything that was buffered when the function was entered plus one more
// item, keeping only the final one. Each receive blocks while the channel is
// empty.
// NOTE(review): the branch structure around the Unmarshal call and the two
// "no data" log statements, and all return statements, are not visible in
// this view - confirm what is returned on each path.
73 // ToDo: Use Range over channel. Read and return only the latest one.
74 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
75 var xappData *models.XappCallbackData
76 // Drain the channel as we are only looking for the latest value until
77 // xapp manager sends all xapp data with every request.
// Snapshot of the number of items buffered right now; items that arrive
// later are not counted.
78 length := len(dataChannel)
79 //xapp.Logger.Info(length)
80 for i := 0; i <= length; i++ {
81 xapp.Logger.Info("data received")
82 // If no data received from the REST, it blocks.
83 xappData = <-dataChannel
86 var xapps []rtmgr.XApp
// XApps is converted to bytes and parsed as a JSON array of rtmgr.XApp.
87 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
90 xapp.Logger.Info("No data")
93 xapp.Logger.Debug("Nothing received on the Http interface")
// validateXappCallbackData checks that callbackData carries a non-empty XApps
// payload that can be decoded as JSON into []rtmgr.XApp.
//
// An empty payload produces an "invalid Data field" error. The decode is a
// trial parse: the resulting slice is not used in the visible lines, and an
// "unmarshal failed" error wraps the decoder's message.
// NOTE(review): the error check around the second return and the final return
// are not visible in this view. callbackData is dereferenced without a nil
// check in the visible lines - confirm callers never pass nil.
97 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
98 if len(callbackData.XApps) == 0 {
99 return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
101 var xapps []rtmgr.XApp
102 err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
104 return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
// provideXappHandleHandlerImpl processes xApp callback data received on the
// REST interface: it validates data with validateXappCallbackData and logs a
// warning carrying the validation error text.
// NOTE(review): datach is not referenced in the visible lines and the
// conditions and returns are not visible; presumably valid data is forwarded
// on datach and the validation error is returned otherwise - confirm.
109 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
111 xapp.Logger.Debug("Received callback data")
113 err := validateXappCallbackData(data)
115 xapp.Logger.Warn("XApp callback data validation failed: " + err.Error())
// validateXappSubscriptionData searches rtmgr.Eps for an endpoint whose Ip
// and Port equal *data.Address and *data.Port.
//
// err starts out as an "XApp instance not found" error built from the
// requested address and port.
// NOTE(review): what happens on a match, and the return, are not visible in
// this view; presumably err is cleared when an endpoint matches - confirm.
// data.Address and data.Port are dereferenced without nil checks.
123 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
124 var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
125 for _, ep := range rtmgr.Eps {
126 if ep.Ip == *data.Address && ep.Port == *data.Port {
// provideXappSubscriptionHandleImpl handles an "add subscription" request: it
// validates data with validateXappSubscriptionData, logs the validation error
// at error level, and logs the current rtmgr.Eps at debug level.
// NOTE(review): subchan is not referenced in the visible lines and the
// conditions and returns are not visible; presumably valid data is sent on
// subchan - confirm.
134 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
135 data *models.XappSubscriptionData) error {
136 xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
137 err := validateXappSubscriptionData(data)
139 xapp.Logger.Error(err.Error())
143 //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
144 xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
// subscriptionExists builds an rtmgr.Subscription from data (SubscriptionID,
// Address as Fqdn, Port) and iterates over rtmgr.Subs.
// NOTE(review): the loop body and the return are not visible in this view;
// presumably it reports whether an equal entry is present - confirm.
// The three pointer fields of data are dereferenced without nil checks.
148 func subscriptionExists(data *models.XappSubscriptionData) bool {
150 sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
151 for _, elem := range rtmgr.Subs {
// deleteXappSubscriptionHandleImpl handles a "delete subscription" request.
//
// It validates data with validateXappSubscriptionData and logs the validation
// error at error level. When subscriptionExists reports that the subscription
// is unknown, it logs a warning and builds a "subscription not found" error.
// NOTE(review): subdelchan is not referenced in the visible lines and the
// returns are not visible; presumably the errors are returned and otherwise
// data is sent on subdelchan - confirm.
160 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
161 data *models.XappSubscriptionData) error {
162 xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
163 err := validateXappSubscriptionData(data)
165 xapp.Logger.Error(err.Error())
169 if !subscriptionExists(data) {
170 xapp.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
// This err is declared with := inside the if block, so it is a new variable
// scoped to that block.
171 err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
// launchRest configures and runs the REST server of the NBI.
//
// Steps visible here: load the embedded swagger spec, parse *nbiif as a URL,
// create the RoutingManagerAPI and its server, take the listen port from the
// URL, set the host to "0.0.0.0", register three handlers and call
// server.Serve. server.Shutdown is deferred.
//
// Parameters:
//   nbiif      - URL string; only its port is used in the visible lines.
//   datach     - passed to provideXappHandleHandlerImpl by the callback handler.
//   subchan    - passed to provideXappSubscriptionHandleImpl by the add handler.
//   subdelchan - passed to deleteXappSubscriptionHandleImpl by the delete handler.
//
// NOTE(review): the error checks around each Logger.Error call, and anything
// that follows them, are not visible in this view - confirm how failures of
// spec loading, URL parsing and port conversion are handled.
179 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
180 subdelchan chan<- *models.XappSubscriptionData) {
181 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
184 xapp.Logger.Error(err.Error())
187 nbiUrl, err := url.Parse(*nbiif)
189 xapp.Logger.Error(err.Error())
192 api := operations.NewRoutingManagerAPI(swaggerSpec)
193 server := restapi.NewServer(api)
194 defer server.Shutdown()
// The listen port is the port component of the NBI URL.
196 server.Port, err = strconv.Atoi(nbiUrl.Port())
198 xapp.Logger.Error("Invalid NBI RestAPI port")
201 server.Host = "0.0.0.0"
// Handler for xApp callback data: logs an error and answers BadRequest, and
// otherwise answers with the GetHandles OK responder.
203 api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
204 func(params handle.ProvideXappHandleParams) middleware.Responder {
205 xapp.Logger.Info("Data received on Http interface")
206 err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
208 xapp.Logger.Error("Invalid XApp callback data: " + err.Error())
209 return handle.NewProvideXappHandleBadRequest()
211 return handle.NewGetHandlesOK()
// Handler for adding a subscription.
214 api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
215 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
216 err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
218 return handle.NewProvideXappSubscriptionHandleBadRequest()
220 //Delay the response as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
221 time.Sleep(1 * time.Second)
222 return handle.NewGetHandlesOK()
// Handler for deleting a subscription.
// NOTE(review): the NoContent responder below presumably belongs to the error
// branch, whose condition is not visible - confirm that NoContent is the
// intended answer for a failed delete.
225 api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
226 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
227 err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
229 return handle.NewDeleteXappSubscriptionHandleNoContent()
231 //Delay the response as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
232 time.Sleep(1 * time.Second)
233 return handle.NewGetHandlesOK()
236 // start to serve API
237 xapp.Logger.Info("Starting the HTTP Rest service")
238 if err := server.Serve(); err != nil {
239 xapp.Logger.Error(err.Error())
// httpGetXApps issues an HTTP GET to xmurl through myClient and, when the
// status code is 200, decodes the response body as JSON into []rtmgr.XApp.
//
// A decode failure is logged as a warning; any other status code is logged as
// a warning with the code.
// NOTE(review): myClient is not defined in the visible part of this file (only
// a commented-out declaration appears). The handling of the Get error, the
// closing of r.Body and all return statements are not visible in this view -
// confirm that the body is closed and what is returned on each path.
243 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
244 xapp.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
245 r, err := myClient.Get(xmurl)
251 if r.StatusCode == 200 {
252 xapp.Logger.Debug("http client raw response: %v", r)
253 var xapps []rtmgr.XApp
// Stream-decode the body directly into the result slice.
254 err = json.NewDecoder(r.Body).Decode(&xapps)
256 xapp.Logger.Warn("Json decode failed: " + err.Error())
258 xapp.Logger.Info("HTTP GET: OK")
259 xapp.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
262 xapp.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
// retrieveStartupData fetches the initial xApp list from xmurl, combines it
// with the platform components read from configfile, writes the result through
// sdlEngine and then posts a subscription request via PostSubReq.
//
// The fetch is attempted up to maxRetries times, and every attempt is preceded
// by a 2 second sleep (including the first one).
//
// Parameters:
//   xmurl      - URL used for httpGetXApps and PostSubReq.
//   nbiif      - passed to PostSubReq.
//   fileName   - passed to sdlEngine.WriteAll.
//   configfile - passed to rtmgr.GetPlatformComponents.
//   sdlEngine  - storage engine that receives the combined data.
//
// NOTE(review): maxRetries and readErr are declared outside the visible lines,
// and the error checks, loop exits and returns are not visible in this view -
// confirm which failures end the retry loop and what is returned.
266 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
269 for i := 1; i <= maxRetries; i++ {
270 time.Sleep(2 * time.Second)
271 xappData, err := httpGetXApps(xmurl)
272 if xappData != nil && err == nil {
273 pcData, confErr := rtmgr.GetPlatformComponents(configfile)
275 xapp.Logger.Error(confErr.Error())
278 xapp.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
279 // Combine the xapps data and platform data before writing to the SDL
280 ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData}
281 writeErr := sdlEngine.WriteAll(fileName, ricData)
283 xapp.Logger.Error(writeErr.Error())
285 // post subscription req to appmgr
286 readErr = PostSubReq(xmurl, nbiif)
// No error but also no data: httpGetXApps logs an unexpected status code on
// its non-200 path, which is what this message reports.
290 } else if err == nil {
291 readErr = errors.New("unexpected HTTP status code")
293 xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
// Initialize brings up the REST NBI.
//
// Visible steps: call r.RetrieveStartupData and log an error when it fails;
// create three channels with a buffer of 10 (callback data, subscription add,
// subscription delete); call r.LaunchRest with them; read callback data through
// r.RecvXappCallbackData and, when data arrived, re-read the full xApp list
// with httpGetXApps and store it with sdlEngine.WriteXApps; apply subscription
// additions and deletions to rtmgr.Subs via addSubscription and delSubscription.
//
// NOTE(review): the goroutine, loop and select structure around these steps is
// not visible in this view, nor is any locking or any use of rpeEngine and
// triggerSBI - confirm how the steps are scheduled and synchronised, and what
// is returned.
300 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
301 sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error {
302 err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
304 xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
308 datach := make(chan *models.XappCallbackData, 10)
309 subschan := make(chan *models.XappSubscriptionData, 10)
310 subdelchan := make(chan *models.XappSubscriptionData, 10)
311 xapp.Logger.Info("Launching Rest Http service")
313 r.LaunchRest(&nbiif, datach, subschan, subdelchan)
318 data, err := r.RecvXappCallbackData(datach)
320 xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
321 } else if data != nil {
// The callback payload is used only as a trigger; the stored data comes from
// a fresh GET of the complete list.
322 xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
323 alldata, err1 := httpGetXApps(xmurl)
324 if alldata != nil && err1 == nil {
// NOTE(review): any result of WriteXApps is not used on this line.
325 sdlEngine.WriteXApps(fileName, alldata)
335 xapp.Logger.Debug("received XApp subscription data")
336 addSubscription(&rtmgr.Subs, data)
344 xapp.Logger.Debug("received XApp subscription delete data")
345 delSubscription(&rtmgr.Subs, data)
// Terminate is the shutdown counterpart of Initialize on HttpRestful.
// NOTE(review): the body is not visible in this view - confirm what, if
// anything, it releases and what it returns.
353 func (r *HttpRestful) Terminate() error {
// addSubscription builds an rtmgr.Subscription from xappSubData
// (SubscriptionID, Address as Fqdn, Port), scans *subs, logs a warning when
// the subscription is already present, and appends the new entry to *subs.
// NOTE(review): the comparison, the condition guarding the append and the
// returned bool are not visible in this view; presumably a duplicate is not
// appended - confirm. The pointer fields of xappSubData are dereferenced
// without nil checks.
357 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
359 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
360 for _, elem := range *subs {
362 xapp.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
// The list is modified through the pointer so the caller sees the new entry.
367 *subs = append(*subs, sub)
// delSubscription builds an rtmgr.Subscription from xappSubData and removes
// it from *subs by swapping it with the last element and shrinking the list
// by one, so the order of the remaining entries is not preserved. A warning is
// logged when present is false.
// NOTE(review): the comparison, the assignment of present, whether the loop
// stops after a removal, and the return are not visible in this view. The loop
// ranges over the list while it is being shortened, so confirm that iteration
// ends right after the removal.
372 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
373 xapp.Logger.Debug("Deleteing the subscription from the subscriptions list")
375 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
376 for i, elem := range *subs {
379 // Since the order of the list is not important, we are swapping the last element
380 // with the matching element and replacing the list with list(n-1) elements.
381 (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
382 *subs = (*subs)[:len(*subs)-1]
386 if present == false {
387 xapp.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)