Routing manager v0.3.3
[ric-plt/rtmgr.git] / pkg / nbi / httprestful.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19 /*
20   Mnemonic:     httprestful.go
21   Abstract:     HTTP Restful API NBI implementation
22                 Based on Swagger generated code
23   Date:         25 March 2019
24 */
25
26 package nbi
27
28 import (
29         "fmt"
30         "os"
31         "time"
32         "net/url"
33         "strconv"
34         "errors"
35         "encoding/json"
36         "routing-manager/pkg/rtmgr"
37         "routing-manager/pkg/rpe"
38         "routing-manager/pkg/sdl"
39         "routing-manager/pkg/models"
40         "routing-manager/pkg/restapi"
41         "routing-manager/pkg/restapi/operations"
42         "github.com/go-openapi/runtime/middleware"
43         "routing-manager/pkg/restapi/operations/handle"
44         loads "github.com/go-openapi/loads"
45 )
46
47 //var myClient = &http.Client{Timeout: 1 * time.Second}
48
49 type HttpRestful struct {
50         NbiEngine
51         LaunchRest                   LaunchRestHandler
52         RecvXappCallbackData         RecvXappCallbackDataHandler
53         ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
54         RetrieveStartupData          RetrieveStartupDataHandler
55 }
56
57 func NewHttpRestful() *HttpRestful {
58         instance := new(HttpRestful)
59         instance.LaunchRest = launchRest
60         instance.RecvXappCallbackData = recvXappCallbackData
61         instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
62         instance.RetrieveStartupData = retrieveStartupData
63         return instance
64 }
65
66 // ToDo: Use Range over channel. Read and return only the latest one.
67 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
68         var xappData *models.XappCallbackData
69         // Drain the channel as we are only looking for the latest value until
70         // xapp manager sends all xapp data with every request.
71         length := len(dataChannel)
72         //rtmgr.Logger.Info(length)
73         for i := 0; i <= length; i++ {
74                 rtmgr.Logger.Info("data received")
75                 // If no data received from the REST, it blocks.
76                 xappData = <-dataChannel
77         }
78         if nil != xappData {
79                 var xapps []rtmgr.XApp
80                 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
81                 return &xapps, err
82         } else {
83                 rtmgr.Logger.Info("No data")
84         }
85
86         rtmgr.Logger.Debug("Nothing received on the Http interface")
87         return nil, nil
88
89 }
90
91 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
92         if len(callbackData.XApps) == 0 {
93                 return fmt.Errorf("Invalid Data field: \"%s\"", callbackData.XApps)
94         }
95         var xapps []rtmgr.XApp
96         err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
97         if err != nil {
98                 return fmt.Errorf("Unmarshal failed: \"%s\"", err.Error())
99         }
100         return nil
101 }
102
103 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
104         if data != nil {
105                 rtmgr.Logger.Debug("Received callback data")
106         }
107         err := validateXappCallbackData(data)
108         if err != nil {
109                 rtmgr.Logger.Warn("XApp callback data validation failed: "+err.Error())
110                 return err
111         } else {
112                 datach<-data
113                 return nil
114         }
115 }
116
117 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
118         var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
119         for _, ep := range rtmgr.Eps {
120                 if ep.Ip == *data.Address && ep.Port == *data.Port {
121                         err = nil
122                         break
123                 }
124         }
125         return err
126 }
127
128 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
129                                         data *models.XappSubscriptionData) error {
130         rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
131         err := validateXappSubscriptionData(data)
132         if err != nil {
133                 rtmgr.Logger.Error(err.Error())
134                 return err
135         }
136         subchan <- data
137         //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
138         rtmgr.Logger.Debug("Endpoints: %v", rtmgr.Eps)
139         return nil
140 }
141
142 func subscriptionExists(data *models.XappSubscriptionData) bool {
143         present := false
144         sub := rtmgr.Subscription{SubID:*data.SubscriptionID, Fqdn:*data.Address, Port:*data.Port,}
145         for _, elem := range rtmgr.Subs {
146                 if elem == sub {
147                         present = true
148                         break
149                 }
150         }
151         return present
152 }
153
154 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
155                                         data *models.XappSubscriptionData) error {
156         rtmgr.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
157         err := validateXappSubscriptionData(data)
158         if err != nil {
159                 rtmgr.Logger.Error(err.Error())
160                 return err
161         }
162
163         if !subscriptionExists(data) {
164                 rtmgr.Logger.Warn("Subscription not found: %d", *data.SubscriptionID)
165                 err := fmt.Errorf("Subscription not found: %d", *data.SubscriptionID)
166                 return err
167         }
168
169         subdelchan <- data
170         return nil
171 }
172
173 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
174                                                                 subdelchan chan<- *models.XappSubscriptionData) {
175         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
176         if err != nil {
177                 //log.Fatalln(err)
178                 rtmgr.Logger.Error(err.Error())
179                 os.Exit(1)
180         }
181         nbiUrl, err := url.Parse(*nbiif)
182         if err != nil {
183                 rtmgr.Logger.Error(err.Error())
184                 os.Exit(1)
185         }
186         api := operations.NewRoutingManagerAPI(swaggerSpec)
187         server := restapi.NewServer(api)
188         defer server.Shutdown()
189
190         server.Port, err = strconv.Atoi(nbiUrl.Port())
191         if err != nil {
192                 rtmgr.Logger.Error("Invalid NBI RestAPI port")
193                 os.Exit(1)
194         }
195         server.Host = "0.0.0.0"
196         // set handlers
197         api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
198                 func(params handle.ProvideXappHandleParams) middleware.Responder {
199                 rtmgr.Logger.Info("Data received on Http interface")
200                 err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
201                 if err != nil {
202                         rtmgr.Logger.Error("Invalid XApp callback data: "+err.Error())
203                         return handle.NewProvideXappHandleBadRequest()
204                 } else {
205                         return handle.NewGetHandlesOK()
206                 }
207         })
208         api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
209                 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
210                         err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
211                         if err != nil {
212                                 return handle.NewProvideXappSubscriptionHandleBadRequest()
213                         } else {
214                                 return handle.NewGetHandlesOK()
215                         }
216                 })
217         api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
218                 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
219                         err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
220                         if err != nil {
221                                 return handle.NewDeleteXappSubscriptionHandleNoContent()
222                         } else {
223                                 return handle.NewGetHandlesOK()
224                         }
225                 })
226         // start to serve API
227         rtmgr.Logger.Info("Starting the HTTP Rest service")
228         if err := server.Serve(); err != nil {
229                 rtmgr.Logger.Error(err.Error())
230         }
231 }
232
233 func httpGetXapps(xmurl string) (*[]rtmgr.XApp, error) {
234         rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
235         r, err := myClient.Get(xmurl)
236         if err != nil {
237                 return nil, err
238         }
239         defer r.Body.Close()
240
241         if r.StatusCode == 200 {
242                 rtmgr.Logger.Debug("http client raw response: %v", r)
243                 var xapps []rtmgr.XApp
244                 err = json.NewDecoder(r.Body).Decode(&xapps)
245                 if err != nil {
246                         rtmgr.Logger.Warn("Json decode failed: " + err.Error())
247                 }
248                 rtmgr.Logger.Info("HTTP GET: OK")
249                 rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
250                 return &xapps, err
251         }
252         rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
253         return nil, nil
254 }
255
256 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.SdlEngine) error {
257         var readErr error
258         var maxRetries = 10
259
260                 for i := 1; i <= maxRetries; i++ {
261                         time.Sleep(2 * time.Second)
262
263                         xappData, err := httpGetXapps(xmurl)
264
265                         if xappData != nil && err == nil {
266                                 pcData, confErr := rtmgr.GetPlatformComponents(configfile)
267                                 if confErr != nil {
268                                         rtmgr.Logger.Error(confErr.Error())
269                                         return confErr
270                                 }
271
272                                 rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
273                                 // Combine the xapps data and platform data before writing to the SDL
274                                 ricData := &rtmgr.RicComponents{Xapps: *xappData, Pcs: *pcData}
275
276                                 writeErr := sdlEngine.WriteAll(fileName, ricData)
277                                 if writeErr != nil {
278                                         rtmgr.Logger.Error(writeErr.Error())
279                                 }
280                                 // post subscription req to appmgr
281                                 readErr = PostSubReq(xmurl, nbiif)
282                                 if readErr == nil {
283                                         return nil
284                                 }
285                         } else if err == nil {
286                                 readErr = errors.New("Unexpected HTTP status code")
287                         } else {
288                                 rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
289                                 readErr = err
290                         }
291                 }
292         return readErr
293 }
294
295 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
296                                  sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine, triggerSBI chan<- bool) error {
297         err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
298         if err != nil {
299                 rtmgr.Logger.Error("Exiting as nbi failed to get the intial startup data from the xapp manager: " + err.Error())
300                 return err
301         }
302
303         datach := make(chan *models.XappCallbackData, 10)
304         subschan := make(chan *models.XappSubscriptionData, 10)
305         subdelchan := make(chan *models.XappSubscriptionData, 10)
306         rtmgr.Logger.Info("Launching Rest Http service")
307         go func() {
308                 r.LaunchRest(&nbiif, datach, subschan, subdelchan)
309         }()
310
311         go func() {
312                 for {
313                         data, err := r.RecvXappCallbackData(datach)
314                         if err != nil {
315                                 rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
316                         } else if data != nil {
317                                 sdlEngine.WriteXapps(fileName, data)
318                                 triggerSBI <- true
319                         }
320                 }
321         }()
322
323         go func() {
324                 for {
325                         data := <-subschan
326                         rtmgr.Logger.Debug("received XApp subscription data")
327                         addSubscription(&rtmgr.Subs, data)
328                         triggerSBI <- true
329                 }
330         }()
331
332         go func() {
333                 for {
334                         data := <-subdelchan
335                         rtmgr.Logger.Debug("received XApp subscription delete data")
336                         delSubscription(&rtmgr.Subs, data)
337                         triggerSBI <- true
338                 }
339         }()
340
341         return nil
342 }
343
344 func (r *HttpRestful) Terminate() error {
345         return nil
346 }
347
348 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
349         var b bool = false
350         sub := rtmgr.Subscription{SubID:*xappSubData.SubscriptionID, Fqdn:*xappSubData.Address, Port:*xappSubData.Port,}
351         for _, elem := range *subs {
352                 if elem == sub {
353                         rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
354                         b = true
355                 }
356         }
357         if b == false {
358                 *subs = append(*subs, sub)
359         }
360         return b
361 }
362
363 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
364         rtmgr.Logger.Debug("Deleteing the subscription from the subscriptions list")
365         var present bool = false
366         sub := rtmgr.Subscription{SubID:*xappSubData.SubscriptionID, Fqdn:*xappSubData.Address, Port:*xappSubData.Port,}
367         for i, elem := range *subs {
368                 if elem == sub {
369                         present = true
370                         // Since the order of the list is not important, we are swapping the last element
371                         // with the matching element and replacing the list with list(n-1) elements.
372                         (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
373                         *subs = (*subs)[:len(*subs)-1]
374                         break
375                 }
376         }
377         if present == false {
378                 rtmgr.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)
379         }
380         return present
381 }