5e90bfac41d1742674b73f2b95314bc8f6122813
[ric-plt/rtmgr.git] / pkg / nbi / httprestful.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18
19    This source code is part of the near-RT RIC (RAN Intelligent Controller)
20    platform project (RICP).
21
22 ==================================================================================
23 */
24 /*
25   Mnemonic:     httprestful.go
26   Abstract:     HTTP Restful API NBI implementation
27                 Based on Swagger generated code
28   Date:         25 March 2019
29 */
30
31 package nbi
32
33 //noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
34 import (
35         "encoding/json"
36         "errors"
37         "fmt"
38         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
39         "github.com/go-openapi/loads"
40         "github.com/go-openapi/runtime/middleware"
41         "net/url"
42         "os"
43         "routing-manager/pkg/models"
44         "routing-manager/pkg/restapi"
45         "routing-manager/pkg/restapi/operations"
46         "routing-manager/pkg/restapi/operations/handle"
47         "routing-manager/pkg/rpe"
48         "routing-manager/pkg/rtmgr"
49         "routing-manager/pkg/sdl"
50         "strconv"
51         "time"
52 )
53
54 //var myClient = &http.Client{Timeout: 1 * time.Second}
55
56 type HttpRestful struct {
57         Engine
58         LaunchRest                   LaunchRestHandler
59         RecvXappCallbackData         RecvXappCallbackDataHandler
60         RecvNewE2Tdata               RecvNewE2TdataHandler
61
62         ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
63         RetrieveStartupData          RetrieveStartupDataHandler
64 }
65
66 func NewHttpRestful() *HttpRestful {
67         instance := new(HttpRestful)
68         instance.LaunchRest = launchRest
69         instance.RecvXappCallbackData = recvXappCallbackData
70         instance.RecvNewE2Tdata = recvNewE2Tdata
71         instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
72         instance.RetrieveStartupData = retrieveStartupData
73         return instance
74 }
75
76 // ToDo: Use Range over channel. Read and return only the latest one.
77 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
78         var xappData *models.XappCallbackData
79         // Drain the channel as we are only looking for the latest value until
80         // xapp manager sends all xapp data with every request.
81         length := len(dataChannel)
82         //xapp.Logger.Info(length)
83         for i := 0; i <= length; i++ {
84                 xapp.Logger.Info("data received")
85                 // If no data received from the REST, it blocks.
86                 xappData = <-dataChannel
87         }
88         if nil != xappData {
89                 var xapps []rtmgr.XApp
90                 err := json.Unmarshal([]byte(xappData.XApps), &xapps)
91                 return &xapps, err
92         } else {
93                 xapp.Logger.Info("No data")
94         }
95
96         xapp.Logger.Debug("Nothing received on the Http interface")
97         return nil, nil
98 }
99
100 func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, error) {
101         var e2tData *models.E2tData
102         xapp.Logger.Info("data received")
103
104         e2tData = <-dataChannel
105
106         if nil != e2tData {
107                 var e2tinst rtmgr.E2TInstance
108                 e2tinst.Fqdn = *e2tData.E2TAddress
109                 e2tinst.Name = "E2TERMINST"
110                 return &e2tinst,nil
111         } else {
112                 xapp.Logger.Info("No data")
113         }
114
115         xapp.Logger.Debug("Nothing received on the Http interface")
116         return nil, nil
117 }
118
119
120
121
122 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
123         if len(callbackData.XApps) == 0 {
124                 return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
125         }
126         var xapps []rtmgr.XApp
127         err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
128         if err != nil {
129                 return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
130         }
131         return nil
132 }
133
134 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
135         if data != nil {
136                 xapp.Logger.Debug("Received callback data")
137         }
138         err := validateXappCallbackData(data)
139         if err != nil {
140                 xapp.Logger.Warn("XApp callback data validation failed: " + err.Error())
141                 return err
142         } else {
143                 datach <- data
144                 return nil
145         }
146 }
147
148 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
149         var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
150         for _, ep := range rtmgr.Eps {
151                 if ep.Ip == *data.Address && ep.Port == *data.Port {
152                         err = nil
153                         break
154                 }
155         }
156         return err
157 }
158
159 func validateE2tData(data *models.E2tData) error {
160         var err = fmt.Errorf("E2T E2TAddress is not proper: %v", *data.E2TAddress)
161 /*      for _, ep := range rtmgr.Eps {
162                 if ep.Ip == *data.Address && ep.Port == *data.Port {
163                         err = nil
164                         break
165                 }
166         }*/
167
168         if (*data.E2TAddress != "") {
169                 err = nil
170         }
171         return err
172 }
173
174 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
175         data *models.XappSubscriptionData) error {
176         xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
177         err := validateXappSubscriptionData(data)
178         if err != nil {
179                 xapp.Logger.Error(err.Error())
180                 return err
181         }
182         subchan <- data
183         //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
184         xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
185         return nil
186 }
187
188 func subscriptionExists(data *models.XappSubscriptionData) bool {
189         present := false
190         sub := rtmgr.Subscription{SubID: *data.SubscriptionID, Fqdn: *data.Address, Port: *data.Port}
191         for _, elem := range rtmgr.Subs {
192                 if elem == sub {
193                         present = true
194                         break
195                 }
196         }
197         return present
198 }
199
200 func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
201         data *models.XappSubscriptionData) error {
202         xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
203         err := validateXappSubscriptionData(data)
204         if err != nil {
205                 xapp.Logger.Error(err.Error())
206                 return err
207         }
208
209         if !subscriptionExists(data) {
210                 xapp.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
211                 err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
212                 return err
213         }
214
215         subdelchan <- data
216         return nil
217 }
218
219 func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData,
220         data *models.E2tData) error {
221         xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl")
222         err := validateE2tData(data)
223         if err != nil {
224                 xapp.Logger.Error(err.Error())
225                 return err
226         }
227
228         e2taddchan <- data
229         return nil
230 }
231
232 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData,
233         subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData) {
234         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
235         if err != nil {
236                 //log.Fatalln(err)
237                 xapp.Logger.Error(err.Error())
238                 os.Exit(1)
239         }
240         nbiUrl, err := url.Parse(*nbiif)
241         if err != nil {
242                 xapp.Logger.Error(err.Error())
243                 os.Exit(1)
244         }
245         api := operations.NewRoutingManagerAPI(swaggerSpec)
246         server := restapi.NewServer(api)
247         defer server.Shutdown()
248
249         server.Port, err = strconv.Atoi(nbiUrl.Port())
250         if err != nil {
251                 xapp.Logger.Error("Invalid NBI RestAPI port")
252                 os.Exit(1)
253         }
254         server.Host = "0.0.0.0"
255         // set handlers
256         api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
257                 func(params handle.ProvideXappHandleParams) middleware.Responder {
258                         xapp.Logger.Info("Data received on Http interface")
259                         err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
260                         if err != nil {
261                                 xapp.Logger.Error("Invalid XApp callback data: " + err.Error())
262                                 return handle.NewProvideXappHandleBadRequest()
263                         } else {
264                                 return handle.NewGetHandlesOK()
265                         }
266                 })
267         api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
268                 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
269                         err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
270                         if err != nil {
271                                 return handle.NewProvideXappSubscriptionHandleBadRequest()
272                         } else {
273                                 //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
274                                 time.Sleep(1 * time.Second)
275                                 return handle.NewGetHandlesOK()
276                         }
277                 })
278         api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
279                 func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
280                         err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
281                         if err != nil {
282                                 return handle.NewDeleteXappSubscriptionHandleNoContent()
283                         } else {
284                                 //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
285                                 time.Sleep(1 * time.Second)
286                                 return handle.NewGetHandlesOK()
287                         }
288                 })
289        api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc(
290                 func(params handle.CreateNewE2tHandleParams) middleware.Responder {
291                         err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData)
292                         if err != nil {
293                                 return handle.NewCreateNewE2tHandleBadRequest()
294                         } else {
295                                 time.Sleep(1 * time.Second)
296                                 return handle.NewCreateNewE2tHandleCreated()
297                         }
298                 })
299
300         // start to serve API
301         xapp.Logger.Info("Starting the HTTP Rest service")
302         if err := server.Serve(); err != nil {
303                 xapp.Logger.Error(err.Error())
304         }
305 }
306
307 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
308         xapp.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
309         r, err := myClient.Get(xmurl)
310         if err != nil {
311                 return nil, err
312         }
313         defer r.Body.Close()
314
315         if r.StatusCode == 200 {
316                 xapp.Logger.Debug("http client raw response: %v", r)
317                 var xapps []rtmgr.XApp
318                 err = json.NewDecoder(r.Body).Decode(&xapps)
319                 if err != nil {
320                         xapp.Logger.Warn("Json decode failed: " + err.Error())
321                 }
322                 xapp.Logger.Info("HTTP GET: OK")
323                 xapp.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
324                 return &xapps, err
325         }
326         xapp.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
327         return nil, nil
328 }
329
330 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
331         var readErr error
332         var maxRetries = 10
333         for i := 1; i <= maxRetries; i++ {
334                 time.Sleep(2 * time.Second)
335                 xappData, err := httpGetXApps(xmurl)
336                 if xappData != nil && err == nil {
337                         pcData, confErr := rtmgr.GetPlatformComponents(configfile)
338                         if confErr != nil {
339                                 xapp.Logger.Error(confErr.Error())
340                                 return confErr
341                         }
342                         xapp.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
343                         // Combine the xapps data and platform data before writing to the SDL
344                         ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts:  make(map[string]rtmgr.E2TInstance)}
345                         writeErr := sdlEngine.WriteAll(fileName, ricData)
346                         if writeErr != nil {
347                                 xapp.Logger.Error(writeErr.Error())
348                         }
349                         // post subscription req to appmgr
350                         readErr = PostSubReq(xmurl, nbiif)
351                         if readErr == nil {
352                                 return nil
353                         }
354                 } else if err == nil {
355                         readErr = errors.New("unexpected HTTP status code")
356                 } else {
357                         xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
358                         readErr = err
359                 }
360         }
361         return readErr
362 }
363
364 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
365         sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error {
366         err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
367         if err != nil {
368                 xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
369                 return err
370         }
371
372         datach := make(chan *models.XappCallbackData, 10)
373         subschan := make(chan *models.XappSubscriptionData, 10)
374         subdelchan := make(chan *models.XappSubscriptionData, 10)
375         e2taddchan := make(chan *models.E2tData, 10)
376         xapp.Logger.Info("Launching Rest Http service")
377         go func() {
378                 r.LaunchRest(&nbiif, datach, subschan, subdelchan, e2taddchan)
379         }()
380
381         go func() {
382                 for {
383                         data, err := r.RecvXappCallbackData(datach)
384                         if err != nil {
385                                 xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
386                         } else if data != nil {
387                                 xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
388                                 alldata, err1 := httpGetXApps(xmurl)
389                                 if alldata != nil && err1 == nil {
390                                         sdlEngine.WriteXApps(fileName, alldata)
391                                         triggerSBI <- true
392                                 }
393                         }
394                 }
395         }()
396
397         go func() {
398                 for {
399                         data := <-subschan
400                         xapp.Logger.Debug("received XApp subscription data")
401                         addSubscription(&rtmgr.Subs, data)
402                         triggerSBI <- true
403                 }
404         }()
405
406         go func() {
407                 for {
408                         data := <-subdelchan
409                         xapp.Logger.Debug("received XApp subscription delete data")
410                         delSubscription(&rtmgr.Subs, data)
411                         triggerSBI <- true
412                 }
413         }()
414
415         go func() {
416                 for {
417                         xapp.Logger.Debug("received create New E2T data")
418
419                         data, err := r.RecvNewE2Tdata(e2taddchan)
420                         if err != nil {
421                                 xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
422                         } else if data != nil {
423                                 sdlEngine.WriteNewE2TInstance(fileName, data)
424                                 triggerSBI <- true
425                         }
426                 }
427         }()
428
429         return nil
430 }
431
432 func (r *HttpRestful) Terminate() error {
433         return nil
434 }
435
436 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
437         var b = false
438         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
439         for _, elem := range *subs {
440                 if elem == sub {
441                         xapp.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
442                         b = true
443                 }
444         }
445         if b == false {
446                 *subs = append(*subs, sub)
447         }
448         return b
449 }
450
451 func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
452         xapp.Logger.Debug("Deleteing the subscription from the subscriptions list")
453         var present = false
454         sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
455         for i, elem := range *subs {
456                 if elem == sub {
457                         present = true
458                         // Since the order of the list is not important, we are swapping the last element
459                         // with the matching element and replacing the list with list(n-1) elements.
460                         (*subs)[len(*subs)-1], (*subs)[i] = (*subs)[i], (*subs)[len(*subs)-1]
461                         *subs = (*subs)[:len(*subs)-1]
462                         break
463                 }
464         }
465         if present == false {
466                 xapp.Logger.Warn("rtmgr.delSubscription: Subscription = %v, not present in the existing subscriptions", xappSubData)
467         }
468         return present
469 }