2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
33 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
34 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi"
35 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations"
36 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations/health"
37 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations/xapp"
38 "github.com/go-openapi/loads"
39 "github.com/go-openapi/runtime/middleware"
40 "github.com/valyala/fastjson"
42 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
43 "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/resthooks"
// XappData is the per-instance record kept in xappmap (the map's value type is
// *XappData). Only part of its field list is visible in this view.
46 type XappData struct {
// xappInstance is the API model of the instance; GetApps appends it to the
// Instances of the Xapp it reports.
56 xappInstance *models.XappInstance
// xappmap is the in-memory registry of xApp instances. FillInstanceData writes
// entries as xappmap[*params.AppName][*params.AppInstanceName], so the outer
// key is the application name and the inner key is the instance name.
// NOTE(review): no lock is visible around accesses to this package-level map;
// confirm how concurrent REST handlers are serialized.
59 var xappmap = map[string]map[string]*XappData{}
// NewRestful builds a Restful value: its rh field is a new resthook created
// with resthooks.NewResthook(true) and its api field is set from SetupHandler.
// The remaining initializer fields and the return statement are not visible in
// this view.
61 func NewRestful() *Restful {
63 rh: resthooks.NewResthook(true),
// Handlers are installed once, at construction time.
66 r.api = r.SetupHandler()
// Run starts the REST server built from r.api and blocks in server.Serve.
// Before serving it starts symptomdataServer in its own goroutine.
70 func (r *Restful) Run() {
71 server := restapi.NewServer(r.api)
72 defer server.Shutdown()
// Listen on all interfaces.
74 server.Host = "0.0.0.0"
76 appmgr.Logger.Info("Xapp manager started ... serving on %s:%d\n", server.Host, server.Port)
78 go r.symptomdataServer()
// NOTE(review): log.Fatal exits the process via os.Exit, so the deferred
// server.Shutdown above does not run on this path.
80 if err := server.Serve(); err != nil {
81 log.Fatal(err.Error())
// RetrieveApps reads the application list returned by r.rh.GetAppsInSDL and
// runs PrepareConfig (with updateflag=false) for every entry in it.
// Several guarding conditions of this function are not visible in this view.
86 func (r *Restful) RetrieveApps() {
// Fixed 5 second pause before the list is read.
87 time.Sleep(5 * time.Second)
88 var xlist models.RegisterRequest
89 applist := r.rh.GetAppsInSDL()
91 appmgr.Logger.Info("List obtained from GetAppsInSDL is %s", *applist)
// The list is a single string whose entries are separated by one space.
// NOTE(review): this assumes an encoded entry never contains a space; confirm
// against the code that writes the list.
92 newstring := strings.Split(*applist, " ")
93 for i, _ := range newstring {
94 appmgr.Logger.Debug("Checking for xapp %s", newstring[i])
95 if newstring[i] != "" {
// Each non-empty entry is decoded as a JSON RegisterRequest.
// NOTE(review): xlist is declared once outside the loop and reused, so fields
// absent from a later entry may keep values from an earlier one; verify.
96 err := json.Unmarshal([]byte(newstring[i]), &xlist)
98 appmgr.Logger.Error("Error while unmarshalling")
102 continue //SDL may have empty item,so need to skip
// The error returned by PrepareConfig is discarded; only the model is kept.
105 xmodel, _ := r.PrepareConfig(xlist, false)
// Presumably reached when no model was obtained (the condition is not visible
// here): the entry is handed to UpdateAppData with the flag set to true.
107 appmgr.Logger.Error("Xapp not found, deleting it from DB")
108 r.rh.UpdateAppData(xlist, true)
// SetupHandler loads the embedded swagger specification, creates the
// AppManagerAPI from it and installs the handler functions for the health,
// subscription, xApp listing, configuration, register and deregister
// operations. Per its signature it returns the configured API; the return
// statement itself is outside the visible lines.
115 func (r *Restful) SetupHandler() *operations.AppManagerAPI {
116 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
// A spec loading error is logged; what happens after the log call is not
// visible in this view.
118 appmgr.Logger.Error(err.Error())
121 api := operations.NewAppManagerAPI(swaggerSpec)
123 // URL: /ric/v1/health
// Liveness: always answers OK.
124 api.HealthGetHealthAliveHandler = health.GetHealthAliveHandlerFunc(
125 func(params health.GetHealthAliveParams) middleware.Responder {
126 return health.NewGetHealthAliveOK()
// Readiness: always answers OK.
129 api.HealthGetHealthReadyHandler = health.GetHealthReadyHandlerFunc(
130 func(params health.GetHealthReadyParams) middleware.Responder {
131 return health.NewGetHealthReadyOK()
134 // URL: /ric/v1/subscriptions
// List: OK with every subscription known to the resthook.
135 api.GetSubscriptionsHandler = operations.GetSubscriptionsHandlerFunc(
136 func(params operations.GetSubscriptionsParams) middleware.Responder {
137 return operations.NewGetSubscriptionsOK().WithPayload(r.rh.GetAllSubscriptions())
// Get by ID: OK with the subscription when found, NotFound otherwise.
140 api.GetSubscriptionByIDHandler = operations.GetSubscriptionByIDHandlerFunc(
141 func(params operations.GetSubscriptionByIDParams) middleware.Responder {
142 if result, found := r.rh.GetSubscriptionById(params.SubscriptionID); found {
143 return operations.NewGetSubscriptionByIDOK().WithPayload(&result)
145 return operations.NewGetSubscriptionByIDNotFound()
// Add: Created with whatever AddSubscription returns for the request body.
148 api.AddSubscriptionHandler = operations.AddSubscriptionHandlerFunc(
149 func(params operations.AddSubscriptionParams) middleware.Responder {
150 return operations.NewAddSubscriptionCreated().WithPayload(r.rh.AddSubscription(*params.SubscriptionRequest))
// Modify: OK when the resthook reports success, BadRequest otherwise.
153 api.ModifySubscriptionHandler = operations.ModifySubscriptionHandlerFunc(
154 func(params operations.ModifySubscriptionParams) middleware.Responder {
155 if _, ok := r.rh.ModifySubscription(params.SubscriptionID, *params.SubscriptionRequest); ok {
156 return operations.NewModifySubscriptionOK()
158 return operations.NewModifySubscriptionBadRequest()
// Delete: NoContent when the resthook reports success, BadRequest otherwise.
161 api.DeleteSubscriptionHandler = operations.DeleteSubscriptionHandlerFunc(
162 func(params operations.DeleteSubscriptionParams) middleware.Responder {
163 if _, ok := r.rh.DeleteSubscription(params.SubscriptionID); ok {
164 return operations.NewDeleteSubscriptionNoContent()
166 return operations.NewDeleteSubscriptionBadRequest()
// List xApps: OK with the result of GetApps, InternalServerError on error.
170 api.XappGetAllXappsHandler = xapp.GetAllXappsHandlerFunc(
171 func(params xapp.GetAllXappsParams) middleware.Responder {
172 if result, err := r.GetApps(); err == nil {
173 return xapp.NewGetAllXappsOK().WithPayload(result)
175 return xapp.NewGetAllXappsInternalServerError()
178 // URL: /ric/v1/config
// Config: always OK, carrying the list built by getAppConfig.
179 api.XappGetAllXappConfigHandler = xapp.GetAllXappConfigHandlerFunc(
180 func(params xapp.GetAllXappConfigParams) middleware.Responder {
181 return xapp.NewGetAllXappConfigOK().WithPayload(r.getAppConfig())
// Register: logs the request, calls RegisterXapp, and on success publishes an
// EventTypeDeployed notification in a separate goroutine before answering
// Created. Any error answers BadRequest.
// NOTE(review): AppName, HTTPEndpoint and RmrEndpoint are dereferenced without
// a nil check; presumably request validation guarantees them. Verify.
184 api.RegisterXappHandler = operations.RegisterXappHandlerFunc(
185 func(params operations.RegisterXappParams) middleware.Responder {
186 appmgr.Logger.Info("appname is %s", (*params.RegisterRequest.AppName))
187 appmgr.Logger.Info("endpoint is %s", (*params.RegisterRequest.HTTPEndpoint))
188 appmgr.Logger.Info("rmrendpoint is %s", (*params.RegisterRequest.RmrEndpoint))
189 if result, err := r.RegisterXapp(*params.RegisterRequest); err == nil {
190 go r.rh.PublishSubscription(*result, models.EventTypeDeployed)
191 return operations.NewRegisterXappCreated()
193 return operations.NewRegisterXappBadRequest()
// Deregister: calls DeregisterXapp, and on success publishes an
// EventTypeUndeployed notification in a separate goroutine before answering
// NoContent. Any error answers BadRequest.
196 api.DeregisterXappHandler = operations.DeregisterXappHandlerFunc(
197 func(params operations.DeregisterXappParams) middleware.Responder {
198 appmgr.Logger.Info("appname is %s", (*params.DeregisterRequest.AppName))
199 if result, err := r.DeregisterXapp(*params.DeregisterRequest); err == nil {
200 go r.rh.PublishSubscription(*result, models.EventTypeUndeployed)
201 return operations.NewDeregisterXappNoContent()
203 return operations.NewDeregisterXappBadRequest()
209 func httpGetXAppsconfig(url string) *string {
210 appmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + url)
211 resp, err := http.Get(url)
213 appmgr.Logger.Error("Error while querying config to Xapp: ", err.Error())
216 defer resp.Body.Close()
218 if resp.StatusCode == http.StatusOK {
219 var data XappConfigList
220 appmgr.Logger.Info("http client raw response: %v", resp)
221 if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
222 appmgr.Logger.Error("Json decode failed: " + err.Error())
225 //data[0] assuming only for one app
226 str := fmt.Sprintf("%v", data[0].Config)
227 appmgr.Logger.Info("HTTP BODY: %v", str)
232 appmgr.Logger.Info("httprestful got an unexpected http status code: %v", resp.StatusCode)
// parseConfig parses the JSON text in *config with fastjson and collects the
// messaging data it declares into an appmgr.RtmData. Two layouts are read:
// arrays directly under "rmr", and arrays under each element of
// "messaging"."ports". Per its signature it returns *appmgr.RtmData; the
// return statements are outside the visible lines.
237 func parseConfig(config *string) *appmgr.RtmData {
238 var p fastjson.Parser
239 var msgs appmgr.RtmData
241 v, err := p.Parse(*config)
// Parse failures are logged at Info level; presumably the function then
// returns without data (not visible here), so callers should expect nil.
243 appmgr.Logger.Info("fastjson.Parser for failed: %v", err)
// "rmr" layout: message names are stored with surrounding double quotes removed.
248 for _, m := range v.GetArray("rmr", "txMessages") {
249 msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`))
252 for _, m := range v.GetArray("rmr", "rxMessages") {
253 msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`))
// Policies are appended only when the value converts with strconv.Atoi.
256 for _, m := range v.GetArray("rmr", "policies") {
257 if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil {
258 msgs.Policies = append(msgs.Policies, int64(val))
// "messaging.ports" layout: the same three arrays are read from every port
// entry and appended to the same slices as above.
// Note that the loop variable p shadows the parser p inside this loop.
262 for _, p := range v.GetArray("messaging", "ports") {
263 appmgr.Logger.Info("txMessages=%v, rxMessages=%v", p.GetArray("txMessages"), p.GetArray("rxMessages"))
264 for _, m := range p.GetArray("txMessages") {
265 msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`))
268 for _, m := range p.GetArray("rxMessages") {
269 msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`))
272 for _, m := range p.GetArray("policies") {
273 if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil {
274 msgs.Policies = append(msgs.Policies, int64(val))
282 func (r *Restful) RegisterXapp(params models.RegisterRequest) (xapp *models.Xapp, err error) {
283 return r.PrepareConfig(params, true)
286 func (r *Restful) DeregisterXapp(params models.DeregisterRequest) (xapp *models.Xapp, err error) {
287 var registeredlist models.RegisterRequest
288 registeredlist.AppName = params.AppName
289 registeredlist.AppInstanceName = params.AppInstanceName
290 if _, found := xappmap[*params.AppName]; found {
292 x.Instances = append(x.Instances, xappmap[*params.AppName][*params.AppInstanceName].xappInstance)
293 registeredlist.HTTPEndpoint = &xappmap[*params.AppName][*params.AppInstanceName].httpendpoint
294 delete(xappmap[*params.AppName], *params.AppInstanceName)
295 if len(xappmap[*params.AppName]) == 0 {
296 delete(xappmap, *params.AppName)
298 r.rh.UpdateAppData(registeredlist, true)
301 appmgr.Logger.Error("XApp Instance %v Not Found", *params.AppName)
302 return nil, errors.New("XApp Instance Not Found")
// PrepareConfig obtains the configuration of the xApp described by params,
// parses it, and hands the result to FillInstanceData. The configuration comes
// either from params.Config or from an HTTP query to the xApp itself, and the
// attempt is repeated up to maxRetries times (maxRetries is defined outside
// the visible lines). updateflag is forwarded to r.rh.UpdateAppData.
// When no attempt succeeds it returns nil and an error.
306 func (r *Restful) PrepareConfig(params models.RegisterRequest, updateflag bool) (xapp *models.Xapp, err error) {
308 configPresent := false
309 var xappconfig *string
// NOTE(review): params.HTTPEndpoint is dereferenced without a nil check.
310 appmgr.Logger.Info("http endpoint is %s", *params.HTTPEndpoint)
311 for i := 1; i <= maxRetries; i++ {
// Configuration supplied inside the request is used as is.
312 if params.Config != "" {
313 appmgr.Logger.Info("Getting config during xapp register: %v", params.Config)
// NOTE(review): "¶ms" looks like a mis-encoded "&params" (the address of
// params.Config); confirm against the real file.
314 xappconfig = ¶ms.Config
// Otherwise (presumably the else branch) the configuration is fetched from
// http://<HTTPEndpoint><ConfigPath>.
317 appmgr.Logger.Info("Getting config from xapp:")
318 xappconfig = httpGetXAppsconfig(fmt.Sprintf("http://%s%s", *params.HTTPEndpoint, params.ConfigPath))
321 if xappconfig != nil {
322 data := parseConfig(xappconfig)
// NOTE(review): &xapp is passed below where FillInstanceData expects a
// *models.Xapp, so a local models.Xapp presumably shadows the named result
// here (its declaration is not visible).
326 xapp.Name = params.AppName
327 xapp.Version = params.AppVersion
328 //xapp.Status = params.Status
330 r.rh.UpdateAppData(params, updateflag)
// NOTE(review): data is dereferenced here; no nil check on the parseConfig
// result is visible in this view. Verify one exists.
331 return r.FillInstanceData(params, &xapp, *data, configPresent)
334 appmgr.Logger.Error("No Data from xapp")
// What happens when the configuration came from the request is not visible here.
336 if configPresent == true {
// Wait 4 seconds before the next attempt.
340 appmgr.Logger.Info("Retrying query configuration from xapp, try no : %d", i)
341 time.Sleep(4 * time.Second)
// NOTE(review): the message hard-codes "5 retries" while the loop is bounded
// by maxRetries; confirm the two agree.
343 return nil, errors.New("Unable to get configmap after 5 retries")
346 func (r *Restful) FillInstanceData(params models.RegisterRequest, xapp *models.Xapp, rtData appmgr.RtmData, configFlag bool) (xapps *models.Xapp, err error) {
348 endPointStr := strings.Split(*params.RmrEndpoint, ":")
349 var x models.XappInstance
350 x.Name = params.AppInstanceName
351 //x.Status = strings.ToLower(params.Status)
352 x.Status = "deployed"
353 //x.IP = endPointStr[0]
354 x.IP = fmt.Sprintf("service-ricxapp-%s-rmr.ricxapp", *params.AppInstanceName)
355 x.Port, _ = strconv.ParseInt(endPointStr[1], 10, 64)
356 x.TxMessages = rtData.TxMessages
357 x.RxMessages = rtData.RxMessages
358 x.Policies = rtData.Policies
359 xapp.Instances = append(xapp.Instances, &x)
360 rmrsrvname := fmt.Sprintf("service-ricxapp-%s-rmr.ricxapp:%s", *params.AppInstanceName, x.Port)
362 a := &XappData{httpendpoint: *params.HTTPEndpoint,
363 rmrendpoint: *params.RmrEndpoint,
364 rmrserviceep: rmrsrvname,
366 xappname: *params.AppName,
367 xappversion: params.AppVersion,
368 xappinstname: *params.AppInstanceName,
369 xappconfigpath: params.ConfigPath,
370 xappdynamiconfig: configFlag,
373 if _, ok := xappmap[*params.AppName]; ok {
374 xappmap[*params.AppName][*params.AppInstanceName] = a
375 appmgr.Logger.Info("appname already present, %v", xappmap[*params.AppName])
377 xappmap[*params.AppName] = make(map[string]*XappData)
378 xappmap[*params.AppName][*params.AppInstanceName] = a
379 appmgr.Logger.Info("Creating app instance, %v", xappmap[*params.AppName])
// GetApps builds the list of deployed xApps from xappmap: one models.Xapp per
// application name, carrying the xappInstance of each registered instance.
// Some assignments to x and the return statement are outside the visible
// lines; no visible line assigns a non-nil err.
386 func (r *Restful) GetApps() (xapps models.AllDeployedXapps, err error) {
// Start from an empty, non-nil list.
387 xapps = models.AllDeployedXapps{}
// v maps instance name to *XappData for a single application.
388 for _, v := range xappmap {
390 for i, j := range v {
// Version is overwritten on every iteration, so the last instance visited sets it.
393 x.Version = j.xappversion
// Here i is the inner map key, i.e. the instance name, although the message
// labels it "Appname".
394 appmgr.Logger.Info("Xapps details currently in map Appname = %v,rmrendpoint = %v,Status = %v", i, j.rmrendpoint, j.status)
395 x.Instances = append(x.Instances, j.xappInstance)
397 xapps = append(xapps, &x)
// getAppConfig builds the configuration list reported by the config endpoint.
// For instances in xappmap it queries http://<httpendpoint><xappconfigpath>,
// decodes the answer as JSON and appends it with the app name and namespace
// as metadata.
404 func (r *Restful) getAppConfig() (configList models.AllXappConfig) {
405 for _, v := range xappmap {
406 namespace := "ricxapp" //Namespace hardcoded, to be removed later
407 for _, j := range v {
408 var activeConfig interface{}
// The body of this branch is not visible here; presumably instances whose
// config was supplied at registration are treated differently. Verify.
409 if j.xappdynamiconfig {
412 xappconfig := httpGetXAppsconfig(fmt.Sprintf("http://%s%s", j.httpendpoint, j.xappconfigpath))
414 if xappconfig == nil {
// NOTE(review): &j.xappname is a *string, so %s will not print the name;
// j.xappname looks intended.
415 appmgr.Logger.Info("config not found for %s", &j.xappname)
// NOTE(review): the json.Unmarshal error is discarded, so an undecodable
// answer leaves activeConfig nil and is still appended below.
418 json.Unmarshal([]byte(*xappconfig), &activeConfig)
// Metadata points at the stored name and at the per-application namespace variable.
420 c := models.XAppConfig{
421 Metadata: &models.ConfigMetadata{XappName: &j.xappname, Namespace: &namespace},
422 Config: activeConfig,
424 configList = append(configList, &c)
// symptomdataServer serves a JSON snapshot of the manager's state at
// /ric/v1/symptomdata on port 8081. The handler is registered with
// http.HandleFunc and the server is started with a nil handler, so both use
// the default serve mux. Run starts this function in its own goroutine.
432 func (r *Restful) symptomdataServer() {
433 http.HandleFunc("/ric/v1/symptomdata", func(w http.ResponseWriter, req *http.Request) {
// Fields of the snapshot: xApp list, config list and subscription list.
436 XappList models.AllDeployedXapps `json:"xappList"`
437 ConfigList models.AllXappConfig `json:"configList"`
438 SubscriptionList models.AllSubscriptions `json:"subscriptionList"`
442 r.rh.GetAllSubscriptions(),
// The snapshot is offered as a file download named platform/apps_info.json.
445 w.Header().Set("Content-Type", "application/json")
446 w.Header().Set("Content-Disposition", "attachment; filename=platform/apps_info.json")
// NOTE(review): status 200 is written before marshalling and the marshal
// error is discarded, so an encoding failure cannot be reported to the client.
447 w.WriteHeader(http.StatusOK)
448 resp, _ := json.MarshalIndent(xappData, "", " ")
// NOTE(review): the error returned by ListenAndServe is not used, so a failure
// to bind the port goes unnoticed.
452 http.ListenAndServe(":8081", nil)