2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
20 Mnemonic: httprestful.go
21 Abstract: HTTP Restful API NBI implementation
22 Based on Swagger generated code
36 "routing-manager/pkg/rtmgr"
37 "routing-manager/pkg/rpe"
38 "routing-manager/pkg/sdl"
39 "routing-manager/pkg/models"
40 "routing-manager/pkg/restapi"
41 "routing-manager/pkg/restapi/operations"
42 "github.com/go-openapi/runtime/middleware"
43 "routing-manager/pkg/restapi/operations/handle"
44 loads "github.com/go-openapi/loads"
47 //var myClient = &http.Client{Timeout: 1 * time.Second}
// HttpRestful groups the handler hooks of the HTTP REST northbound interface.
// NewHttpRestful assigns this file's launchRest, recvXappCallbackData,
// provideXappHandleHandlerImpl and retrieveStartupData to the four fields below.
// NOTE(review): the embedded numbering jumps from 49 to 51, so at least one
// member line of this struct is not visible in this listing.
49 type HttpRestful struct {
// Hook that Initialize calls with the NBI address and the three data channels.
51 LaunchRest LaunchRestHandler
// Hook that Initialize calls to obtain the next decoded xapp list.
52 RecvXappCallbackData RecvXappCallbackDataHandler
// Hook set to provideXappHandleHandlerImpl by NewHttpRestful.
53 ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
// Hook that Initialize calls first to load the startup data.
54 RetrieveStartupData RetrieveStartupDataHandler
// NewHttpRestful allocates an HttpRestful and wires each of its hooks to the
// default implementation defined in this file.
57 func NewHttpRestful() *HttpRestful {
58 instance := new(HttpRestful)
59 instance.LaunchRest = launchRest
60 instance.RecvXappCallbackData = recvXappCallbackData
61 instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
62 instance.RetrieveStartupData = retrieveStartupData
66 // ToDo: Use Range over channel. Read and return only the latest one.
67 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
68 var xappData *models.XappCallbackData
69 // Drain the channel as we are only looking for the latest value until
70 // xapp manager sends all xapp data with every request.
71 length := len(dataChannel)
72 //rtmgr.Logger.Info(length)
73 for i := 0; i <= length; i++ {
74 rtmgr.Logger.Info("data received")
75 // If no data received from the REST, it blocks.
76 xappData = <-dataChannel
79 var xapps []rtmgr.XApp
80 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
83 rtmgr.Logger.Info("No data")
86 rtmgr.Logger.Debug("Nothing received on the Http interface")
91 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
92 if len(callbackData.XApps) == 0 {
93 return fmt.Errorf("Invalid Data field: \"%s\"", callbackData.XApps)
95 var xapps []rtmgr.XApp
96 err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
98 return fmt.Errorf("Unmarshal failed: \"%s\"", err.Error())
// provideXappHandleHandlerImpl validates xapp callback data received on the
// REST interface.
//
// datach is a send-only channel for callback data; data is the payload to
// check. A validation failure is logged as a warning.
// NOTE(review): the lines that use datach and produce the return value are not
// visible in this listing; presumably valid data is sent on datach and the
// validation error is returned otherwise — confirm against the full file.
103 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
105 rtmgr.Logger.Debug("Received callback data")
// validateXappCallbackData reads data.XApps, so data must be non-nil here.
107 err := validateXappCallbackData(data)
109 rtmgr.Logger.Warn("XApp callback data validation failed: "+err.Error())
117 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
118 var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
119 for _, ep := range rtmgr.Eps {
120 if ep.Ip == *data.Address && ep.Port == *data.Port {
// provideXappSubscriptionHandleImpl handles a request to add an xapp
// subscription.
//
// data is checked with validateXappSubscriptionData and a failure is logged at
// error level. subchan is a send-only channel for subscription data.
// NOTE(review): the send on subchan and the return statements are not visible
// in this listing — confirm against the full file.
128 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
129 data *models.XappSubscriptionData) error {
130 rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
131 err := validateXappSubscriptionData(data)
133 rtmgr.Logger.Error(err.Error())
137 //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
// Log the endpoint list that the validation above iterates over.
138 rtmgr.Logger.Debug("Endpoints: %v", rtmgr.Eps)
// subscriptionExists reports, per its signature, a boolean for the subscription
// described by data, looking through rtmgr.Subs.
//
// data.SubscriptionID, data.Address and data.Port are dereferenced without a
// nil check, so the caller must have set all three.
// NOTE(review): the comparison inside the loop and the return statement are
// not visible in this listing; presumably each element is compared with sub.
142 func subscriptionExists(data *models.XappSubscriptionData) bool {
// Build the rtmgr.Subscription value from the request fields.
144 sub := rtmgr.Subscription{SubID:*data.SubscriptionID, Fqdn:*data.Address, Port:*data.Port,}
145 for _, elem := range rtmgr.Subs {
// deleteXappSubscriptionHandleImpl handles a request to delete an xapp
// subscription.
//
// data is first checked with validateXappSubscriptionData (a failure is logged
// at error level). When subscriptionExists does not find the subscription, a
// warning is logged and a "Subscription not found" error is created.
// NOTE(review): the send on subdelchan and the return statements are not
// visible in this listing — confirm against the full file.
154 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
155 data *models.XappSubscriptionData) error {
156 rtmgr.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
157 err := validateXappSubscriptionData(data)
159 rtmgr.Logger.Error(err.Error())
163 if !subscriptionExists(data) {
164 rtmgr.Logger.Warn("Subscription not found: %d", *data.SubscriptionID)
// This err is declared with := inside the if block, separate from the outer err.
165 err := fmt.Errorf("Subscription not found: %d", *data.SubscriptionID)
// launchRest configures the Routing Manager REST API and serves it.
//
// nbiif points to the NBI address as a URL string; only its port is used,
// the listen host is fixed to "0.0.0.0". datach, subchan and subdelchan are
// captured by the request handlers, which pass them to
// provideXappHandleHandlerImpl, provideXappSubscriptionHandleImpl and
// deleteXappSubscriptionHandleImpl respectively.
// NOTE(review): what follows each logged error (return or exit) and the
// conditions around the responders are not visible in this listing.
173 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
174 subdelchan chan<- *models.XappSubscriptionData) {
// Load the API specification from the JSON documents exposed by the restapi package.
175 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
178 rtmgr.Logger.Error(err.Error())
// Parse the NBI address so that its port can be extracted below.
181 nbiUrl, err := url.Parse(*nbiif)
183 rtmgr.Logger.Error(err.Error())
186 api := operations.NewRoutingManagerAPI(swaggerSpec)
187 server := restapi.NewServer(api)
// Shut the server down when launchRest returns; the result of Shutdown is ignored.
188 defer server.Shutdown()
// The listen port comes from the NBI URL and must be numeric.
190 server.Port, err = strconv.Atoi(nbiUrl.Port())
192 rtmgr.Logger.Error("Invalid NBI RestAPI port")
195 server.Host = "0.0.0.0"
// Handler for xapp callback data: hands the payload to
// provideXappHandleHandlerImpl together with datach.
197 api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
198 func(params handle.ProvideXappHandleParams) middleware.Responder {
199 rtmgr.Logger.Info("Data received on Http interface")
200 err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
202 rtmgr.Logger.Error("Invalid XApp callback data: "+err.Error())
203 return handle.NewProvideXappHandleBadRequest()
205 return handle.NewGetHandlesOK()
// Handler for adding an xapp subscription.
208 api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
209 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
210 err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
212 return handle.NewProvideXappSubscriptionHandleBadRequest()
214 return handle.NewGetHandlesOK()
// Handler for deleting an xapp subscription.
// NOTE(review): the first responder below is the "NoContent" one, while the two
// handlers above use a "BadRequest" responder in the same position — presumably
// this is the failure path; confirm that NoContent is intended there.
217 api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
218 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
219 err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
221 return handle.NewDeleteXappSubscriptionHandleNoContent()
223 return handle.NewGetHandlesOK()
226 // start to serve API
227 rtmgr.Logger.Info("Starting the HTTP Rest service")
// An error returned by Serve is logged at error level.
228 if err := server.Serve(); err != nil {
229 rtmgr.Logger.Error(err.Error())
// httpGetXapps issues an HTTP GET on xmurl and decodes the JSON response body
// into a slice of rtmgr.XApp.
//
// The request goes through myClient, which is not defined in the visible part
// of this file (only a commented-out declaration is). A response with status
// 200 is decoded, and a decode failure is logged as a warning; any other
// status code is logged as a warning.
// NOTE(review): the handling of the Get error, the closing of r.Body and all
// return statements are not visible in this listing — confirm that the body is
// closed on every path.
233 func httpGetXapps(xmurl string) (*[]rtmgr.XApp, error) {
234 rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
235 r, err := myClient.Get(xmurl)
241 if r.StatusCode == 200 {
242 rtmgr.Logger.Debug("http client raw response: %v", r)
243 var xapps []rtmgr.XApp
// Decode straight from the response stream.
244 err = json.NewDecoder(r.Body).Decode(&xapps)
246 rtmgr.Logger.Warn("Json decode failed: " + err.Error())
248 rtmgr.Logger.Info("HTTP GET: OK")
249 rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
252 rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
// retrieveStartupData fetches the initial xapp list from xmurl, combines it
// with the platform components read from configfile, writes the result through
// sdlEngine under fileName and then posts a subscription request.
//
// Up to maxRetries attempts are made, each preceded by a two-second sleep.
// NOTE(review): the declarations of maxRetries and readErr, the error checks
// around the logged errors, and the return statements are not visible in this
// listing — confirm against the full file.
256 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.SdlEngine) error {
260 for i := 1; i <= maxRetries; i++ {
261 time.Sleep(2 * time.Second)
263 xappData, err := httpGetXapps(xmurl)
// Proceed only when the GET succeeded and produced a list.
265 if xappData != nil && err == nil {
266 pcData, confErr := rtmgr.GetPlatformComponents(configfile)
268 rtmgr.Logger.Error(confErr.Error())
272 rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
273 // Combine the xapps data and platform data before writing to the SDL
274 ricData := &rtmgr.RicComponents{Xapps: *xappData, Pcs: *pcData}
276 writeErr := sdlEngine.WriteAll(fileName, ricData)
278 rtmgr.Logger.Error(writeErr.Error())
280 // post subscription req to appmgr
281 readErr = PostSubReq(xmurl, nbiif)
// No error but also no data: httpGetXapps got a response it did not decode.
285 } else if err == nil {
286 readErr = errors.New("Unexpected HTTP status code")
288 rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
// Initialize loads the startup data through r.RetrieveStartupData, creates the
// three data channels (buffer size 10 each), starts the REST service through
// r.LaunchRest and processes what arrives on the channels:
//   - xapp lists from r.RecvXappCallbackData are written with
//     sdlEngine.WriteXapps under fileName,
//   - subscription data is added to rtmgr.Subs with addSubscription,
//   - subscription delete data is removed from rtmgr.Subs with delSubscription.
//
// rpeEngine and triggerSBI are not used on any line visible in this listing.
// NOTE(review): the goroutine/loop scaffolding, the receives from subschan and
// subdelchan, any use of triggerSBI and the return statements are not visible
// here — confirm against the full file.
295 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
296 sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine, triggerSBI chan<- bool) error {
297 err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
299 rtmgr.Logger.Error("Exiting as nbi failed to get the intial startup data from the xapp manager: " + err.Error())
// Channels shared between the REST handlers and the processing code below.
303 datach := make(chan *models.XappCallbackData, 10)
304 subschan := make(chan *models.XappSubscriptionData, 10)
305 subdelchan := make(chan *models.XappSubscriptionData, 10)
306 rtmgr.Logger.Info("Launching Rest Http service")
308 r.LaunchRest(&nbiif, datach, subschan, subdelchan)
313 data, err := r.RecvXappCallbackData(datach)
315 rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
// Only a successfully decoded, non-nil list is written.
316 } else if data != nil {
317 sdlEngine.WriteXapps(fileName, data)
326 rtmgr.Logger.Debug("received XApp subscription data")
327 addSubscription(&rtmgr.Subs, data)
335 rtmgr.Logger.Debug("received XApp subscription delete data")
336 delSubscription(&rtmgr.Subs, data)
// Terminate is presumably the counterpart of Initialize for shutting the NBI down.
// NOTE(review): its body is not visible in this listing — confirm what it does.
344 func (r *HttpRestful) Terminate() error {
// addSubscription appends the subscription described by xappSubData to *subs.
//
// xappSubData.SubscriptionID, Address and Port are dereferenced without a nil
// check. A warning is logged for a subscription that is already present.
// NOTE(review): the comparison inside the loop and the return statements are
// not visible in this listing; presumably a duplicate is not appended and the
// boolean result tells whether the subscription was added — confirm.
348 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
350 sub := rtmgr.Subscription{SubID:*xappSubData.SubscriptionID, Fqdn:*xappSubData.Address, Port:*xappSubData.Port,}
351 for _, elem := range *subs {
353 rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
// The list is modified through the pointer so the caller sees the new entry.
358 *subs = append(*subs, sub)
// delSubscription removes the subscription described by xappSubData from *subs.
//
// xappSubData.SubscriptionID, Address and Port are dereferenced without a nil
// check. A warning is logged when present is still false after the loop.
// NOTE(review): the comparison inside the loop, the assignment to present, the
// loop exit and the return statement are not visible in this listing — confirm
// that the loop stops after a removal, since the slice is shortened while it
// is being ranged over.
363 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
364 rtmgr.Logger.Debug("Deleteing the subscription from the subscriptions list")
365 var present bool = false
366 sub := rtmgr.Subscription{SubID:*xappSubData.SubscriptionID, Fqdn:*xappSubData.Address, Port:*xappSubData.Port,}
367 for i, elem := range *subs {
370 // Since the order of the list is not important, we are swapping the last element
371 // with the matching element and replacing the list with list(n-1) elements.
372 (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
// Drop the last slot, which now holds the element being removed.
373 *subs = (*subs)[:len(*subs)-1]
377 if present == false {
378 rtmgr.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)