Fixed integration and subscription routes related issues for R3
[ric-plt/rtmgr.git] / pkg / nbi / httprestful.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18
19    This source code is part of the near-RT RIC (RAN Intelligent Controller)
20    platform project (RICP).
21
22 ==================================================================================
23 */
24 /*
25   Mnemonic:     httprestful.go
26   Abstract:     HTTP Restful API NBI implementation
27                 Based on Swagger generated code
28   Date:         25 March 2019
29 */
30
31 package nbi
32
33 //noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
34 import (
35         "encoding/json"
36         "errors"
37         "fmt"
38         "github.com/go-openapi/loads"
39         "github.com/go-openapi/runtime/middleware"
40         "net/url"
41         "os"
42         "routing-manager/pkg/models"
43         "routing-manager/pkg/restapi"
44         "routing-manager/pkg/restapi/operations"
45         "routing-manager/pkg/restapi/operations/handle"
46         "routing-manager/pkg/rpe"
47         "routing-manager/pkg/rtmgr"
48         "routing-manager/pkg/sdl"
49         "strconv"
50         "time"
51 )
52
53 //var myClient = &http.Client{Timeout: 1 * time.Second}
54
55 type HttpRestful struct {
56         Engine
57         LaunchRest                   LaunchRestHandler
58         RecvXappCallbackData         RecvXappCallbackDataHandler
59         ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
60         RetrieveStartupData          RetrieveStartupDataHandler
61 }
62
63 func NewHttpRestful() *HttpRestful {
64         instance := new(HttpRestful)
65         instance.LaunchRest = launchRest
66         instance.RecvXappCallbackData = recvXappCallbackData
67         instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
68         instance.RetrieveStartupData = retrieveStartupData
69         return instance
70 }
71
72 // ToDo: Use Range over channel. Read and return only the latest one.
73 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
74         var xappData *models.XappCallbackData
75         // Drain the channel as we are only looking for the latest value until
76         // xapp manager sends all xapp data with every request.
77         length := len(dataChannel)
78         //rtmgr.Logger.Info(length)
79         for i := 0; i <= length; i++ {
80                 rtmgr.Logger.Info("data received")
81                 // If no data received from the REST, it blocks.
82                 xappData = <-dataChannel
83         }
84         if nil != xappData {
85                 var xapps []rtmgr.XApp
86                 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
87                 return &xapps, err
88         } else {
89                 rtmgr.Logger.Info("No data")
90         }
91
92         rtmgr.Logger.Debug("Nothing received on the Http interface")
93         return nil, nil
94 }
95
96 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
97         if len(callbackData.XApps) == 0 {
98                 return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
99         }
100         var xapps []rtmgr.XApp
101         err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
102         if err != nil {
103                 return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
104         }
105         return nil
106 }
107
108 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
109         if data != nil {
110                 rtmgr.Logger.Debug("Received callback data")
111         }
112         err := validateXappCallbackData(data)
113         if err != nil {
114                 rtmgr.Logger.Warn("XApp callback data validation failed: " + err.Error())
115                 return err
116         } else {
117                 datach <- data
118                 return nil
119         }
120 }
121
122 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
123         var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
124         for _, ep := range rtmgr.Eps {
125                 if ep.Ip == *data.Address && ep.Port == *data.Port {
126                         err = nil
127                         break
128                 }
129         }
130         return err
131 }
132
133 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
134         data *models.XappSubscriptionData) error {
135         rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
136         err := validateXappSubscriptionData(data)
137         if err != nil {
138                 rtmgr.Logger.Error(err.Error())
139                 return err
140         }
141         subchan <- data
142         //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
143         rtmgr.Logger.Debug("Endpoints: %v", rtmgr.Eps)
144         return nil
145 }
146
147 func subscriptionExists(data *models.XappSubscriptionData) bool {
148         present := false
149         sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
150         for _, elem := range rtmgr.Subs {
151                 if elem == sub {
152                         present = true
153                         break
154                 }
155         }
156         return present
157 }
158
159 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
160         data *models.XappSubscriptionData) error {
161         rtmgr.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
162         err := validateXappSubscriptionData(data)
163         if err != nil {
164                 rtmgr.Logger.Error(err.Error())
165                 return err
166         }
167
168         if !subscriptionExists(data) {
169                 rtmgr.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
170                 err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
171                 return err
172         }
173
174         subdelchan <- data
175         return nil
176 }
177
178 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
179         subdelchan chan<- *models.XappSubscriptionData) {
180         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
181         if err != nil {
182                 //log.Fatalln(err)
183                 rtmgr.Logger.Error(err.Error())
184                 os.Exit(1)
185         }
186         nbiUrl, err := url.Parse(*nbiif)
187         if err != nil {
188                 rtmgr.Logger.Error(err.Error())
189                 os.Exit(1)
190         }
191         api := operations.NewRoutingManagerAPI(swaggerSpec)
192         server := restapi.NewServer(api)
193         defer server.Shutdown()
194
195         server.Port, err = strconv.Atoi(nbiUrl.Port())
196         if err != nil {
197                 rtmgr.Logger.Error("Invalid NBI RestAPI port")
198                 os.Exit(1)
199         }
200         server.Host = "0.0.0.0"
201         // set handlers
202         api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
203                 func(params handle.ProvideXappHandleParams) middleware.Responder {
204                         rtmgr.Logger.Info("Data received on Http interface")
205                         err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
206                         if err != nil {
207                                 rtmgr.Logger.Error("Invalid XApp callback data: " + err.Error())
208                                 return handle.NewProvideXappHandleBadRequest()
209                         } else {
210                                 return handle.NewGetHandlesOK()
211                         }
212                 })
213         api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
214                 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
215                         err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
216                         if err != nil {
217                                 return handle.NewProvideXappSubscriptionHandleBadRequest()
218                         } else {
219                                 //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
220                                 time.Sleep(1 * time.Second)
221                                 return handle.NewGetHandlesOK()
222                         }
223                 })
224         api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
225                 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
226                         err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
227                         if err != nil {
228                                 return handle.NewDeleteXappSubscriptionHandleNoContent()
229                         } else {
230                                 //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
231                                 time.Sleep(1 * time.Second)
232                                 return handle.NewGetHandlesOK()
233                         }
234                 })
235         // start to serve API
236         rtmgr.Logger.Info("Starting the HTTP Rest service")
237         if err := server.Serve(); err != nil {
238                 rtmgr.Logger.Error(err.Error())
239         }
240 }
241
242 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
243         rtmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
244         r, err := myClient.Get(xmurl)
245         if err != nil {
246                 return nil, err
247         }
248         defer r.Body.Close()
249
250         if r.StatusCode == 200 {
251                 rtmgr.Logger.Debug("http client raw response: %v", r)
252                 var xapps []rtmgr.XApp
253                 err = json.NewDecoder(r.Body).Decode(&xapps)
254                 if err != nil {
255                         rtmgr.Logger.Warn("Json decode failed: " + err.Error())
256                 }
257                 rtmgr.Logger.Info("HTTP GET: OK")
258                 rtmgr.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
259                 return &xapps, err
260         }
261         rtmgr.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
262         return nil, nil
263 }
264
265 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
266         var readErr error
267         var maxRetries = 10
268         for i := 1; i <= maxRetries; i++ {
269                 time.Sleep(2 * time.Second)
270                 xappData, err := httpGetXApps(xmurl)
271                 if xappData != nil && err == nil {
272                         pcData, confErr := rtmgr.GetPlatformComponents(configfile)
273                         if confErr != nil {
274                                 rtmgr.Logger.Error(confErr.Error())
275                                 return confErr
276                         }
277                         rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
278                         // Combine the xapps data and platform data before writing to the SDL
279                         ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData}
280                         writeErr := sdlEngine.WriteAll(fileName, ricData)
281                         if writeErr != nil {
282                                 rtmgr.Logger.Error(writeErr.Error())
283                         }
284                         // post subscription req to appmgr
285                         readErr = PostSubReq(xmurl, nbiif)
286                         if readErr == nil {
287                                 return nil
288                         }
289                 } else if err == nil {
290                         readErr = errors.New("unexpected HTTP status code")
291                 } else {
292                         rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
293                         readErr = err
294                 }
295         }
296         return readErr
297 }
298
299 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
300         sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error {
301         err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
302         if err != nil {
303                 rtmgr.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
304                 return err
305         }
306
307         datach := make(chan *models.XappCallbackData, 10)
308         subschan := make(chan *models.XappSubscriptionData, 10)
309         subdelchan := make(chan *models.XappSubscriptionData, 10)
310         rtmgr.Logger.Info("Launching Rest Http service")
311         go func() {
312                 r.LaunchRest(&nbiif, datach, subschan, subdelchan)
313         }()
314
315         go func() {
316                 for {
317                         data, err := r.RecvXappCallbackData(datach)
318                         if err != nil {
319                                 rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
320                         } else if data != nil {
321                                 rtmgr.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
322                                 alldata, err1 := httpGetXApps(xmurl)
323                                 if alldata != nil && err1 == nil {
324                                         sdlEngine.WriteXApps(fileName, alldata)
325                                         triggerSBI <- true
326                                 }
327                         }
328                 }
329         }()
330
331         go func() {
332                 for {
333                         data := <-subschan
334                         rtmgr.Logger.Debug("received XApp subscription data")
335                         addSubscription(&rtmgr.Subs, data)
336                         triggerSBI <- true
337                 }
338         }()
339
340         go func() {
341                 for {
342                         data := <-subdelchan
343                         rtmgr.Logger.Debug("received XApp subscription delete data")
344                         delSubscription(&rtmgr.Subs, data)
345                         triggerSBI <- true
346                 }
347         }()
348
349         return nil
350 }
351
352 func (r *HttpRestful) Terminate() error {
353         return nil
354 }
355
356 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
357         var b = false
358         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
359         for _, elem := range *subs {
360                 if elem == sub {
361                         rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
362                         b = true
363                 }
364         }
365         if b == false {
366                 *subs = append(*subs, sub)
367         }
368         return b
369 }
370
371 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
372         rtmgr.Logger.Debug("Deleteing the subscription from the subscriptions list")
373         var present = false
374         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
375         for i, elem := range *subs {
376                 if elem == sub {
377                         present = true
378                         // Since the order of the list is not important, we are swapping the last element
379                         // with the matching element and replacing the list with list(n-1) elements.
380                         (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
381                         *subs = (*subs)[:len(*subs)-1]
382                         break
383                 }
384         }
385         if present == false {
386                 rtmgr.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)
387         }
388         return present
389 }