package restful
import (
- //"github.com/spf13/viper"
+ "encoding/json"
+ "errors"
+ "fmt"
+ //"io/ioutil"
"log"
+ "net/http"
"os"
+ "strconv"
+ "strings"
"time"
- "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/models"
- "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/restapi"
- "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/restapi/operations"
- "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/restapi/operations/health"
- "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/restapi/operations/xapp"
+ "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
+ "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi"
+ "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations"
+ "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations/health"
+ "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations/xapp"
"github.com/go-openapi/loads"
"github.com/go-openapi/runtime/middleware"
+ "github.com/valyala/fastjson"
- "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/appmgr"
- "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/cm"
- "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/helm"
- "gerrit.oran-osc.org/r/ric-plt/appmgr/pkg/resthooks"
+ "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
+ "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/resthooks"
)
+type XappData struct {
+ httpendpoint string
+ rmrendpoint string
+ rmrserviceep string
+ status string
+ xappname string
+ xappinstname string
+ xappversion string
+ xappconfigpath string
+ xappdynamiconfig bool
+ xappInstance *models.XappInstance
+}
+
+var xappmap = map[string]map[string]*XappData{}
+
func NewRestful() *Restful {
r := &Restful{
- helm: helm.NewHelm(),
- cm: cm.NewCM(),
rh: resthooks.NewResthook(true),
ready: false,
}
appmgr.Logger.Info("Xapp manager started ... serving on %s:%d\n", server.Host, server.Port)
- go r.NotifyClients()
+ go r.RetrieveApps()
if err := server.Serve(); err != nil {
log.Fatal(err.Error())
}
+
+}
+
+func (r *Restful) RetrieveApps() {
+ time.Sleep(5 * time.Second)
+ var xlist models.RegisterRequest
+ applist := r.rh.GetAppsInSDL()
+ if applist != nil {
+ appmgr.Logger.Info("List obtained from GetAppsInSDL is %s", *applist)
+ newstring := strings.Split(*applist, " ")
+ for i, _ := range newstring {
+ appmgr.Logger.Debug("Checking for xapp %s", newstring[i])
+ if newstring[i] != "" {
+ err := json.Unmarshal([]byte(newstring[i]), &xlist)
+ if err != nil {
+ appmgr.Logger.Error("Error while unmarshalling")
+ continue
+ }
+ } else {
+ continue //SDL may have empty item,so need to skip
+ }
+
+ xmodel, _ := r.PrepareConfig(xlist, false)
+ if xmodel == nil {
+ appmgr.Logger.Error("Xapp not found, deleting it from DB")
+ r.rh.UpdateAppData(xlist, true)
+ }
+ }
+ }
+
}
func (r *Restful) SetupHandler() *operations.AppManagerAPI {
func(params health.GetHealthAliveParams) middleware.Responder {
return health.NewGetHealthAliveOK()
})
+
api.HealthGetHealthReadyHandler = health.GetHealthReadyHandlerFunc(
func(params health.GetHealthReadyParams) middleware.Responder {
return health.NewGetHealthReadyOK()
func(params operations.GetSubscriptionsParams) middleware.Responder {
return operations.NewGetSubscriptionsOK().WithPayload(r.rh.GetAllSubscriptions())
})
+
api.GetSubscriptionByIDHandler = operations.GetSubscriptionByIDHandlerFunc(
func(params operations.GetSubscriptionByIDParams) middleware.Responder {
if result, found := r.rh.GetSubscriptionById(params.SubscriptionID); found {
}
return operations.NewGetSubscriptionByIDNotFound()
})
+
api.AddSubscriptionHandler = operations.AddSubscriptionHandlerFunc(
func(params operations.AddSubscriptionParams) middleware.Responder {
return operations.NewAddSubscriptionCreated().WithPayload(r.rh.AddSubscription(*params.SubscriptionRequest))
})
+
api.ModifySubscriptionHandler = operations.ModifySubscriptionHandlerFunc(
func(params operations.ModifySubscriptionParams) middleware.Responder {
if _, ok := r.rh.ModifySubscription(params.SubscriptionID, *params.SubscriptionRequest); ok {
}
return operations.NewModifySubscriptionBadRequest()
})
+
api.DeleteSubscriptionHandler = operations.DeleteSubscriptionHandlerFunc(
func(params operations.DeleteSubscriptionParams) middleware.Responder {
if _, ok := r.rh.DeleteSubscription(params.SubscriptionID); ok {
// URL: /ric/v1/xapp
api.XappGetAllXappsHandler = xapp.GetAllXappsHandlerFunc(
func(params xapp.GetAllXappsParams) middleware.Responder {
- if result, err := r.helm.StatusAll(); err == nil {
+ if result, err := r.GetApps(); err == nil {
return xapp.NewGetAllXappsOK().WithPayload(result)
}
return xapp.NewGetAllXappsInternalServerError()
})
- api.XappListAllXappsHandler = xapp.ListAllXappsHandlerFunc(
- func(params xapp.ListAllXappsParams) middleware.Responder {
- if result := r.helm.SearchAll(); err == nil {
- return xapp.NewListAllXappsOK().WithPayload(result)
- }
- return xapp.NewListAllXappsInternalServerError()
+
+ // URL: /ric/v1/config
+ api.XappGetAllXappConfigHandler = xapp.GetAllXappConfigHandlerFunc(
+ func(params xapp.GetAllXappConfigParams) middleware.Responder {
+ return xapp.NewGetAllXappConfigOK().WithPayload(r.getAppConfig())
})
- api.XappGetXappByNameHandler = xapp.GetXappByNameHandlerFunc(
- func(params xapp.GetXappByNameParams) middleware.Responder {
- if result, err := r.helm.Status(params.XAppName); err == nil {
- return xapp.NewGetXappByNameOK().WithPayload(&result)
+
+ api.RegisterXappHandler = operations.RegisterXappHandlerFunc(
+ func(params operations.RegisterXappParams) middleware.Responder {
+ appmgr.Logger.Info("appname is %s", (*params.RegisterRequest.AppName))
+ appmgr.Logger.Info("endpoint is %s", (*params.RegisterRequest.HTTPEndpoint))
+ appmgr.Logger.Info("rmrendpoint is %s", (*params.RegisterRequest.RmrEndpoint))
+ if result, err := r.RegisterXapp(*params.RegisterRequest); err == nil {
+ go r.rh.PublishSubscription(*result, models.EventTypeDeployed)
+ return operations.NewRegisterXappCreated()
}
- return xapp.NewGetXappByNameNotFound()
+ return operations.NewRegisterXappBadRequest()
})
- api.XappGetXappInstanceByNameHandler = xapp.GetXappInstanceByNameHandlerFunc(
- func(params xapp.GetXappInstanceByNameParams) middleware.Responder {
- if result, err := r.helm.Status(params.XAppName); err == nil {
- for _, v := range result.Instances {
- if *v.Name == params.XAppInstanceName {
- return xapp.NewGetXappInstanceByNameOK().WithPayload(v)
- }
- }
+
+ api.DeregisterXappHandler = operations.DeregisterXappHandlerFunc(
+ func(params operations.DeregisterXappParams) middleware.Responder {
+ appmgr.Logger.Info("appname is %s", (*params.DeregisterRequest.AppName))
+ if result, err := r.DeregisterXapp(*params.DeregisterRequest); err == nil {
+ go r.rh.PublishSubscription(*result, models.EventTypeUndeployed)
+ return operations.NewDeregisterXappNoContent()
}
- return xapp.NewGetXappInstanceByNameNotFound()
+ return operations.NewDeregisterXappBadRequest()
})
- api.XappDeployXappHandler = xapp.DeployXappHandlerFunc(
- func(params xapp.DeployXappParams) middleware.Responder {
- if result, err := r.helm.Install(*params.XappDescriptor); err == nil {
- go r.PublishXappCreateEvent(params)
- return xapp.NewDeployXappCreated().WithPayload(&result)
+
+ return api
+}
+
+func httpGetXAppsconfig(url string) *string {
+ appmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + url)
+ resp, err := http.Get(url)
+ if err != nil {
+ return nil
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusOK {
+ var data XappConfigList
+ appmgr.Logger.Info("http client raw response: %v", resp)
+ if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
+ appmgr.Logger.Error("Json decode failed: " + err.Error())
+ return nil
+ }
+ //data[0] assuming only for one app
+ str := fmt.Sprintf("%v", data[0].Config)
+ appmgr.Logger.Info("HTTP BODY: %v", str)
+
+ resp.Body.Close()
+ return &str
+ } else {
+ appmgr.Logger.Info("httprestful got an unexpected http status code: %v", resp.StatusCode)
+ return nil
+ }
+}
+
+func parseConfig(config *string) *appmgr.RtmData {
+ var p fastjson.Parser
+ var msgs appmgr.RtmData
+
+ v, err := p.Parse(*config)
+ if err != nil {
+ appmgr.Logger.Info("fastjson.Parser for failed: %v", err)
+ return nil
+ }
+
+ if v.Exists("rmr") {
+ for _, m := range v.GetArray("rmr", "txMessages") {
+ msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`))
+ }
+
+ for _, m := range v.GetArray("rmr", "rxMessages") {
+ msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`))
+ }
+
+ for _, m := range v.GetArray("rmr", "policies") {
+ if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil {
+ msgs.Policies = append(msgs.Policies, int64(val))
}
- return xapp.NewUndeployXappInternalServerError()
- })
- api.XappUndeployXappHandler = xapp.UndeployXappHandlerFunc(
- func(params xapp.UndeployXappParams) middleware.Responder {
- if result, err := r.helm.Delete(params.XAppName); err == nil {
- go r.PublishXappDeleteEvent(result)
- return xapp.NewUndeployXappNoContent()
+ }
+ } else {
+ for _, p := range v.GetArray("messaging", "ports") {
+ appmgr.Logger.Info("txMessages=%v, rxMessages=%v", p.GetArray("txMessages"), p.GetArray("rxMessages"))
+ for _, m := range p.GetArray("txMessages") {
+ msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`))
}
- return xapp.NewUndeployXappInternalServerError()
- })
- // URL: /ric/v1/config
- api.XappGetAllXappConfigHandler = xapp.GetAllXappConfigHandlerFunc(
- func(params xapp.GetAllXappConfigParams) middleware.Responder {
- return xapp.NewGetAllXappConfigOK().WithPayload(r.cm.UploadConfig())
- })
- api.XappCreateXappConfigHandler = xapp.CreateXappConfigHandlerFunc(
- func(params xapp.CreateXappConfigParams) middleware.Responder {
- result, err := r.cm.CreateConfigMap(*params.XAppConfig)
- if err == nil {
- if err.Error() != "Validation failed!" {
- return xapp.NewCreateXappConfigInternalServerError()
- } else {
- return xapp.NewCreateXappConfigUnprocessableEntity()
- }
+ for _, m := range p.GetArray("rxMessages") {
+ msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`))
}
- r.rh.PublishSubscription(models.Xapp{}, models.EventTypeCreated)
- return xapp.NewCreateXappConfigCreated().WithPayload(result)
- })
- api.XappModifyXappConfigHandler = xapp.ModifyXappConfigHandlerFunc(
- func(params xapp.ModifyXappConfigParams) middleware.Responder {
- result, err := r.cm.UpdateConfigMap(*params.XAppConfig)
- if err == nil {
- if err.Error() != "Validation failed!" {
- return xapp.NewModifyXappConfigInternalServerError()
- } else {
- return xapp.NewModifyXappConfigUnprocessableEntity()
+
+ for _, m := range p.GetArray("policies") {
+ if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil {
+ msgs.Policies = append(msgs.Policies, int64(val))
}
}
- r.rh.PublishSubscription(models.Xapp{}, models.EventTypeModified)
- return xapp.NewModifyXappConfigOK().WithPayload(result)
- })
- api.XappDeleteXappConfigHandler = xapp.DeleteXappConfigHandlerFunc(
- func(params xapp.DeleteXappConfigParams) middleware.Responder {
- _, err := r.cm.DeleteConfigMap(*params.ConfigMetadata)
- if err == nil {
- return xapp.NewDeleteXappConfigInternalServerError()
- }
- r.rh.PublishSubscription(models.Xapp{}, models.EventTypeDeleted)
- return xapp.NewDeleteXappConfigNoContent()
- })
+ }
+ }
+ return &msgs
+}
- // LCM: /xapps/{xAppName}/instances/{xAppInstanceName}/stop/start
- api.XappStartXappInstanceByNameHandler = xapp.StartXappInstanceByNameHandlerFunc(
- func(params xapp.StartXappInstanceByNameParams) middleware.Responder {
- return xapp.NewStartXappInstanceByNameOK()
- })
- api.XappStopXappInstanceByNameHandler = xapp.StopXappInstanceByNameHandlerFunc(
- func(params xapp.StopXappInstanceByNameParams) middleware.Responder {
- return xapp.NewStopXappInstanceByNameOK()
- })
+func (r *Restful) RegisterXapp(params models.RegisterRequest) (xapp *models.Xapp, err error) {
+ return r.PrepareConfig(params, true)
+}
- return api
+func (r *Restful) DeregisterXapp(params models.DeregisterRequest) (xapp *models.Xapp, err error) {
+ var registeredlist models.RegisterRequest
+ registeredlist.AppName = params.AppName
+ registeredlist.AppInstanceName = params.AppInstanceName
+ if _, found := xappmap[*params.AppName]; found {
+ var x models.Xapp
+ x.Instances = append(x.Instances, xappmap[*params.AppName][*params.AppInstanceName].xappInstance)
+ registeredlist.HTTPEndpoint = &xappmap[*params.AppName][*params.AppInstanceName].httpendpoint
+ delete(xappmap[*params.AppName], *params.AppInstanceName)
+ if len(xappmap[*params.AppName]) == 0 {
+ delete(xappmap, *params.AppName)
+ }
+ r.rh.UpdateAppData(registeredlist, true)
+ return &x, nil
+ } else {
+ appmgr.Logger.Error("XApp Instance %v Not Found", *params.AppName)
+ return nil, errors.New("XApp Instance Not Found")
+ }
}
-func (r *Restful) NotifyClients() {
- r.helm.Initialize()
- if xapps, err := r.helm.StatusAll(); err == nil {
- r.rh.NotifyClients(xapps, models.EventTypeRestarted)
- r.ready = true
+func (r *Restful) PrepareConfig(params models.RegisterRequest, updateflag bool) (xapp *models.Xapp, err error) {
+ maxRetries := 5
+ configPresent := false
+ var xappconfig *string
+ appmgr.Logger.Info("http endpoint is %s", *params.HTTPEndpoint)
+ for i := 1; i <= maxRetries; i++ {
+ if params.Config != "" {
+ appmgr.Logger.Info("Getting config during xapp register: %v", params.Config)
+ xappconfig = ¶ms.Config
+ configPresent = true
+ } else {
+ appmgr.Logger.Info("Getting config from xapp:")
+ xappconfig = httpGetXAppsconfig(fmt.Sprintf("http://%s%s", *params.HTTPEndpoint, params.ConfigPath))
+ }
+
+ if xappconfig != nil {
+ data := parseConfig(xappconfig)
+ if data != nil {
+ appmgr.Logger.Info("iRetry Count = %v", i)
+ var xapp models.Xapp
+
+ xapp.Name = params.AppName
+ xapp.Version = params.AppVersion
+ //xapp.Status = params.Status
+
+ r.rh.UpdateAppData(params, updateflag)
+ return r.FillInstanceData(params, &xapp, *data, configPresent)
+ break
+ } else {
+ appmgr.Logger.Error("No Data from xapp")
+ }
+ if configPresent == true {
+ break
+ }
+ time.Sleep(2 * time.Second)
+ }
}
+ return nil, errors.New("Unable to get configmap after 5 retries")
}
-func (r *Restful) PublishXappCreateEvent(params xapp.DeployXappParams) {
- name := *params.XappDescriptor.XappName
- if params.XappDescriptor.ReleaseName != "" {
- name = params.XappDescriptor.ReleaseName
+func (r *Restful) FillInstanceData(params models.RegisterRequest, xapp *models.Xapp, rtData appmgr.RtmData, configFlag bool) (xapps *models.Xapp, err error) {
+
+ endPointStr := strings.Split(*params.RmrEndpoint, ":")
+ var x models.XappInstance
+ x.Name = params.AppInstanceName
+ //x.Status = strings.ToLower(params.Status)
+ x.Status = "deployed"
+ //x.IP = endPointStr[0]
+ x.IP = fmt.Sprintf("service-ricxapp-%s-rmr.ricxapp", *params.AppInstanceName)
+ x.Port, _ = strconv.ParseInt(endPointStr[1], 10, 64)
+ x.TxMessages = rtData.TxMessages
+ x.RxMessages = rtData.RxMessages
+ x.Policies = rtData.Policies
+ xapp.Instances = append(xapp.Instances, &x)
+ rmrsrvname := fmt.Sprintf("service-ricxapp-%s-rmr.ricxapp:%s", *params.AppInstanceName, x.Port)
+
+ a := &XappData{httpendpoint: *params.HTTPEndpoint,
+ rmrendpoint: *params.RmrEndpoint,
+ rmrserviceep: rmrsrvname,
+ status: "deployed",
+ xappname: *params.AppName,
+ xappversion: params.AppVersion,
+ xappinstname: *params.AppInstanceName,
+ xappconfigpath: params.ConfigPath,
+ xappdynamiconfig: configFlag,
+ xappInstance: &x}
+
+ if _, ok := xappmap[*params.AppName]; ok {
+ xappmap[*params.AppName][*params.AppInstanceName] = a
+ appmgr.Logger.Info("appname already present, %v", xappmap[*params.AppName])
+ } else {
+ xappmap[*params.AppName] = make(map[string]*XappData)
+ xappmap[*params.AppName][*params.AppInstanceName] = a
+ appmgr.Logger.Info("Creating app instance, %v", xappmap[*params.AppName])
}
- for i := 0; i < 5; i++ {
- if result, _ := r.helm.Status(name); result.Instances != nil {
- r.rh.PublishSubscription(result, models.EventTypeDeployed)
- break
+ return xapp, nil
+
+}
+
+func (r *Restful) GetApps() (xapps models.AllDeployedXapps, err error) {
+ xapps = models.AllDeployedXapps{}
+ for _, v := range xappmap {
+ var x models.Xapp
+ for i, j := range v {
+ x.Status = j.status
+ x.Name = &j.xappname
+ x.Version = j.xappversion
+ appmgr.Logger.Info("Xapps details currently in map Appname = %v,rmrendpoint = %v,Status = %v", i, j.rmrendpoint, j.status)
+ x.Instances = append(x.Instances, j.xappInstance)
}
- time.Sleep(time.Duration(5) * time.Second)
+ xapps = append(xapps, &x)
}
+
+ return xapps, nil
+
}
-func (r *Restful) PublishXappDeleteEvent(xapp models.Xapp) {
- r.rh.PublishSubscription(xapp, models.EventTypeUndeployed)
+func (r *Restful) getAppConfig() (configList models.AllXappConfig) {
+ for _, v := range xappmap {
+ namespace := "ricxapp" //Namespace hardcoded, to be removed later
+ for _, j := range v {
+ var activeConfig interface{}
+ if j.xappdynamiconfig {
+ continue
+ }
+ xappconfig := httpGetXAppsconfig(fmt.Sprintf("http://%s%s", j.httpendpoint, j.xappconfigpath))
+
+ if xappconfig == nil {
+ appmgr.Logger.Info("config not found for %s", &j.xappname)
+ continue
+ }
+ json.Unmarshal([]byte(*xappconfig), &activeConfig)
+
+ c := models.XAppConfig{
+ Metadata: &models.ConfigMetadata{XappName: &j.xappname, Namespace: &namespace},
+ Config: activeConfig,
+ }
+ configList = append(configList, &c)
+
+ }
+
+ }
+ return
}