package restful
import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ //"io/ioutil"
"log"
+ "net/http"
"os"
+ "strconv"
+ "strings"
"time"
"gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
"gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations/xapp"
"github.com/go-openapi/loads"
"github.com/go-openapi/runtime/middleware"
+ "github.com/valyala/fastjson"
"gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
- "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/cm"
- "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/helm"
"gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/resthooks"
)
+type XappData struct {
+ httpendpoint string
+ rmrendpoint string
+ status string
+ xappname string
+ xappinstname string
+ xappversion string
+ xappconfigpath string
+ xappInstance *models.XappInstance
+}
+
+var xappmap = map[string]map[string]*XappData{}
+
func NewRestful() *Restful {
r := &Restful{
- helm: helm.NewHelm(),
- cm: cm.NewCM(),
rh: resthooks.NewResthook(true),
ready: false,
}
appmgr.Logger.Info("Xapp manager started ... serving on %s:%d\n", server.Host, server.Port)
- go r.NotifyClients()
+ go r.RetrieveApps()
if err := server.Serve(); err != nil {
log.Fatal(err.Error())
}
+
+}
+
+func (r *Restful) RetrieveApps() {
+ time.Sleep(5 * time.Second)
+ var xlist models.RegisterRequest
+ applist := r.rh.GetAppsInSDL()
+ if applist != nil {
+ appmgr.Logger.Info("List obtained from GetAppsInSDL is %s", *applist)
+ newstring := strings.Split(*applist, " ")
+ for i, _ := range newstring {
+ appmgr.Logger.Debug("Checking for xapp %s", newstring[i])
+ if newstring[i] != "" {
+ err := json.Unmarshal([]byte(newstring[i]), &xlist)
+ if err != nil {
+ appmgr.Logger.Error("Error while unmarshalling")
+ continue
+ }
+ } else {
+ continue //SDL may have empty item,so need to skip
+ }
+
+ xmodel, _ := r.PrepareConfig(xlist, false)
+ if xmodel == nil {
+ appmgr.Logger.Error("Xapp not found, deleting it from DB")
+ r.rh.UpdateAppData(xlist, true)
+ }
+ }
+ }
+
}
func (r *Restful) SetupHandler() *operations.AppManagerAPI {
return health.NewGetHealthAliveOK()
})
- api.HealthGetHealthReadyHandler = health.GetHealthReadyHandlerFunc(
+ api.HealthGetHealthReadyHandler = health.GetHealthReadyHandlerFunc(
func(params health.GetHealthReadyParams) middleware.Responder {
return health.NewGetHealthReadyOK()
})
// URL: /ric/v1/xapp
api.XappGetAllXappsHandler = xapp.GetAllXappsHandlerFunc(
func(params xapp.GetAllXappsParams) middleware.Responder {
- if result, err := r.helm.StatusAll(); err == nil {
+ if result, err := r.GetApps(); err == nil {
return xapp.NewGetAllXappsOK().WithPayload(result)
}
return xapp.NewGetAllXappsInternalServerError()
})
- api.XappListAllXappsHandler = xapp.ListAllXappsHandlerFunc(
- func(params xapp.ListAllXappsParams) middleware.Responder {
- if result := r.helm.SearchAll(); err == nil {
- return xapp.NewListAllXappsOK().WithPayload(result)
+ api.RegisterXappHandler = operations.RegisterXappHandlerFunc(
+ func(params operations.RegisterXappParams) middleware.Responder {
+ appmgr.Logger.Info("appname is %s", (*params.RegisterRequest.AppName))
+ appmgr.Logger.Info("endpoint is %s", (*params.RegisterRequest.HTTPEndpoint))
+ appmgr.Logger.Info("rmrendpoint is %s", (*params.RegisterRequest.RmrEndpoint))
+ if result, err := r.RegisterXapp(*params.RegisterRequest); err == nil {
+ go r.rh.PublishSubscription(*result, models.EventTypeDeployed)
+ return operations.NewRegisterXappCreated()
}
- return xapp.NewListAllXappsInternalServerError()
+ return operations.NewRegisterXappBadRequest()
})
- api.XappGetXappByNameHandler = xapp.GetXappByNameHandlerFunc(
- func(params xapp.GetXappByNameParams) middleware.Responder {
- if result, err := r.helm.Status(params.XAppName); err == nil {
- return xapp.NewGetXappByNameOK().WithPayload(&result)
+ api.DeregisterXappHandler = operations.DeregisterXappHandlerFunc(
+ func(params operations.DeregisterXappParams) middleware.Responder {
+ appmgr.Logger.Info("appname is %s", (*params.DeregisterRequest.AppName))
+ if result, err := r.DeregisterXapp(*params.DeregisterRequest); err == nil {
+ go r.rh.PublishSubscription(*result, models.EventTypeUndeployed)
+ return operations.NewDeregisterXappNoContent()
}
- return xapp.NewGetXappByNameNotFound()
+ return operations.NewDeregisterXappBadRequest()
})
- api.XappGetXappInstanceByNameHandler = xapp.GetXappInstanceByNameHandlerFunc(
- func(params xapp.GetXappInstanceByNameParams) middleware.Responder {
- if result, err := r.helm.Status(params.XAppName); err == nil {
- for _, v := range result.Instances {
- if *v.Name == params.XAppInstanceName {
- return xapp.NewGetXappInstanceByNameOK().WithPayload(v)
- }
- }
- }
- return xapp.NewGetXappInstanceByNameNotFound()
- })
+ return api
+}
+
+func httpGetXAppsconfig(url string) (*appmgr.RtmData, error) {
+ appmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + url)
+ resp, err := http.Get(url)
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusOK {
+ var data XappConfigList
+ appmgr.Logger.Info("http client raw response: %v", resp)
+ if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
+ appmgr.Logger.Error("Json decode failed: " + err.Error())
+ return nil, err
+ }
+ //data[0] assuming only for one app
+ str := fmt.Sprintf("%v", data[0].Config)
+ appmgr.Logger.Info("HTTP BODY: %v", str)
+
+ resp.Body.Close()
+
+ var p fastjson.Parser
+ var msgs appmgr.RtmData
- api.XappDeployXappHandler = xapp.DeployXappHandlerFunc(
- func(params xapp.DeployXappParams) middleware.Responder {
- if result, err := r.helm.Install(*params.XappDescriptor); err == nil {
- go r.PublishXappCreateEvent(params)
- return xapp.NewDeployXappCreated().WithPayload(&result)
+ v, err := p.Parse(str)
+ if err != nil {
+ appmgr.Logger.Info("fastjson.Parser for failed: %v", err)
+ return nil, err
+ }
+
+ if v.Exists("rmr") {
+ for _, m := range v.GetArray("rmr", "txMessages") {
+ msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`))
}
- return xapp.NewUndeployXappInternalServerError()
- })
- api.XappUndeployXappHandler = xapp.UndeployXappHandlerFunc(
- func(params xapp.UndeployXappParams) middleware.Responder {
- if result, err := r.helm.Delete(params.XAppName); err == nil {
- go r.PublishXappDeleteEvent(result)
- return xapp.NewUndeployXappNoContent()
+ for _, m := range v.GetArray("rmr", "rxMessages") {
+ msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`))
}
- return xapp.NewUndeployXappInternalServerError()
- })
- // URL: /ric/v1/config
- api.XappGetAllXappConfigHandler = xapp.GetAllXappConfigHandlerFunc(
- func(params xapp.GetAllXappConfigParams) middleware.Responder {
- return xapp.NewGetAllXappConfigOK().WithPayload(r.cm.UploadConfigAll())
- })
+ for _, m := range v.GetArray("rmr", "policies") {
+ if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil {
+ msgs.Policies = append(msgs.Policies, int64(val))
+ }
+ }
+ } else {
+ for _, p := range v.GetArray("messaging", "ports") {
+ appmgr.Logger.Info("txMessages=%v, rxMessages=%v", p.GetArray("txMessages"), p.GetArray("rxMessages"))
+ for _, m := range p.GetArray("txMessages") {
+ msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`))
+ }
- api.XappGetConfigElementHandler = xapp.GetConfigElementHandlerFunc(
- func(params xapp.GetConfigElementParams) middleware.Responder {
- return xapp.NewGetConfigElementOK().WithPayload(r.cm.UploadConfigElement(params.Element))
- })
+ for _, m := range p.GetArray("rxMessages") {
+ msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`))
+ }
- api.XappModifyXappConfigHandler = xapp.ModifyXappConfigHandlerFunc(
- func(params xapp.ModifyXappConfigParams) middleware.Responder {
- result, err := r.cm.UpdateConfigMap(*params.XAppConfig)
- if err != nil {
- if err.Error() != "Validation failed!" {
- return xapp.NewModifyXappConfigInternalServerError()
- } else {
- return xapp.NewModifyXappConfigUnprocessableEntity()
+ for _, m := range p.GetArray("policies") {
+ if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil {
+ msgs.Policies = append(msgs.Policies, int64(val))
+ }
}
}
- r.rh.PublishSubscription(models.Xapp{}, models.EventTypeModified)
- return xapp.NewModifyXappConfigOK().WithPayload(result)
- })
+ }
+ return &msgs, nil
+ }
+ appmgr.Logger.Info("httprestful got an unexpected http status code: %v", resp.StatusCode)
+ return nil, nil
+}
- return api
+func (r *Restful) RegisterXapp(params models.RegisterRequest) (xapp *models.Xapp, err error) {
+ return r.PrepareConfig(params, true)
}
-func (r *Restful) NotifyClients() {
- r.helm.Initialize()
- if xapps, err := r.helm.StatusAll(); err == nil {
- r.rh.NotifyClients(xapps, models.EventTypeRestarted)
- r.ready = true
+func (r *Restful) DeregisterXapp(params models.DeregisterRequest) (xapp *models.Xapp, err error) {
+ var registeredlist models.RegisterRequest
+ registeredlist.AppName = params.AppName
+ registeredlist.AppInstanceName = params.AppInstanceName
+ if _, found := xappmap[*params.AppName]; found {
+ var x models.Xapp
+ x.Instances = append(x.Instances, xappmap[*params.AppName][*params.AppInstanceName].xappInstance)
+ registeredlist.HTTPEndpoint = &xappmap[*params.AppName][*params.AppInstanceName].httpendpoint
+ delete(xappmap[*params.AppName], *params.AppInstanceName)
+ if len(xappmap[*params.AppName]) == 0 {
+ delete(xappmap, *params.AppName)
+ }
+ r.rh.UpdateAppData(registeredlist, true)
+ return &x, nil
+ } else {
+ appmgr.Logger.Error("XApp Instance %v Not Found", *params.AppName)
+ return nil, errors.New("XApp Instance Not Found")
}
}
-func (r *Restful) PublishXappCreateEvent(params xapp.DeployXappParams) {
- name := *params.XappDescriptor.XappName
- if params.XappDescriptor.ReleaseName != "" {
- name = params.XappDescriptor.ReleaseName
- }
+func (r *Restful) PrepareConfig(params models.RegisterRequest, updateflag bool) (xapp *models.Xapp, err error) {
+ maxRetries := 5
+ //tmpString := strings.Split(*params.HTTPEndpoint, "//")
+ appmgr.Logger.Info("http endpoint is %s", *params.HTTPEndpoint)
+ for i := 1; i <= maxRetries; i++ {
+ data, err := httpGetXAppsconfig(fmt.Sprintf("http://%s%s", *params.HTTPEndpoint, params.ConfigPath))
+
+ if data != nil && err == nil {
+ appmgr.Logger.Info("iRetry Count = %v", i)
+ var xapp models.Xapp
- for i := 0; i < 5; i++ {
- time.Sleep(time.Duration(5) * time.Second)
- if result, _ := r.helm.Status(name); result.Instances != nil {
- r.rh.PublishSubscription(result, models.EventTypeDeployed)
+ xapp.Name = params.AppName
+ xapp.Version = params.AppVersion
+ //xapp.Status = params.Status
+
+ r.rh.UpdateAppData(params, updateflag)
+ return r.FillInstanceData(params, &xapp, *data)
break
+ } else if err == nil {
+ appmgr.Logger.Error("Unexpected HTTP status code/JSON Parsing error")
+ } else {
+ appmgr.Logger.Error("Couldn't get data due to" + err.Error())
}
+ time.Sleep(2 * time.Second)
}
+
+ return nil, errors.New("Unable to get configmap after 5 retries")
}
-func (r *Restful) PublishXappDeleteEvent(xapp models.Xapp) {
- r.rh.PublishSubscription(xapp, models.EventTypeUndeployed)
+func (r *Restful) FillInstanceData(params models.RegisterRequest, xapp *models.Xapp, rtData appmgr.RtmData) (xapps *models.Xapp, err error) {
+
+ //tmpString := strings.Split(*params.RmrEndpoint, "//")
+ endPointStr := strings.Split(*params.RmrEndpoint, ":")
+ var x models.XappInstance
+ x.Name = params.AppInstanceName
+ //x.Status = strings.ToLower(params.Status)
+ x.Status = "deployed"
+ x.IP = endPointStr[0]
+ x.Port, _ = strconv.ParseInt(endPointStr[1], 10, 64)
+ x.TxMessages = rtData.TxMessages
+ x.RxMessages = rtData.RxMessages
+ x.Policies = rtData.Policies
+ xapp.Instances = append(xapp.Instances, &x)
+
+ a := &XappData{httpendpoint: *params.HTTPEndpoint, rmrendpoint: *params.RmrEndpoint, status: "deployed", xappname: *params.AppName, xappversion: params.AppVersion, xappinstname: *params.AppInstanceName, xappconfigpath: params.ConfigPath, xappInstance: &x}
+
+ if _, ok := xappmap[*params.AppName]; ok {
+ xappmap[*params.AppName][*params.AppInstanceName] = a
+ appmgr.Logger.Info("appname already present, %v", xappmap[*params.AppName])
+ } else {
+ xappmap[*params.AppName] = make(map[string]*XappData)
+ xappmap[*params.AppName][*params.AppInstanceName] = a
+ appmgr.Logger.Info("Creating app instance, %v", xappmap[*params.AppName])
+ }
+
+ return xapp, nil
+
+}
+
+func (r *Restful) GetApps() (xapps models.AllDeployedXapps, err error) {
+ xapps = models.AllDeployedXapps{}
+ for _, v := range xappmap {
+ var x models.Xapp
+ for i, j := range v {
+ x.Status = j.status
+ x.Name = &j.xappname
+ x.Version = j.xappversion
+ appmgr.Logger.Info("Xapps details currently in map Appname = %v,rmrendpoint = %v,Status = %v", i, j.rmrendpoint, j.status)
+ x.Instances = append(x.Instances, j.xappInstance)
+ }
+ xapps = append(xapps, &x)
+ }
+
+ return xapps, nil
+
}
import (
"bytes"
"encoding/json"
+ "fmt"
sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
cmap "github.com/orcaman/concurrent-map"
"github.com/segmentio/ksuid"
"net/http"
+ "strings"
"time"
"gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
)
func NewResthook(restoreData bool) *Resthook {
- return createResthook(restoreData, sdl.NewSdlInstance("appmgr", sdl.NewDatabase()))
+ return createResthook(restoreData, sdl.NewSdlInstance("appmgr", sdl.NewDatabase()),sdl.NewSdlInstance("appdb", sdl.NewDatabase()))
}
-func createResthook(restoreData bool, sdlInst iSdl) *Resthook {
+func createResthook(restoreData bool, sdlInst iSdl, sdlInst2 iSdl) *Resthook {
rh := &Resthook{
client: &http.Client{},
db: sdlInst,
+ db2: sdlInst2,
}
if restoreData {
rh.db.RemoveAll()
rh.subscriptions = cmap.New()
}
+
+func (rh *Resthook) UpdateAppData(params models.RegisterRequest, updateflag bool) {
+ appmgr.Logger.Info("Endpoint to be added in SDL: %s", *params.HTTPEndpoint)
+ if updateflag == false {
+ return
+ }
+
+ value, err := rh.db2.Get([]string{"endpoints"})
+ if err != nil {
+ appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
+ return
+ }
+
+ appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
+ var appsindb []string
+ var data string
+ dbflag := false
+
+ if value["endpoints"] != nil {
+ formstring := fmt.Sprintf("%s", value["endpoints"])
+ newstring := strings.Split(formstring, " ")
+ for i, _ := range newstring {
+ if len(newstring) == 1 && strings.Contains(newstring[i], *params.HTTPEndpoint) {
+ appmgr.Logger.Info("Removing Key %s", *params.HTTPEndpoint)
+ rh.db2.Remove([]string{"endpoints"})
+ dbflag = true
+ break
+ }
+ if strings.Contains(newstring[i], *params.HTTPEndpoint) {
+ appmgr.Logger.Info("Removing entry %s", *params.HTTPEndpoint)
+ dbflag = true
+ continue
+ }
+ appsindb = append(appsindb, newstring[i])
+ data = strings.Join(appsindb, " ")
+ }
+ rh.db2.Set("endpoints", strings.TrimSpace(data))
+ }
+
+ if dbflag == false {
+ xappData, err := json.Marshal(params)
+ if err != nil {
+ appmgr.Logger.Info("json.Marshal failed: %v", err)
+ return
+ }
+ appsindb = append(appsindb, string(xappData))
+ data = strings.Join(appsindb, " ")
+ rh.db2.Set("endpoints", strings.TrimSpace(data))
+ }
+}
+
+func (rh *Resthook) GetAppsInSDL() *string {
+ value, err := rh.db2.Get([]string{"endpoints"})
+ if err != nil {
+ appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error())
+ return nil
+ }
+ appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"])
+ if value["endpoints"] == nil || value["endpoints"] == "" {
+ return nil
+ } else {
+ apps := fmt.Sprintf("%s", value["endpoints"])
+ return &apps
+ }
+}