Adding scope of RICPlatform that are under Apache License
[ric-plt/rtmgr.git] / pkg / nbi / httprestful.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18
19    This source code is part of the near-RT RIC (RAN Intelligent Controller)
20    platform project (RICP).
21
22 ==================================================================================
23 */
24 /*
25   Mnemonic:     httprestful.go
26   Abstract:     HTTP Restful API NBI implementation
27                 Based on Swagger generated code
28   Date:         25 March 2019
29 */
30
31 package nbi
32
33 //noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
34 import (
35         "encoding/json"
36         "errors"
37         "fmt"
38         "net/url"
39         "os"
40         "routing-manager/pkg/models"
41         "routing-manager/pkg/restapi"
42         "routing-manager/pkg/restapi/operations"
43         "routing-manager/pkg/restapi/operations/handle"
44         "routing-manager/pkg/rpe"
45         "routing-manager/pkg/rtmgr"
46         "routing-manager/pkg/sdl"
47         "strconv"
48         "time"
49
50         "github.com/go-openapi/loads"
51         "github.com/go-openapi/runtime/middleware"
52 )
53
54 //var myClient = &http.Client{Timeout: 1 * time.Second}
55
56 type HttpRestful struct {
57         Engine
58         LaunchRest                   LaunchRestHandler
59         RecvXappCallbackData         RecvXappCallbackDataHandler
60         ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
61         RetrieveStartupData          RetrieveStartupDataHandler
62 }
63
64 func NewHttpRestful() *HttpRestful {
65         instance := new(HttpRestful)
66         instance.LaunchRest = launchRest
67         instance.RecvXappCallbackData = recvXappCallbackData
68         instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
69         instance.RetrieveStartupData = retrieveStartupData
70         return instance
71 }
72
73 // ToDo: Use Range over channel. Read and return only the latest one.
74 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
75         var xappData *models.XappCallbackData
76         // Drain the channel as we are only looking for the latest value until
77         // xapp manager sends all xapp data with every request.
78         length := len(dataChannel)
79         //rtmgr.Logger.Info(length)
80         for i := 0; i <= length; i++ {
81                 rtmgr.Logger.Info("data received")
82                 // If no data received from the REST, it blocks.
83                 xappData = <-dataChannel
84         }
85         if nil != xappData {
86                 var xapps []rtmgr.XApp
87                 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
88                 return &xapps, err
89         } else {
90                 rtmgr.Logger.Info("No data")
91         }
92
93         rtmgr.Logger.Debug("Nothing received on the Http interface")
94         return nil, nil
95 }
96
97 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
98         if len(callbackData.XApps) == 0 {
99                 return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
100         }
101         var xapps []rtmgr.XApp
102         err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
103         if err != nil {
104                 return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
105         }
106         return nil
107 }
108
109 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
110         if data != nil {
111                 rtmgr.Logger.Debug("Received callback data")
112         }
113         err := validateXappCallbackData(data)
114         if err != nil {
115                 rtmgr.Logger.Warn("XApp callback data validation failed: " + err.Error())
116                 return err
117         } else {
118                 datach <- data
119                 return nil
120         }
121 }
122
123 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
124         var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
125         for _, ep := range rtmgr.Eps {
126                 if ep.Ip == *data.Address && ep.Port == *data.Port {
127                         err = nil
128                         break
129                 }
130         }
131         return err
132 }
133
134 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
135         data *models.XappSubscriptionData) error {
136         rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
137         err := validateXappSubscriptionData(data)
138         if err != nil {
139                 rtmgr.Logger.Error(err.Error())
140                 return err
141         }
142         subchan <- data
143         //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
144         rtmgr.Logger.Debug("Endpoints: %v", rtmgr.Eps)
145         return nil
146 }
147
148 func subscriptionExists(data *models.XappSubscriptionData) bool {
149         present := false
150         sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
151         for _, elem := range rtmgr.Subs {
152                 if elem == sub {
153                         present = true
154                         break
155                 }
156         }
157         return present
158 }
159
160 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
161         data *models.XappSubscriptionData) error {
162         rtmgr.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
163         err := validateXappSubscriptionData(data)
164         if err != nil {
165                 rtmgr.Logger.Error(err.Error())
166                 return err
167         }
168
169         if !subscriptionExists(data) {
170                 rtmgr.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
171                 err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
172                 return err
173         }
174
175         subdelchan <- data
176         return nil
177 }
178
179 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
180         subdelchan chan<- *models.XappSubscriptionData) {
181         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
182         if err != nil {
183                 //log.Fatalln(err)
184                 rtmgr.Logger.Error(err.Error())
185                 os.Exit(1)
186         }
187         nbiUrl, err := url.Parse(*nbiif)
188         if err != nil {
189                 rtmgr.Logger.Error(err.Error())
190                 os.Exit(1)
191         }
192         api := operations.NewRoutingManagerAPI(swaggerSpec)
193         server := restapi.NewServer(api)
194         defer server.Shutdown()
195
196         server.Port, err = strconv.Atoi(nbiUrl.Port())
197         if err != nil {
198                 rtmgr.Logger.Error("Invalid NBI RestAPI port")
199                 os.Exit(1)
200         }
201         server.Host = "0.0.0.0"
202         // set handlers
203         api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
204                 func(params handle.ProvideXappHandleParams) middleware.Responder {
205                         rtmgr.Logger.Info("Data received on Http interface")
206                         err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
207                         if err != nil {
208                                 rtmgr.Logger.Error("Invalid XApp callback data: " + err.Error())
209                                 return handle.NewProvideXappHandleBadRequest()
210                         } else {
211                                 return handle.NewGetHandlesOK()
212                         }
213                 })
214         api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
215                 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
216                         err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
217                         if err != nil {
218                                 return handle.NewProvideXappSubscriptionHandleBadRequest()
219                         } else {
220                                 return handle.NewGetHandlesOK()
221                         }
222                 })
223         api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
224                 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
225                         err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
226                         if err != nil {
227                                 return handle.NewDeleteXappSubscriptionHandleNoContent()
228                         } else {
229                                 return handle.NewGetHandlesOK()
230                         }
231                 })
232         // start to serve API
233         rtmgr.Logger.Info("Starting the HTTP Rest service")
234         if err := server.Serve(); err != nil {
235                 rtmgr.Logger.Error(err.Error())
236         }
237 }
238
239 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
240         rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
241         r, err := myClient.Get(xmurl)
242         if err != nil {
243                 return nil, err
244         }
245         defer r.Body.Close()
246
247         if r.StatusCode == 200 {
248                 rtmgr.Logger.Debug("http client raw response: %v", r)
249                 var xapps []rtmgr.XApp
250                 err = json.NewDecoder(r.Body).Decode(&xapps)
251                 if err != nil {
252                         rtmgr.Logger.Warn("Json decode failed: " + err.Error())
253                 }
254                 rtmgr.Logger.Info("HTTP GET: OK")
255                 rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
256                 return &xapps, err
257         }
258         rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
259         return nil, nil
260 }
261
262 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
263         var readErr error
264         var maxRetries = 10
265         for i := 1; i <= maxRetries; i++ {
266                 time.Sleep(2 * time.Second)
267                 xappData, err := httpGetXApps(xmurl)
268                 if xappData != nil && err == nil {
269                         pcData, confErr := rtmgr.GetPlatformComponents(configfile)
270                         if confErr != nil {
271                                 rtmgr.Logger.Error(confErr.Error())
272                                 return confErr
273                         }
274                         rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
275                         // Combine the xapps data and platform data before writing to the SDL
276                         ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData}
277                         writeErr := sdlEngine.WriteAll(fileName, ricData)
278                         if writeErr != nil {
279                                 rtmgr.Logger.Error(writeErr.Error())
280                         }
281                         // post subscription req to appmgr
282                         readErr = PostSubReq(xmurl, nbiif)
283                         if readErr == nil {
284                                 return nil
285                         }
286                 } else if err == nil {
287                         readErr = errors.New("unexpected HTTP status code")
288                 } else {
289                         rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
290                         readErr = err
291                 }
292         }
293         return readErr
294 }
295
296 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
297         sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error {
298         err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
299         if err != nil {
300                 rtmgr.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
301                 return err
302         }
303
304         datach := make(chan *models.XappCallbackData, 10)
305         subschan := make(chan *models.XappSubscriptionData, 10)
306         subdelchan := make(chan *models.XappSubscriptionData, 10)
307         rtmgr.Logger.Info("Launching Rest Http service")
308         go func() {
309                 r.LaunchRest(&nbiif, datach, subschan, subdelchan)
310         }()
311
312         go func() {
313                 for {
314                         data, err := r.RecvXappCallbackData(datach)
315                         if err != nil {
316                                 rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
317                         } else if data != nil {
318                                 sdlEngine.WriteXApps(fileName, data)
319                                 triggerSBI <- true
320                         }
321                 }
322         }()
323
324         go func() {
325                 for {
326                         data := <-subschan
327                         rtmgr.Logger.Debug("received XApp subscription data")
328                         addSubscription(&rtmgr.Subs, data)
329                         triggerSBI <- true
330                 }
331         }()
332
333         go func() {
334                 for {
335                         data := <-subdelchan
336                         rtmgr.Logger.Debug("received XApp subscription delete data")
337                         delSubscription(&rtmgr.Subs, data)
338                         triggerSBI <- true
339                 }
340         }()
341
342         return nil
343 }
344
345 func (r *HttpRestful) Terminate() error {
346         return nil
347 }
348
349 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
350         var b = false
351         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
352         for _, elem := range *subs {
353                 if elem == sub {
354                         rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
355                         b = true
356                 }
357         }
358         if b == false {
359                 *subs = append(*subs, sub)
360         }
361         return b
362 }
363
364 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
365         rtmgr.Logger.Debug("Deleteing the subscription from the subscriptions list")
366         var present = false
367         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
368         for i, elem := range *subs {
369                 if elem == sub {
370                         present = true
371                         // Since the order of the list is not important, we are swapping the last element
372                         // with the matching element and replacing the list with list(n-1) elements.
373                         (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
374                         *subs = (*subs)[:len(*subs)-1]
375                         break
376                 }
377         }
378         if present == false {
379                 rtmgr.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)
380         }
381         return present
382 }