a9aeaa2b216b8645824878ed4891e3ddfbc2aee4
[ric-plt/rtmgr.git] / pkg / nbi / httprestful.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19 /*
20   Mnemonic:     httprestful.go
21   Abstract:     HTTP Restful API NBI implementation
22                 Based on Swagger generated code
23   Date:         25 March 2019
24 */
25
26 package nbi
27
28 //noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
29 import (
30         "encoding/json"
31         "errors"
32         "fmt"
33         "github.com/go-openapi/loads"
34         "github.com/go-openapi/runtime/middleware"
35         "net/url"
36         "os"
37         "routing-manager/pkg/models"
38         "routing-manager/pkg/restapi"
39         "routing-manager/pkg/restapi/operations"
40         "routing-manager/pkg/restapi/operations/handle"
41         "routing-manager/pkg/rpe"
42         "routing-manager/pkg/rtmgr"
43         "routing-manager/pkg/sdl"
44         "strconv"
45         "time"
46 )
47
48 //var myClient = &http.Client{Timeout: 1 * time.Second}
49
50 type HttpRestful struct {
51         Engine
52         LaunchRest                   LaunchRestHandler
53         RecvXappCallbackData         RecvXappCallbackDataHandler
54         ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
55         RetrieveStartupData          RetrieveStartupDataHandler
56 }
57
58 func NewHttpRestful() *HttpRestful {
59         instance := new(HttpRestful)
60         instance.LaunchRest = launchRest
61         instance.RecvXappCallbackData = recvXappCallbackData
62         instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
63         instance.RetrieveStartupData = retrieveStartupData
64         return instance
65 }
66
67 // ToDo: Use Range over channel. Read and return only the latest one.
68 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
69         var xappData *models.XappCallbackData
70         // Drain the channel as we are only looking for the latest value until
71         // xapp manager sends all xapp data with every request.
72         length := len(dataChannel)
73         //rtmgr.Logger.Info(length)
74         for i := 0; i <= length; i++ {
75                 rtmgr.Logger.Info("data received")
76                 // If no data received from the REST, it blocks.
77                 xappData = <-dataChannel
78         }
79         if nil != xappData {
80                 var xapps []rtmgr.XApp
81                 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
82                 return &xapps, err
83         } else {
84                 rtmgr.Logger.Info("No data")
85         }
86
87         rtmgr.Logger.Debug("Nothing received on the Http interface")
88         return nil, nil
89 }
90
91 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
92         if len(callbackData.XApps) == 0 {
93                 return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
94         }
95         var xapps []rtmgr.XApp
96         err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
97         if err != nil {
98                 return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
99         }
100         return nil
101 }
102
103 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
104         if data != nil {
105                 rtmgr.Logger.Debug("Received callback data")
106         }
107         err := validateXappCallbackData(data)
108         if err != nil {
109                 rtmgr.Logger.Warn("XApp callback data validation failed: " + err.Error())
110                 return err
111         } else {
112                 datach <- data
113                 return nil
114         }
115 }
116
117 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
118         var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
119         for _, ep := range rtmgr.Eps {
120                 if ep.Ip == *data.Address && ep.Port == *data.Port {
121                         err = nil
122                         break
123                 }
124         }
125         return err
126 }
127
128 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
129         data *models.XappSubscriptionData) error {
130         rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
131         err := validateXappSubscriptionData(data)
132         if err != nil {
133                 rtmgr.Logger.Error(err.Error())
134                 return err
135         }
136         subchan <- data
137         //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
138         rtmgr.Logger.Debug("Endpoints: %v", rtmgr.Eps)
139         return nil
140 }
141
142 func subscriptionExists(data *models.XappSubscriptionData) bool {
143         present := false
144         sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
145         for _, elem := range rtmgr.Subs {
146                 if elem == sub {
147                         present = true
148                         break
149                 }
150         }
151         return present
152 }
153
154 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
155         data *models.XappSubscriptionData) error {
156         rtmgr.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
157         err := validateXappSubscriptionData(data)
158         if err != nil {
159                 rtmgr.Logger.Error(err.Error())
160                 return err
161         }
162
163         if !subscriptionExists(data) {
164                 rtmgr.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
165                 err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
166                 return err
167         }
168
169         subdelchan <- data
170         return nil
171 }
172
173 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
174         subdelchan chan<- *models.XappSubscriptionData) {
175         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
176         if err != nil {
177                 //log.Fatalln(err)
178                 rtmgr.Logger.Error(err.Error())
179                 os.Exit(1)
180         }
181         nbiUrl, err := url.Parse(*nbiif)
182         if err != nil {
183                 rtmgr.Logger.Error(err.Error())
184                 os.Exit(1)
185         }
186         api := operations.NewRoutingManagerAPI(swaggerSpec)
187         server := restapi.NewServer(api)
188         defer server.Shutdown()
189
190         server.Port, err = strconv.Atoi(nbiUrl.Port())
191         if err != nil {
192                 rtmgr.Logger.Error("Invalid NBI RestAPI port")
193                 os.Exit(1)
194         }
195         server.Host = "0.0.0.0"
196         // set handlers
197         api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
198                 func(params handle.ProvideXappHandleParams) middleware.Responder {
199                         rtmgr.Logger.Info("Data received on Http interface")
200                         err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
201                         if err != nil {
202                                 rtmgr.Logger.Error("Invalid XApp callback data: " + err.Error())
203                                 return handle.NewProvideXappHandleBadRequest()
204                         } else {
205                                 return handle.NewGetHandlesOK()
206                         }
207                 })
208         api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
209                 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
210                         err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
211                         if err != nil {
212                                 return handle.NewProvideXappSubscriptionHandleBadRequest()
213                         } else {
214                                 return handle.NewGetHandlesOK()
215                         }
216                 })
217         api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
218                 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
219                         err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
220                         if err != nil {
221                                 return handle.NewDeleteXappSubscriptionHandleNoContent()
222                         } else {
223                                 return handle.NewGetHandlesOK()
224                         }
225                 })
226         // start to serve API
227         rtmgr.Logger.Info("Starting the HTTP Rest service")
228         if err := server.Serve(); err != nil {
229                 rtmgr.Logger.Error(err.Error())
230         }
231 }
232
233 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
234         rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
235         r, err := myClient.Get(xmurl)
236         if err != nil {
237                 return nil, err
238         }
239         defer r.Body.Close()
240
241         if r.StatusCode == 200 {
242                 rtmgr.Logger.Debug("http client raw response: %v", r)
243                 var xapps []rtmgr.XApp
244                 err = json.NewDecoder(r.Body).Decode(&xapps)
245                 if err != nil {
246                         rtmgr.Logger.Warn("Json decode failed: " + err.Error())
247                 }
248                 rtmgr.Logger.Info("HTTP GET: OK")
249                 rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
250                 return &xapps, err
251         }
252         rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
253         return nil, nil
254 }
255
256 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
257         var readErr error
258         var maxRetries = 10
259         for i := 1; i <= maxRetries; i++ {
260                 time.Sleep(2 * time.Second)
261                 xappData, err := httpGetXApps(xmurl)
262                 if xappData != nil && err == nil {
263                         pcData, confErr := rtmgr.GetPlatformComponents(configfile)
264                         if confErr != nil {
265                                 rtmgr.Logger.Error(confErr.Error())
266                                 return confErr
267                         }
268                         rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
269                         // Combine the xapps data and platform data before writing to the SDL
270                         ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData}
271                         writeErr := sdlEngine.WriteAll(fileName, ricData)
272                         if writeErr != nil {
273                                 rtmgr.Logger.Error(writeErr.Error())
274                         }
275                         // post subscription req to appmgr
276                         readErr = PostSubReq(xmurl, nbiif)
277                         if readErr == nil {
278                                 return nil
279                         }
280                 } else if err == nil {
281                         readErr = errors.New("unexpected HTTP status code")
282                 } else {
283                         rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
284                         readErr = err
285                 }
286         }
287         return readErr
288 }
289
290 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
291         sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error {
292         err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
293         if err != nil {
294                 rtmgr.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
295                 return err
296         }
297
298         datach := make(chan *models.XappCallbackData, 10)
299         subschan := make(chan *models.XappSubscriptionData, 10)
300         subdelchan := make(chan *models.XappSubscriptionData, 10)
301         rtmgr.Logger.Info("Launching Rest Http service")
302         go func() {
303                 r.LaunchRest(&nbiif, datach, subschan, subdelchan)
304         }()
305
306         go func() {
307                 for {
308                         data, err := r.RecvXappCallbackData(datach)
309                         if err != nil {
310                                 rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
311                         } else if data != nil {
312                                 sdlEngine.WriteXApps(fileName, data)
313                                 triggerSBI <- true
314                         }
315                 }
316         }()
317
318         go func() {
319                 for {
320                         data := <-subschan
321                         rtmgr.Logger.Debug("received XApp subscription data")
322                         addSubscription(&rtmgr.Subs, data)
323                         triggerSBI <- true
324                 }
325         }()
326
327         go func() {
328                 for {
329                         data := <-subdelchan
330                         rtmgr.Logger.Debug("received XApp subscription delete data")
331                         delSubscription(&rtmgr.Subs, data)
332                         triggerSBI <- true
333                 }
334         }()
335
336         return nil
337 }
338
339 func (r *HttpRestful) Terminate() error {
340         return nil
341 }
342
343 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
344         var b = false
345         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
346         for _, elem := range *subs {
347                 if elem == sub {
348                         rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
349                         b = true
350                 }
351         }
352         if b == false {
353                 *subs = append(*subs, sub)
354         }
355         return b
356 }
357
358 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
359         rtmgr.Logger.Debug("Deleteing the subscription from the subscriptions list")
360         var present = false
361         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
362         for i, elem := range *subs {
363                 if elem == sub {
364                         present = true
365                         // Since the order of the list is not important, we are swapping the last element
366                         // with the matching element and replacing the list with list(n-1) elements.
367                         (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
368                         *subs = (*subs)[:len(*subs)-1]
369                         break
370                 }
371         }
372         if present == false {
373                 rtmgr.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)
374         }
375         return present
376 }