2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
33 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
34 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi"
35 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations"
36 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations/health"
37 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations/xapp"
38 "github.com/go-openapi/loads"
39 "github.com/go-openapi/runtime/middleware"
40 "github.com/valyala/fastjson"
42 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
43 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/resthooks"
46 type XappData struct {
56 xappInstance *models.XappInstance
59 var xappmap = map[string]map[string]*XappData{}
61 func NewRestful() *Restful {
63 rh: resthooks.NewResthook(true),
66 r.api = r.SetupHandler()
70 func (r *Restful) Run() {
71 server := restapi.NewServer(r.api)
72 defer server.Shutdown()
74 server.Host = "0.0.0.0"
76 appmgr.Logger.Info("Xapp manager started ... serving on %s:%d\n", server.Host, server.Port)
78 go r.symptomdataServer()
80 if err := server.Serve(); err != nil {
81 log.Fatal(err.Error())
86 func (r *Restful) RetrieveApps() {
87 time.Sleep(5 * time.Second)
88 var xlist models.RegisterRequest
89 applist := r.rh.GetAppsInSDL()
91 appmgr.Logger.Info("List obtained from GetAppsInSDL is %s", *applist)
92 newstring := strings.Split(*applist, " ")
93 for i, _ := range newstring {
94 appmgr.Logger.Debug("Checking for xapp %s", newstring[i])
95 if newstring[i] != "" {
96 err := json.Unmarshal([]byte(newstring[i]), &xlist)
98 appmgr.Logger.Error("Error while unmarshalling")
102 continue //SDL may have empty item,so need to skip
105 xmodel, _ := r.PrepareConfig(xlist, false)
107 appmgr.Logger.Error("Xapp not found, deleting it from DB")
108 r.rh.UpdateAppData(xlist, true)
115 func (r *Restful) SetupHandler() *operations.AppManagerAPI {
116 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
118 appmgr.Logger.Error(err.Error())
121 api := operations.NewAppManagerAPI(swaggerSpec)
123 // URL: /ric/v1/health
124 api.HealthGetHealthAliveHandler = health.GetHealthAliveHandlerFunc(
125 func(params health.GetHealthAliveParams) middleware.Responder {
126 return health.NewGetHealthAliveOK()
129 api.HealthGetHealthReadyHandler = health.GetHealthReadyHandlerFunc(
130 func(params health.GetHealthReadyParams) middleware.Responder {
131 return health.NewGetHealthReadyOK()
134 // URL: /ric/v1/subscriptions
135 api.GetSubscriptionsHandler = operations.GetSubscriptionsHandlerFunc(
136 func(params operations.GetSubscriptionsParams) middleware.Responder {
137 return operations.NewGetSubscriptionsOK().WithPayload(r.rh.GetAllSubscriptions())
140 api.GetSubscriptionByIDHandler = operations.GetSubscriptionByIDHandlerFunc(
141 func(params operations.GetSubscriptionByIDParams) middleware.Responder {
142 if result, found := r.rh.GetSubscriptionById(params.SubscriptionID); found {
143 return operations.NewGetSubscriptionByIDOK().WithPayload(&result)
145 return operations.NewGetSubscriptionByIDNotFound()
148 api.AddSubscriptionHandler = operations.AddSubscriptionHandlerFunc(
149 func(params operations.AddSubscriptionParams) middleware.Responder {
150 return operations.NewAddSubscriptionCreated().WithPayload(r.rh.AddSubscription(*params.SubscriptionRequest))
153 api.ModifySubscriptionHandler = operations.ModifySubscriptionHandlerFunc(
154 func(params operations.ModifySubscriptionParams) middleware.Responder {
155 if _, ok := r.rh.ModifySubscription(params.SubscriptionID, *params.SubscriptionRequest); ok {
156 return operations.NewModifySubscriptionOK()
158 return operations.NewModifySubscriptionBadRequest()
161 api.DeleteSubscriptionHandler = operations.DeleteSubscriptionHandlerFunc(
162 func(params operations.DeleteSubscriptionParams) middleware.Responder {
163 if _, ok := r.rh.DeleteSubscription(params.SubscriptionID); ok {
164 return operations.NewDeleteSubscriptionNoContent()
166 return operations.NewDeleteSubscriptionBadRequest()
170 api.XappGetAllXappsHandler = xapp.GetAllXappsHandlerFunc(
171 func(params xapp.GetAllXappsParams) middleware.Responder {
172 if result, err := r.GetApps(); err == nil {
173 return xapp.NewGetAllXappsOK().WithPayload(result)
175 return xapp.NewGetAllXappsInternalServerError()
178 // URL: /ric/v1/config
179 api.XappGetAllXappConfigHandler = xapp.GetAllXappConfigHandlerFunc(
180 func(params xapp.GetAllXappConfigParams) middleware.Responder {
181 return xapp.NewGetAllXappConfigOK().WithPayload(r.getAppConfig())
184 api.RegisterXappHandler = operations.RegisterXappHandlerFunc(
185 func(params operations.RegisterXappParams) middleware.Responder {
186 appmgr.Logger.Info("appname is %s", (*params.RegisterRequest.AppName))
187 appmgr.Logger.Info("endpoint is %s", (*params.RegisterRequest.HTTPEndpoint))
188 appmgr.Logger.Info("rmrendpoint is %s", (*params.RegisterRequest.RmrEndpoint))
189 if result, err := r.RegisterXapp(*params.RegisterRequest); err == nil {
190 go r.rh.PublishSubscription(*result, models.EventTypeDeployed)
191 return operations.NewRegisterXappCreated()
193 return operations.NewRegisterXappBadRequest()
196 api.DeregisterXappHandler = operations.DeregisterXappHandlerFunc(
197 func(params operations.DeregisterXappParams) middleware.Responder {
198 appmgr.Logger.Info("appname is %s", (*params.DeregisterRequest.AppName))
199 if result, err := r.DeregisterXapp(*params.DeregisterRequest); err == nil {
200 go r.rh.PublishSubscription(*result, models.EventTypeUndeployed)
201 return operations.NewDeregisterXappNoContent()
203 return operations.NewDeregisterXappBadRequest()
209 func httpGetXAppsconfig(url string) *string {
210 appmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + url)
211 resp, err := http.Get(url)
215 defer resp.Body.Close()
217 if resp.StatusCode == http.StatusOK {
218 var data XappConfigList
219 appmgr.Logger.Info("http client raw response: %v", resp)
220 if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
221 appmgr.Logger.Error("Json decode failed: " + err.Error())
224 //data[0] assuming only for one app
225 str := fmt.Sprintf("%v", data[0].Config)
226 appmgr.Logger.Info("HTTP BODY: %v", str)
231 appmgr.Logger.Info("httprestful got an unexpected http status code: %v", resp.StatusCode)
236 func parseConfig(config *string) *appmgr.RtmData {
237 var p fastjson.Parser
238 var msgs appmgr.RtmData
240 v, err := p.Parse(*config)
242 appmgr.Logger.Info("fastjson.Parser for failed: %v", err)
247 for _, m := range v.GetArray("rmr", "txMessages") {
248 msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`))
251 for _, m := range v.GetArray("rmr", "rxMessages") {
252 msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`))
255 for _, m := range v.GetArray("rmr", "policies") {
256 if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil {
257 msgs.Policies = append(msgs.Policies, int64(val))
261 for _, p := range v.GetArray("messaging", "ports") {
262 appmgr.Logger.Info("txMessages=%v, rxMessages=%v", p.GetArray("txMessages"), p.GetArray("rxMessages"))
263 for _, m := range p.GetArray("txMessages") {
264 msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`))
267 for _, m := range p.GetArray("rxMessages") {
268 msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`))
271 for _, m := range p.GetArray("policies") {
272 if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil {
273 msgs.Policies = append(msgs.Policies, int64(val))
281 func (r *Restful) RegisterXapp(params models.RegisterRequest) (xapp *models.Xapp, err error) {
282 return r.PrepareConfig(params, true)
285 func (r *Restful) DeregisterXapp(params models.DeregisterRequest) (xapp *models.Xapp, err error) {
286 var registeredlist models.RegisterRequest
287 registeredlist.AppName = params.AppName
288 registeredlist.AppInstanceName = params.AppInstanceName
289 if _, found := xappmap[*params.AppName]; found {
291 x.Instances = append(x.Instances, xappmap[*params.AppName][*params.AppInstanceName].xappInstance)
292 registeredlist.HTTPEndpoint = &xappmap[*params.AppName][*params.AppInstanceName].httpendpoint
293 delete(xappmap[*params.AppName], *params.AppInstanceName)
294 if len(xappmap[*params.AppName]) == 0 {
295 delete(xappmap, *params.AppName)
297 r.rh.UpdateAppData(registeredlist, true)
300 appmgr.Logger.Error("XApp Instance %v Not Found", *params.AppName)
301 return nil, errors.New("XApp Instance Not Found")
305 func (r *Restful) PrepareConfig(params models.RegisterRequest, updateflag bool) (xapp *models.Xapp, err error) {
307 configPresent := false
308 var xappconfig *string
309 appmgr.Logger.Info("http endpoint is %s", *params.HTTPEndpoint)
310 for i := 1; i <= maxRetries; i++ {
311 if params.Config != "" {
312 appmgr.Logger.Info("Getting config during xapp register: %v", params.Config)
313 xappconfig = ¶ms.Config
316 appmgr.Logger.Info("Getting config from xapp:")
317 xappconfig = httpGetXAppsconfig(fmt.Sprintf("http://%s%s", *params.HTTPEndpoint, params.ConfigPath))
320 if xappconfig != nil {
321 data := parseConfig(xappconfig)
323 appmgr.Logger.Info("iRetry Count = %v", i)
326 xapp.Name = params.AppName
327 xapp.Version = params.AppVersion
328 //xapp.Status = params.Status
330 r.rh.UpdateAppData(params, updateflag)
331 return r.FillInstanceData(params, &xapp, *data, configPresent)
334 appmgr.Logger.Error("No Data from xapp")
336 if configPresent == true {
339 time.Sleep(2 * time.Second)
342 return nil, errors.New("Unable to get configmap after 5 retries")
345 func (r *Restful) FillInstanceData(params models.RegisterRequest, xapp *models.Xapp, rtData appmgr.RtmData, configFlag bool) (xapps *models.Xapp, err error) {
347 endPointStr := strings.Split(*params.RmrEndpoint, ":")
348 var x models.XappInstance
349 x.Name = params.AppInstanceName
350 //x.Status = strings.ToLower(params.Status)
351 x.Status = "deployed"
352 //x.IP = endPointStr[0]
353 x.IP = fmt.Sprintf("service-ricxapp-%s-rmr.ricxapp", *params.AppInstanceName)
354 x.Port, _ = strconv.ParseInt(endPointStr[1], 10, 64)
355 x.TxMessages = rtData.TxMessages
356 x.RxMessages = rtData.RxMessages
357 x.Policies = rtData.Policies
358 xapp.Instances = append(xapp.Instances, &x)
359 rmrsrvname := fmt.Sprintf("service-ricxapp-%s-rmr.ricxapp:%s", *params.AppInstanceName, x.Port)
361 a := &XappData{httpendpoint: *params.HTTPEndpoint,
362 rmrendpoint: *params.RmrEndpoint,
363 rmrserviceep: rmrsrvname,
365 xappname: *params.AppName,
366 xappversion: params.AppVersion,
367 xappinstname: *params.AppInstanceName,
368 xappconfigpath: params.ConfigPath,
369 xappdynamiconfig: configFlag,
372 if _, ok := xappmap[*params.AppName]; ok {
373 xappmap[*params.AppName][*params.AppInstanceName] = a
374 appmgr.Logger.Info("appname already present, %v", xappmap[*params.AppName])
376 xappmap[*params.AppName] = make(map[string]*XappData)
377 xappmap[*params.AppName][*params.AppInstanceName] = a
378 appmgr.Logger.Info("Creating app instance, %v", xappmap[*params.AppName])
385 func (r *Restful) GetApps() (xapps models.AllDeployedXapps, err error) {
386 xapps = models.AllDeployedXapps{}
387 for _, v := range xappmap {
389 for i, j := range v {
392 x.Version = j.xappversion
393 appmgr.Logger.Info("Xapps details currently in map Appname = %v,rmrendpoint = %v,Status = %v", i, j.rmrendpoint, j.status)
394 x.Instances = append(x.Instances, j.xappInstance)
396 xapps = append(xapps, &x)
403 func (r *Restful) getAppConfig() (configList models.AllXappConfig) {
404 for _, v := range xappmap {
405 namespace := "ricxapp" //Namespace hardcoded, to be removed later
406 for _, j := range v {
407 var activeConfig interface{}
408 if j.xappdynamiconfig {
411 xappconfig := httpGetXAppsconfig(fmt.Sprintf("http://%s%s", j.httpendpoint, j.xappconfigpath))
413 if xappconfig == nil {
414 appmgr.Logger.Info("config not found for %s", &j.xappname)
417 json.Unmarshal([]byte(*xappconfig), &activeConfig)
419 c := models.XAppConfig{
420 Metadata: &models.ConfigMetadata{XappName: &j.xappname, Namespace: &namespace},
421 Config: activeConfig,
423 configList = append(configList, &c)
431 func (r *Restful) symptomdataServer() {
432 http.HandleFunc("/ric/v1/symptomdata", func(w http.ResponseWriter, req *http.Request) {
435 XappList models.AllDeployedXapps `json:"xappList"`
436 ConfigList models.AllXappConfig `json:"configList"`
437 SubscriptionList models.AllSubscriptions `json:"subscriptionList"`
441 r.rh.GetAllSubscriptions(),
444 w.Header().Set("Content-Type", "application/json")
445 w.Header().Set("Content-Disposition", "attachment; filename=platform/apps_info.json")
446 w.WriteHeader(http.StatusOK)
447 resp, _ := json.MarshalIndent(xappData, "", " ")
451 http.ListenAndServe(":8081", nil)