Added config and logger module from xapp-fwk. Added Routes related to A1Mediator...
[ric-plt/rtmgr.git] / pkg / nbi / httprestful.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18
19    This source code is part of the near-RT RIC (RAN Intelligent Controller)
20    platform project (RICP).
21
22 ==================================================================================
23 */
24 /*
25   Mnemonic:     httprestful.go
26   Abstract:     HTTP Restful API NBI implementation
27                 Based on Swagger generated code
28   Date:         25 March 2019
29 */
30
31 package nbi
32
33 //noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
34 import (
35         "encoding/json"
36         "errors"
37         "fmt"
38         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
39         "github.com/go-openapi/loads"
40         "github.com/go-openapi/runtime/middleware"
41         "net/url"
42         "os"
43         "routing-manager/pkg/models"
44         "routing-manager/pkg/restapi"
45         "routing-manager/pkg/restapi/operations"
46         "routing-manager/pkg/restapi/operations/handle"
47         "routing-manager/pkg/rpe"
48         "routing-manager/pkg/rtmgr"
49         "routing-manager/pkg/sdl"
50         "strconv"
51         "time"
52 )
53
54 //var myClient = &http.Client{Timeout: 1 * time.Second}
55
56 type HttpRestful struct {
57         Engine
58         LaunchRest                   LaunchRestHandler
59         RecvXappCallbackData         RecvXappCallbackDataHandler
60         ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
61         RetrieveStartupData          RetrieveStartupDataHandler
62 }
63
64 func NewHttpRestful() *HttpRestful {
65         instance := new(HttpRestful)
66         instance.LaunchRest = launchRest
67         instance.RecvXappCallbackData = recvXappCallbackData
68         instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
69         instance.RetrieveStartupData = retrieveStartupData
70         return instance
71 }
72
73 // ToDo: Use Range over channel. Read and return only the latest one.
74 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
75         var xappData *models.XappCallbackData
76         // Drain the channel as we are only looking for the latest value until
77         // xapp manager sends all xapp data with every request.
78         length := len(dataChannel)
79         //xapp.Logger.Info(length)
80         for i := 0; i <= length; i++ {
81                 xapp.Logger.Info("data received")
82                 // If no data received from the REST, it blocks.
83                 xappData = <-dataChannel
84         }
85         if nil != xappData {
86                 var xapps []rtmgr.XApp
87                 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
88                 return &xapps, err
89         } else {
90                 xapp.Logger.Info("No data")
91         }
92
93         xapp.Logger.Debug("Nothing received on the Http interface")
94         return nil, nil
95 }
96
97 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
98         if len(callbackData.XApps) == 0 {
99                 return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
100         }
101         var xapps []rtmgr.XApp
102         err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
103         if err != nil {
104                 return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
105         }
106         return nil
107 }
108
109 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
110         if data != nil {
111                 xapp.Logger.Debug("Received callback data")
112         }
113         err := validateXappCallbackData(data)
114         if err != nil {
115                 xapp.Logger.Warn("XApp callback data validation failed: " + err.Error())
116                 return err
117         } else {
118                 datach <- data
119                 return nil
120         }
121 }
122
123 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
124         var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
125         for _, ep := range rtmgr.Eps {
126                 if ep.Ip == *data.Address && ep.Port == *data.Port {
127                         err = nil
128                         break
129                 }
130         }
131         return err
132 }
133
134 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
135         data *models.XappSubscriptionData) error {
136         xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
137         err := validateXappSubscriptionData(data)
138         if err != nil {
139                 xapp.Logger.Error(err.Error())
140                 return err
141         }
142         subchan <- data
143         //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
144         xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
145         return nil
146 }
147
148 func subscriptionExists(data *models.XappSubscriptionData) bool {
149         present := false
150         sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
151         for _, elem := range rtmgr.Subs {
152                 if elem == sub {
153                         present = true
154                         break
155                 }
156         }
157         return present
158 }
159
160 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
161         data *models.XappSubscriptionData) error {
162         xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
163         err := validateXappSubscriptionData(data)
164         if err != nil {
165                 xapp.Logger.Error(err.Error())
166                 return err
167         }
168
169         if !subscriptionExists(data) {
170                 xapp.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
171                 err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
172                 return err
173         }
174
175         subdelchan <- data
176         return nil
177 }
178
179 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
180         subdelchan chan<- *models.XappSubscriptionData) {
181         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
182         if err != nil {
183                 //log.Fatalln(err)
184                 xapp.Logger.Error(err.Error())
185                 os.Exit(1)
186         }
187         nbiUrl, err := url.Parse(*nbiif)
188         if err != nil {
189                 xapp.Logger.Error(err.Error())
190                 os.Exit(1)
191         }
192         api := operations.NewRoutingManagerAPI(swaggerSpec)
193         server := restapi.NewServer(api)
194         defer server.Shutdown()
195
196         server.Port, err = strconv.Atoi(nbiUrl.Port())
197         if err != nil {
198                 xapp.Logger.Error("Invalid NBI RestAPI port")
199                 os.Exit(1)
200         }
201         server.Host = "0.0.0.0"
202         // set handlers
203         api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
204                 func(params handle.ProvideXappHandleParams) middleware.Responder {
205                         xapp.Logger.Info("Data received on Http interface")
206                         err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
207                         if err != nil {
208                                 xapp.Logger.Error("Invalid XApp callback data: " + err.Error())
209                                 return handle.NewProvideXappHandleBadRequest()
210                         } else {
211                                 return handle.NewGetHandlesOK()
212                         }
213                 })
214         api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
215                 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
216                         err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
217                         if err != nil {
218                                 return handle.NewProvideXappSubscriptionHandleBadRequest()
219                         } else {
220                                 //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
221                                 time.Sleep(1 * time.Second)
222                                 return handle.NewGetHandlesOK()
223                         }
224                 })
225         api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
226                 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
227                         err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
228                         if err != nil {
229                                 return handle.NewDeleteXappSubscriptionHandleNoContent()
230                         } else {
231                                 //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
232                                 time.Sleep(1 * time.Second)
233                                 return handle.NewGetHandlesOK()
234                         }
235                 })
236         // start to serve API
237         xapp.Logger.Info("Starting the HTTP Rest service")
238         if err := server.Serve(); err != nil {
239                 xapp.Logger.Error(err.Error())
240         }
241 }
242
243 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
244         xapp.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
245         r, err := myClient.Get(xmurl)
246         if err != nil {
247                 return nil, err
248         }
249         defer r.Body.Close()
250
251         if r.StatusCode == 200 {
252                 xapp.Logger.Debug("http client raw response: %v", r)
253                 var xapps []rtmgr.XApp
254                 err = json.NewDecoder(r.Body).Decode(&xapps)
255                 if err != nil {
256                         xapp.Logger.Warn("Json decode failed: " + err.Error())
257                 }
258                 xapp.Logger.Info("HTTP GET: OK")
259                 xapp.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
260                 return &xapps, err
261         }
262         xapp.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
263         return nil, nil
264 }
265
266 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
267         var readErr error
268         var maxRetries = 10
269         for i := 1; i <= maxRetries; i++ {
270                 time.Sleep(2 * time.Second)
271                 xappData, err := httpGetXApps(xmurl)
272                 if xappData != nil && err == nil {
273                         pcData, confErr := rtmgr.GetPlatformComponents(configfile)
274                         if confErr != nil {
275                                 xapp.Logger.Error(confErr.Error())
276                                 return confErr
277                         }
278                         xapp.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
279                         // Combine the xapps data and platform data before writing to the SDL
280                         ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData}
281                         writeErr := sdlEngine.WriteAll(fileName, ricData)
282                         if writeErr != nil {
283                                 xapp.Logger.Error(writeErr.Error())
284                         }
285                         // post subscription req to appmgr
286                         readErr = PostSubReq(xmurl, nbiif)
287                         if readErr == nil {
288                                 return nil
289                         }
290                 } else if err == nil {
291                         readErr = errors.New("unexpected HTTP status code")
292                 } else {
293                         xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
294                         readErr = err
295                 }
296         }
297         return readErr
298 }
299
300 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
301         sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error {
302         err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
303         if err != nil {
304                 xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
305                 return err
306         }
307
308         datach := make(chan *models.XappCallbackData, 10)
309         subschan := make(chan *models.XappSubscriptionData, 10)
310         subdelchan := make(chan *models.XappSubscriptionData, 10)
311         xapp.Logger.Info("Launching Rest Http service")
312         go func() {
313                 r.LaunchRest(&nbiif, datach, subschan, subdelchan)
314         }()
315
316         go func() {
317                 for {
318                         data, err := r.RecvXappCallbackData(datach)
319                         if err != nil {
320                                 xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
321                         } else if data != nil {
322                                 xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
323                                 alldata, err1 := httpGetXApps(xmurl)
324                                 if alldata != nil && err1 == nil {
325                                         sdlEngine.WriteXApps(fileName, alldata)
326                                         triggerSBI <- true
327                                 }
328                         }
329                 }
330         }()
331
332         go func() {
333                 for {
334                         data := <-subschan
335                         xapp.Logger.Debug("received XApp subscription data")
336                         addSubscription(&rtmgr.Subs, data)
337                         triggerSBI <- true
338                 }
339         }()
340
341         go func() {
342                 for {
343                         data := <-subdelchan
344                         xapp.Logger.Debug("received XApp subscription delete data")
345                         delSubscription(&rtmgr.Subs, data)
346                         triggerSBI <- true
347                 }
348         }()
349
350         return nil
351 }
352
353 func (r *HttpRestful) Terminate() error {
354         return nil
355 }
356
357 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
358         var b = false
359         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
360         for _, elem := range *subs {
361                 if elem == sub {
362                         xapp.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
363                         b = true
364                 }
365         }
366         if b == false {
367                 *subs = append(*subs, sub)
368         }
369         return b
370 }
371
372 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
373         xapp.Logger.Debug("Deleteing the subscription from the subscriptions list")
374         var present = false
375         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
376         for i, elem := range *subs {
377                 if elem == sub {
378                         present = true
379                         // Since the order of the list is not important, we are swapping the last element
380                         // with the matching element and replacing the list with list(n-1) elements.
381                         (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
382                         *subs = (*subs)[:len(*subs)-1]
383                         break
384                 }
385         }
386         if present == false {
387                 xapp.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)
388         }
389         return present
390 }