4124c021c3bbea4e079972169502004a62c950ed
[ric-plt/rtmgr.git] / pkg / nbi / httprestful.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19 /*
20   Mnemonic:     httprestful.go
21   Abstract:     HTTP Restful API NBI implementation
22                 Based on Swagger generated code
23   Date:         25 March 2019
24 */
25
26 package nbi
27
28 import (
29         "encoding/json"
30         "errors"
31         "fmt"
32         "github.com/go-openapi/loads"
33         "github.com/go-openapi/runtime/middleware"
34         "net/url"
35         "os"
36         "routing-manager/pkg/models"
37         "routing-manager/pkg/restapi"
38         "routing-manager/pkg/restapi/operations"
39         "routing-manager/pkg/restapi/operations/handle"
40         "routing-manager/pkg/rpe"
41         "routing-manager/pkg/rtmgr"
42         "routing-manager/pkg/sdl"
43         "strconv"
44         "time"
45 )
46
47 //var myClient = &http.Client{Timeout: 1 * time.Second}
48
49 type HttpRestful struct {
50         NbiEngine
51         LaunchRest                   LaunchRestHandler
52         RecvXappCallbackData         RecvXappCallbackDataHandler
53         ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
54         RetrieveStartupData          RetrieveStartupDataHandler
55 }
56
57 func NewHttpRestful() *HttpRestful {
58         instance := new(HttpRestful)
59         instance.LaunchRest = launchRest
60         instance.RecvXappCallbackData = recvXappCallbackData
61         instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
62         instance.RetrieveStartupData = retrieveStartupData
63         return instance
64 }
65
66 // ToDo: Use Range over channel. Read and return only the latest one.
67 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
68         var xappData *models.XappCallbackData
69         // Drain the channel as we are only looking for the latest value until
70         // xapp manager sends all xapp data with every request.
71         length := len(dataChannel)
72         //rtmgr.Logger.Info(length)
73         for i := 0; i <= length; i++ {
74                 rtmgr.Logger.Info("data received")
75                 // If no data received from the REST, it blocks.
76                 xappData = <-dataChannel
77         }
78         if nil != xappData {
79                 var xapps []rtmgr.XApp
80                 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
81                 return &xapps, err
82         } else {
83                 rtmgr.Logger.Info("No data")
84         }
85
86         rtmgr.Logger.Debug("Nothing received on the Http interface")
87         return nil, nil
88 }
89
90 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
91         if len(callbackData.XApps) == 0 {
92                 return fmt.Errorf("Invalid Data field: \"%s\"", callbackData.XApps)
93         }
94         var xapps []rtmgr.XApp
95         err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
96         if err != nil {
97                 return fmt.Errorf("Unmarshal failed: \"%s\"", err.Error())
98         }
99         return nil
100 }
101
102 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
103         if data != nil {
104                 rtmgr.Logger.Debug("Received callback data")
105         }
106         err := validateXappCallbackData(data)
107         if err != nil {
108                 rtmgr.Logger.Warn("XApp callback data validation failed: " + err.Error())
109                 return err
110         } else {
111                 datach <- data
112                 return nil
113         }
114 }
115
116 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
117         var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
118         for _, ep := range rtmgr.Eps {
119                 if ep.Ip == *data.Address && ep.Port == *data.Port {
120                         err = nil
121                         break
122                 }
123         }
124         return err
125 }
126
127 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
128         data *models.XappSubscriptionData) error {
129         rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
130         err := validateXappSubscriptionData(data)
131         if err != nil {
132                 rtmgr.Logger.Error(err.Error())
133                 return err
134         }
135         subchan <- data
136         //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
137         rtmgr.Logger.Debug("Endpoints: %v", rtmgr.Eps)
138         return nil
139 }
140
141 func subscriptionExists(data *models.XappSubscriptionData) bool {
142         present := false
143         sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
144         for _, elem := range rtmgr.Subs {
145                 if elem == sub {
146                         present = true
147                         break
148                 }
149         }
150         return present
151 }
152
153 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
154         data *models.XappSubscriptionData) error {
155         rtmgr.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
156         err := validateXappSubscriptionData(data)
157         if err != nil {
158                 rtmgr.Logger.Error(err.Error())
159                 return err
160         }
161
162         if !subscriptionExists(data) {
163                 rtmgr.Logger.Warn("Subscription not found: %d", *data.SubscriptionID)
164                 err := fmt.Errorf("Subscription not found: %d", *data.SubscriptionID)
165                 return err
166         }
167
168         subdelchan <- data
169         return nil
170 }
171
172 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
173         subdelchan chan<- *models.XappSubscriptionData) {
174         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
175         if err != nil {
176                 //log.Fatalln(err)
177                 rtmgr.Logger.Error(err.Error())
178                 os.Exit(1)
179         }
180         nbiUrl, err := url.Parse(*nbiif)
181         if err != nil {
182                 rtmgr.Logger.Error(err.Error())
183                 os.Exit(1)
184         }
185         api := operations.NewRoutingManagerAPI(swaggerSpec)
186         server := restapi.NewServer(api)
187         defer server.Shutdown()
188
189         server.Port, err = strconv.Atoi(nbiUrl.Port())
190         if err != nil {
191                 rtmgr.Logger.Error("Invalid NBI RestAPI port")
192                 os.Exit(1)
193         }
194         server.Host = "0.0.0.0"
195         // set handlers
196         api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
197                 func(params handle.ProvideXappHandleParams) middleware.Responder {
198                         rtmgr.Logger.Info("Data received on Http interface")
199                         err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
200                         if err != nil {
201                                 rtmgr.Logger.Error("Invalid XApp callback data: " + err.Error())
202                                 return handle.NewProvideXappHandleBadRequest()
203                         } else {
204                                 return handle.NewGetHandlesOK()
205                         }
206                 })
207         api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
208                 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
209                         err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
210                         if err != nil {
211                                 return handle.NewProvideXappSubscriptionHandleBadRequest()
212                         } else {
213                                 return handle.NewGetHandlesOK()
214                         }
215                 })
216         api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
217                 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
218                         err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
219                         if err != nil {
220                                 return handle.NewDeleteXappSubscriptionHandleNoContent()
221                         } else {
222                                 return handle.NewGetHandlesOK()
223                         }
224                 })
225         // start to serve API
226         rtmgr.Logger.Info("Starting the HTTP Rest service")
227         if err := server.Serve(); err != nil {
228                 rtmgr.Logger.Error(err.Error())
229         }
230 }
231
232 func httpGetXapps(xmurl string) (*[]rtmgr.XApp, error) {
233         rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
234         r, err := myClient.Get(xmurl)
235         if err != nil {
236                 return nil, err
237         }
238         defer r.Body.Close()
239
240         if r.StatusCode == 200 {
241                 rtmgr.Logger.Debug("http client raw response: %v", r)
242                 var xapps []rtmgr.XApp
243                 err = json.NewDecoder(r.Body).Decode(&xapps)
244                 if err != nil {
245                         rtmgr.Logger.Warn("Json decode failed: " + err.Error())
246                 }
247                 rtmgr.Logger.Info("HTTP GET: OK")
248                 rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
249                 return &xapps, err
250         }
251         rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
252         return nil, nil
253 }
254
255 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.SdlEngine) error {
256         var readErr error
257         var maxRetries = 10
258         for i := 1; i <= maxRetries; i++ {
259                 time.Sleep(2 * time.Second)
260                 xappData, err := httpGetXapps(xmurl)
261                 if xappData != nil && err == nil {
262                         pcData, confErr := rtmgr.GetPlatformComponents(configfile)
263                         if confErr != nil {
264                                 rtmgr.Logger.Error(confErr.Error())
265                                 return confErr
266                         }
267                         rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
268                         // Combine the xapps data and platform data before writing to the SDL
269                         ricData := &rtmgr.RicComponents{Xapps: *xappData, Pcs: *pcData}
270                         writeErr := sdlEngine.WriteAll(fileName, ricData)
271                         if writeErr != nil {
272                                 rtmgr.Logger.Error(writeErr.Error())
273                         }
274                         // post subscription req to appmgr
275                         readErr = PostSubReq(xmurl, nbiif)
276                         if readErr == nil {
277                                 return nil
278                         }
279                 } else if err == nil {
280                         readErr = errors.New("Unexpected HTTP status code")
281                 } else {
282                         rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
283                         readErr = err
284                 }
285         }
286         return readErr
287 }
288
289 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
290         sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine, triggerSBI chan<- bool) error {
291         err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
292         if err != nil {
293                 rtmgr.Logger.Error("Exiting as nbi failed to get the intial startup data from the xapp manager: " + err.Error())
294                 return err
295         }
296
297         datach := make(chan *models.XappCallbackData, 10)
298         subschan := make(chan *models.XappSubscriptionData, 10)
299         subdelchan := make(chan *models.XappSubscriptionData, 10)
300         rtmgr.Logger.Info("Launching Rest Http service")
301         go func() {
302                 r.LaunchRest(&nbiif, datach, subschan, subdelchan)
303         }()
304
305         go func() {
306                 for {
307                         data, err := r.RecvXappCallbackData(datach)
308                         if err != nil {
309                                 rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
310                         } else if data != nil {
311                                 sdlEngine.WriteXapps(fileName, data)
312                                 triggerSBI <- true
313                         }
314                 }
315         }()
316
317         go func() {
318                 for {
319                         data := <-subschan
320                         rtmgr.Logger.Debug("received XApp subscription data")
321                         addSubscription(&rtmgr.Subs, data)
322                         triggerSBI <- true
323                 }
324         }()
325
326         go func() {
327                 for {
328                         data := <-subdelchan
329                         rtmgr.Logger.Debug("received XApp subscription delete data")
330                         delSubscription(&rtmgr.Subs, data)
331                         triggerSBI <- true
332                 }
333         }()
334
335         return nil
336 }
337
338 func (r *HttpRestful) Terminate() error {
339         return nil
340 }
341
342 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
343         var b bool = false
344         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
345         for _, elem := range *subs {
346                 if elem == sub {
347                         rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
348                         b = true
349                 }
350         }
351         if b == false {
352                 *subs = append(*subs, sub)
353         }
354         return b
355 }
356
357 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
358         rtmgr.Logger.Debug("Deleteing the subscription from the subscriptions list")
359         var present bool = false
360         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
361         for i, elem := range *subs {
362                 if elem == sub {
363                         present = true
364                         // Since the order of the list is not important, we are swapping the last element
365                         // with the matching element and replacing the list with list(n-1) elements.
366                         (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
367                         *subs = (*subs)[:len(*subs)-1]
368                         break
369                 }
370         }
371         if present == false {
372                 rtmgr.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)
373         }
374         return present
375 }