2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
19 This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 platform project (RICP).
22 ==================================================================================
25 Mnemonic: httprestful.go
26 Abstract: HTTP Restful API NBI implementation
27 Based on Swagger generated code
33 //noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
38 "github.com/go-openapi/loads"
39 "github.com/go-openapi/runtime/middleware"
42 "routing-manager/pkg/models"
43 "routing-manager/pkg/restapi"
44 "routing-manager/pkg/restapi/operations"
45 "routing-manager/pkg/restapi/operations/handle"
46 "routing-manager/pkg/rpe"
47 "routing-manager/pkg/rtmgr"
48 "routing-manager/pkg/sdl"
53 //var myClient = &http.Client{Timeout: 1 * time.Second}
// HttpRestful groups the operations of the REST-based northbound interface as
// function-typed fields; NewHttpRestful assigns this file's default
// implementations to them.
// NOTE(review): this excerpt skips lines inside the declaration (any embedded
// member and the closing brace are not visible here).
55 type HttpRestful struct {
// LaunchRest starts the REST API server (default: launchRest).
57 LaunchRest LaunchRestHandler
// RecvXappCallbackData reads XApp callback data from a channel
// (default: recvXappCallbackData).
58 RecvXappCallbackData RecvXappCallbackDataHandler
// ProvideXappHandleHandlerImpl validates incoming XApp callback data
// (default: provideXappHandleHandlerImpl).
59 ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
// RetrieveStartupData fetches the initial XApp data at start-up
// (default: retrieveStartupData).
60 RetrieveStartupData RetrieveStartupDataHandler
// NewHttpRestful allocates an HttpRestful and points each of its handler
// fields at the corresponding default implementation defined in this file.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the freshly populated instance is returned.
63 func NewHttpRestful() *HttpRestful {
64 instance := new(HttpRestful)
// Wire the default handlers.
65 instance.LaunchRest = launchRest
66 instance.RecvXappCallbackData = recvXappCallbackData
67 instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
68 instance.RetrieveStartupData = retrieveStartupData
72 // ToDo: Use Range over channel. Read and return only the latest one.
// recvXappCallbackData receives XApp callback data from dataChannel and
// decodes the XApps field (a JSON document) into a slice of rtmgr.XApp.
//
// The number of queued items is sampled once with len(dataChannel); the loop
// then performs that many receives plus one, so it always receives at least
// once and blocks while the channel is empty. Each receive overwrites
// xappData, matching the stated intent of keeping only the latest value.
//
// NOTE(review): branch conditions, return statements and closing braces are
// missing from this excerpt, so the exact return values on each path cannot be
// confirmed here.
73 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
74 var xappData *models.XappCallbackData
75 // Drain the channel as we are only looking for the latest value until
76 // xapp manager sends all xapp data with every request.
77 length := len(dataChannel)
78 //rtmgr.Logger.Info(length)
// The bound is i <= length (not <), giving length+1 iterations: one receive
// per item queued at sampling time plus one further receive.
79 for i := 0; i <= length; i++ {
80 rtmgr.Logger.Info("data received")
81 // If no data received from the REST, it blocks.
82 xappData = <-dataChannel
85 var xapps []rtmgr.XApp
// XApps is converted to []byte and parsed as JSON into xapps.
86 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
89 rtmgr.Logger.Info("No data")
92 rtmgr.Logger.Debug("Nothing received on the Http interface")
// validateXappCallbackData checks that callbackData.XApps is non-empty and
// that it parses as JSON into a slice of rtmgr.XApp.
//
// It returns an "invalid Data field" error when XApps has length zero and an
// "unmarshal failed" error carrying the json.Unmarshal error text.
// NOTE(review): the condition guarding the second error and the final return
// are not visible in this excerpt; presumably nil is returned on success.
96 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
97 if len(callbackData.XApps) == 0 {
98 return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
100 var xapps []rtmgr.XApp
// Trial decode; xapps is not used any further in the visible code.
101 err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
103 return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
// provideXappHandleHandlerImpl handles XApp callback data received through the
// REST API: it runs validateXappCallbackData on data and logs a warning with
// the validation error text.
//
// datach is a send-only channel of callback data.
// NOTE(review): the lines after validation (presumably the send on datach and
// the return statements) are missing from this excerpt — confirm against the
// full file.
108 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
110 rtmgr.Logger.Debug("Received callback data")
112 err := validateXappCallbackData(data)
114 rtmgr.Logger.Warn("XApp callback data validation failed: " + err.Error())
// validateXappSubscriptionData searches rtmgr.Eps for an endpoint whose Ip and
// Port equal *data.Address and *data.Port.
//
// data.Address and data.Port are dereferenced without nil checks.
// NOTE(review): the body of the match branch and the return are not visible in
// this excerpt; presumably err is cleared when a matching endpoint is found.
122 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
// Start from the "not found" error.
123 var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
124 for _, ep := range rtmgr.Eps {
125 if ep.Ip == *data.Address && ep.Port == *data.Port {
// provideXappSubscriptionHandleImpl processes an add-subscription request: it
// validates data with validateXappSubscriptionData, logs the validation error,
// and logs the current endpoint table at debug level.
//
// subchan is a send-only channel of subscription data.
// NOTE(review): the error guard, the send on subchan and the return statements
// are missing from this excerpt — confirm against the full file.
133 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
134 data *models.XappSubscriptionData) error {
135 rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
136 err := validateXappSubscriptionData(data)
138 rtmgr.Logger.Error(err.Error())
142 //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
143 rtmgr.Logger.Debug("Endpoints: %v", rtmgr.Eps)
// subscriptionExists builds an rtmgr.Subscription from data's SubscriptionID,
// Address and Port (all dereferenced without nil checks) and iterates over
// rtmgr.Subs.
// NOTE(review): the comparison inside the loop and the return are not visible
// in this excerpt; presumably it reports whether an equal entry is present.
147 func subscriptionExists(data *models.XappSubscriptionData) bool {
149 sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
150 for _, elem := range rtmgr.Subs {
// deleteXappSubscriptionHandleImpl processes a delete-subscription request.
// It validates data with validateXappSubscriptionData and logs the validation
// error; when subscriptionExists reports false it logs a warning and builds a
// "subscription not found" error.
//
// subdelchan is a send-only channel of subscription data.
// NOTE(review): the error guard, the send on subdelchan and the return
// statements are missing from this excerpt — confirm against the full file.
159 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
160 data *models.XappSubscriptionData) error {
161 rtmgr.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
162 err := validateXappSubscriptionData(data)
164 rtmgr.Logger.Error(err.Error())
168 if !subscriptionExists(data) {
169 rtmgr.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
// This err is declared with := inside the if block, so it is a new variable
// scoped to the block rather than an assignment to the outer err.
170 err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
// launchRest configures and runs the REST northbound interface.
//
// It loads the embedded Swagger specification, parses *nbiif as a URL to
// obtain the listening port, registers the XApp callback, add-subscription and
// delete-subscription handlers on the generated API, and finally calls
// server.Serve().
//
// Parameters:
//   nbiif      - URL of the NBI REST endpoint; the visible code reads only its port.
//   datach     - send-only channel handed to provideXappHandleHandlerImpl.
//   subchan    - send-only channel handed to provideXappSubscriptionHandleImpl.
//   subdelchan - send-only channel handed to deleteXappSubscriptionHandleImpl.
//
// NOTE(review): the error guards (and whatever they do after logging) are
// missing from this excerpt, so the behavior on a failed spec load, URL parse
// or port conversion cannot be confirmed here.
178 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
179 subdelchan chan<- *models.XappSubscriptionData) {
180 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
183 rtmgr.Logger.Error(err.Error())
186 nbiUrl, err := url.Parse(*nbiif)
188 rtmgr.Logger.Error(err.Error())
191 api := operations.NewRoutingManagerAPI(swaggerSpec)
192 server := restapi.NewServer(api)
// Shut the server down when this function returns.
193 defer server.Shutdown()
// The listening port is the numeric port component of the NBI URL.
195 server.Port, err = strconv.Atoi(nbiUrl.Port())
197 rtmgr.Logger.Error("Invalid NBI RestAPI port")
// Bind to the wildcard address rather than the host named in nbiif.
200 server.Host = "0.0.0.0"
// Handler for XApp callback data.
202 api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
203 func(params handle.ProvideXappHandleParams) middleware.Responder {
204 rtmgr.Logger.Info("Data received on Http interface")
205 err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
207 rtmgr.Logger.Error("Invalid XApp callback data: " + err.Error())
208 return handle.NewProvideXappHandleBadRequest()
// NOTE(review): the success responder is NewGetHandlesOK rather than one
// specific to this operation — confirm this is intended.
210 return handle.NewGetHandlesOK()
// Handler for add-subscription requests.
213 api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
214 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
215 err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
217 return handle.NewProvideXappSubscriptionHandleBadRequest()
219 //Delay the response as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
220 time.Sleep(1 * time.Second)
221 return handle.NewGetHandlesOK()
// Handler for delete-subscription requests.
224 api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
225 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
226 err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
// NOTE(review): the condition guarding this return is not visible here;
// presumably it is the error path, which answers with "No Content".
228 return handle.NewDeleteXappSubscriptionHandleNoContent()
230 //Delay the response as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
231 time.Sleep(1 * time.Second)
232 return handle.NewGetHandlesOK()
235 // start to serve API
236 rtmgr.Logger.Info("Starting the HTTP Rest service")
// An error returned by Serve is logged.
237 if err := server.Serve(); err != nil {
238 rtmgr.Logger.Error(err.Error())
// httpGetXApps issues an HTTP GET to xmurl through myClient and, when the
// response status is 200, decodes the JSON response body into a slice of
// rtmgr.XApp. A warning is logged for a JSON decode failure and for an
// unexpected status code.
//
// NOTE(review): myClient is not declared in the visible part of this file (the
// only declaration in view is commented out), and the error checks, return
// statements and any closing of r.Body are missing from this excerpt — confirm
// against the full file.
242 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
243 rtmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
244 r, err := myClient.Get(xmurl)
250 if r.StatusCode == 200 {
251 rtmgr.Logger.Debug("http client raw response: %v", r)
252 var xapps []rtmgr.XApp
// Stream-decode the body directly into xapps.
253 err = json.NewDecoder(r.Body).Decode(&xapps)
255 rtmgr.Logger.Warn("Json decode failed: " + err.Error())
257 rtmgr.Logger.Info("HTTP GET: OK")
258 rtmgr.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
261 rtmgr.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
// retrieveStartupData obtains the initial XApp list from xmurl, combines it
// with the platform components read from configfile, writes the combined data
// through sdlEngine.WriteAll(fileName, ...), and then calls
// PostSubReq(xmurl, nbiif).
//
// It makes up to maxRetries attempts and sleeps two seconds before each one.
// NOTE(review): the declarations of readErr and maxRetries, the error guards
// and the return statements are missing from this excerpt, so the exact
// retry/exit conditions cannot be confirmed here.
265 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
268 for i := 1; i <= maxRetries; i++ {
// The sleep comes first, so even the first attempt is delayed.
269 time.Sleep(2 * time.Second)
270 xappData, err := httpGetXApps(xmurl)
271 if xappData != nil && err == nil {
272 pcData, confErr := rtmgr.GetPlatformComponents(configfile)
274 rtmgr.Logger.Error(confErr.Error())
277 rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
278 // Combine the xapps data and platform data before writing to the SDL
279 ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData}
280 writeErr := sdlEngine.WriteAll(fileName, ricData)
282 rtmgr.Logger.Error(writeErr.Error())
284 // post subscription req to appmgr
285 readErr = PostSubReq(xmurl, nbiif)
// Reached when httpGetXApps returned no data and no error.
289 } else if err == nil {
290 readErr = errors.New("unexpected HTTP status code")
292 rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
// Initialize brings up the REST northbound interface.
//
// It first calls r.RetrieveStartupData; the log message on that path states
// that the process exits when the initial data cannot be obtained. It then
// creates three buffered channels (capacity 10) for callback, add-subscription
// and delete-subscription data, launches the REST service with them, and
// processes what arrives: on callback data it re-fetches the full XApp list
// with httpGetXApps and writes it via sdlEngine.WriteXApps; on subscription
// data it calls addSubscription or delSubscription on rtmgr.Subs.
//
// NOTE(review): many structural lines (goroutine starts, loops, channel
// receives, error guards, uses of rpeEngine and triggerSBI, returns) are
// missing from this excerpt; the control flow described above is a reading of
// the visible lines only.
299 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
300 sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error {
301 err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
303 rtmgr.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
// Buffered channels connecting the REST handlers to the consumers below.
307 datach := make(chan *models.XappCallbackData, 10)
308 subschan := make(chan *models.XappSubscriptionData, 10)
309 subdelchan := make(chan *models.XappSubscriptionData, 10)
310 rtmgr.Logger.Info("Launching Rest Http service")
// NOTE(review): presumably this call runs in its own goroutine — the
// surrounding lines are not visible.
312 r.LaunchRest(&nbiif, datach, subschan, subdelchan)
317 data, err := r.RecvXappCallbackData(datach)
319 rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
320 } else if data != nil {
321 rtmgr.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
// The callback payload itself is not written; the complete list is fetched again.
322 alldata, err1 := httpGetXApps(xmurl)
323 if alldata != nil && err1 == nil {
// Any result of WriteXApps is not captured on this line.
324 sdlEngine.WriteXApps(fileName, alldata)
334 rtmgr.Logger.Debug("received XApp subscription data")
335 addSubscription(&rtmgr.Subs, data)
343 rtmgr.Logger.Debug("received XApp subscription delete data")
344 delSubscription(&rtmgr.Subs, data)
// Terminate is a method on *HttpRestful that takes no arguments and returns an
// error.
// NOTE(review): the method body is not visible in this excerpt.
352 func (r *HttpRestful) Terminate() error {
// addSubscription builds an rtmgr.Subscription from xappSubData's
// SubscriptionID, Address and Port (dereferenced without nil checks), scans
// *subs, logs a warning for an entry that is already present, and appends the
// new subscription to *subs.
// NOTE(review): the comparison, the condition guarding the append and the
// boolean return are not visible in this excerpt.
356 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
358 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
359 for _, elem := range *subs {
361 rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
// The list is modified through the pointer so the caller sees the new entry.
366 *subs = append(*subs, sub)
// delSubscription removes the subscription described by xappSubData
// (SubscriptionID, Address, Port — dereferenced without nil checks) from
// *subs. Removal swaps the matching element with the last one and truncates
// the list by one, so element order is not preserved. A warning is logged when
// present is false.
// NOTE(review): the declaration of present, the match condition, any loop exit
// after removal and the return are not visible in this excerpt.
371 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
372 rtmgr.Logger.Debug("Deleteing the subscription from the subscriptions list")
374 sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
375 for i, elem := range *subs {
378 // Since the order of the list is not important, we are swapping the last element
379 // with the matching element and replacing the list with list(n-1) elements.
380 (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
381 *subs = (*subs)[:len(*subs)-1]
385 if present == false {
386 rtmgr.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)