Release of Routing Manager v0.3.0
[ric-plt/rtmgr.git] / pkg / nbi / httprestful.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19 /*
20   Mnemonic:     httprestful.go
21   Abstract:     HTTP Restful API NBI implementation
22                 Based on Swagger generated code
23   Date:         25 March 2019
24 */
25
26 package nbi
27
28 import (
29         "fmt"
30         "os"
31         "time"
32         "net/url"
33         "strconv"
34         "errors"
35         "encoding/json"
36         "routing-manager/pkg/rtmgr"
37         "routing-manager/pkg/rpe"
38         "routing-manager/pkg/sdl"
39         "routing-manager/pkg/models"
40         "routing-manager/pkg/restapi"
41         "routing-manager/pkg/restapi/operations"
42         "github.com/go-openapi/runtime/middleware"
43         "routing-manager/pkg/restapi/operations/handle"
44         loads "github.com/go-openapi/loads"
45 )
46
47 //var myClient = &http.Client{Timeout: 1 * time.Second}
48
49 type HttpRestful struct {
50         NbiEngine
51         LaunchRest                   LaunchRestHandler
52         RecvXappCallbackData         RecvXappCallbackDataHandler
53         ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
54         RetrieveStartupData          RetrieveStartupDataHandler
55 }
56
57 func NewHttpRestful() *HttpRestful {
58         instance := new(HttpRestful)
59         instance.LaunchRest = launchRest
60         instance.RecvXappCallbackData = recvXappCallbackData
61         instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
62         instance.RetrieveStartupData = retrieveStartupData
63         return instance
64 }
65
66 // ToDo: Use Range over channel. Read and return only the latest one.
67 func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
68         var xappData *models.XappCallbackData
69         // Drain the channel as we are only looking for the latest value until
70         // xapp manager sends all xapp data with every request.
71         length := len(dataChannel)
72         //rtmgr.Logger.Info(length)
73         for i := 0; i <= length; i++ {
74                 rtmgr.Logger.Info("data received")
75                 // If no data received from the REST, it blocks.
76                 xappData = <-dataChannel
77         }
78         if nil != xappData {
79                 var xapps []rtmgr.XApp
80                 err := json.Unmarshal([]byte(xappData.Data), &xapps)
81                 return &xapps, err
82         } else {
83                 rtmgr.Logger.Info("No data")
84         }
85
86         rtmgr.Logger.Debug("Nothing received on the Http interface")
87         return nil, nil
88
89 }
90
91 func validateXappCallbackData(callbackData *models.XappCallbackData) error {
92         if len(callbackData.Data) == 0 {
93                 return fmt.Errorf("Invalid Data field: \"%s\"", callbackData.Data)
94         }
95         var xapps []rtmgr.XApp
96         err := json.Unmarshal([]byte(callbackData.Data), &xapps)
97         if err != nil {
98                 return fmt.Errorf("Unmarshal failed: \"%s\"", err.Error())
99         }
100         return nil
101 }
102
103 func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
104         if data != nil {
105                 rtmgr.Logger.Debug("Received callback data")
106         }
107         err := validateXappCallbackData(data)
108         if err != nil {
109                 rtmgr.Logger.Debug("XApp callback data validation failed: "+err.Error())
110                 return err
111         } else {
112                 datach<-data
113                 return nil
114         }
115 }
116
117 func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
118         var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
119         for _, ep := range rtmgr.Eps {
120                 if ep.Ip == *data.Address && ep.Port == *data.Port {
121                         err = nil
122                         break
123                 }
124         }
125         return err
126 }
127
128 func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
129                                         data *models.XappSubscriptionData) error {
130         rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
131         err := validateXappSubscriptionData(data)
132         if err != nil {
133                 rtmgr.Logger.Error(err.Error())
134                 return err
135         }
136         subchan <- data
137         //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
138         rtmgr.Logger.Debug("Endpoints: %v", rtmgr.Eps)
139         return nil
140 }
141
142 func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData) {
143         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
144         if err != nil {
145                 //log.Fatalln(err)
146                 rtmgr.Logger.Error(err.Error())
147                 os.Exit(1)
148         }
149         nbiUrl, err := url.Parse(*nbiif)
150         if err != nil {
151                 rtmgr.Logger.Error(err.Error())
152                 os.Exit(1)
153         }
154         api := operations.NewRoutingManagerAPI(swaggerSpec)
155         server := restapi.NewServer(api)
156         defer server.Shutdown()
157
158         server.Port, err = strconv.Atoi(nbiUrl.Port())
159         if err != nil {
160                rtmgr.Logger.Error("Invalid NBI RestAPI port")
161                os.Exit(1)
162         }
163         server.Host = nbiUrl.Hostname()
164         // set handlers
165         api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
166                 func(params handle.ProvideXappHandleParams) middleware.Responder {
167                 rtmgr.Logger.Info("Data received on Http interface")
168                 err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
169                 if err != nil {
170                         rtmgr.Logger.Error("Invalid XApp callback data: "+err.Error())
171                         return handle.NewProvideXappHandleBadRequest()
172                 } else {
173                         return handle.NewGetHandlesOK()
174                 }
175         })
176         api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
177                 func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
178                         err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
179                         if err != nil {
180                                 return handle.NewProvideXappSubscriptionHandleBadRequest()
181                         } else {
182                                 return handle.NewGetHandlesOK()
183                         }
184                 })
185         // start to serve API
186         rtmgr.Logger.Info("Starting the HTTP Rest service")
187         if err := server.Serve(); err != nil {
188                 rtmgr.Logger.Error(err.Error())
189         }
190 }
191
192 func httpGetXapps(xmurl string) (*[]rtmgr.XApp, error) {
193         rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
194         r, err := myClient.Get(xmurl)
195         if err != nil {
196                 return nil, err
197         }
198         defer r.Body.Close()
199
200         if r.StatusCode == 200 {
201                 rtmgr.Logger.Debug("http client raw response: %v", r)
202                 var xapps []rtmgr.XApp
203                 err = json.NewDecoder(r.Body).Decode(&xapps)
204                 if err != nil {
205                         rtmgr.Logger.Warn("Json decode failed: " + err.Error())
206                 }
207                 rtmgr.Logger.Info("HTTP GET: OK")
208                 rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
209                 return &xapps, err
210         }
211         rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
212         return nil, nil
213 }
214
215 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.SdlEngine) error {
216         var readErr error
217         var maxRetries = 10
218
219                 for i := 1; i <= maxRetries; i++ {
220                         time.Sleep(2 * time.Second)
221
222                         xappData, err := httpGetXapps(xmurl)
223
224                         if xappData != nil && err == nil {
225                                 pcData, confErr := rtmgr.GetPlatformComponents(configfile)
226                                 if confErr != nil {
227                                         rtmgr.Logger.Error(confErr.Error())
228                                         return confErr
229                                 }
230
231                                 rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
232                                 // Combine the xapps data and platform data before writing to the SDL
233                                 ricData := &rtmgr.RicComponents{Xapps: *xappData, Pcs: *pcData}
234
235                                 writeErr := sdlEngine.WriteAll(fileName, ricData)
236                                 if writeErr != nil {
237                                         rtmgr.Logger.Error(writeErr.Error())
238                                 }
239                                 // post subscription req to appmgr
240                                 readErr = PostSubReq(xmurl, nbiif)
241                                 if readErr == nil {
242                                         return nil
243                                 }
244                         } else if err == nil {
245                                 readErr = errors.New("Unexpected HTTP status code")
246                         } else {
247                                 rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
248                                 readErr = err
249                         }
250                 }
251         return readErr
252 }
253
254 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
255                                  sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine, triggerSBI chan<- bool) error {
256         err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
257         if err != nil {
258                 rtmgr.Logger.Error("Exiting as nbi failed to get the intial startup data from the xapp manager: " + err.Error())
259                 return err
260         }
261
262         datach := make(chan *models.XappCallbackData, 10)
263         subschan := make(chan *models.XappSubscriptionData, 10)
264         rtmgr.Logger.Info("Launching Rest Http service")
265         go func() {
266                 r.LaunchRest(&nbiif, datach, subschan)
267         }()
268
269         go func() {
270                 for {
271                         data, err := r.RecvXappCallbackData(datach)
272                         if err != nil {
273                                 rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
274                         } else if data != nil {
275                                 sdlEngine.WriteXapps(fileName, data)
276                                 triggerSBI <- true
277                         }
278                 }
279         }()
280         go func() {
281                 for {
282                         data := <-subschan
283                         rtmgr.Logger.Debug("received XApp subscription data")
284                         addSubscription(&rtmgr.Subs, data)
285                         triggerSBI <- true
286                 }
287         }()
288
289         return nil
290 }
291
292 func (r *HttpRestful) Terminate() error {
293         return nil
294 }
295
296 func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
297         var b bool = false
298         sub := rtmgr.Subscription{SubID:*xappSubData.SubscriptionID, Fqdn:*xappSubData.Address, Port:*xappSubData.Port,}
299         for _, elem := range *subs {
300                 if elem == sub {
301                         rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
302                         b = true
303                 }
304         }
305         if b == false {
306                 *subs = append(*subs, sub)
307         }
308         return b
309 }
310