Initial commit for Xapp Orchestration
[ric-plt/appmgr.git] / pkg / restful / restful.go
index 9124ecd..e450547 100755 (executable)
 package restful
 
 import (
+       "encoding/json"
+       "errors"
+       "fmt"
+       //"io/ioutil"
        "log"
+       "net/http"
        "os"
+       "strconv"
+       "strings"
        "time"
 
        "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models"
@@ -31,17 +38,27 @@ import (
        "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations/xapp"
        "github.com/go-openapi/loads"
        "github.com/go-openapi/runtime/middleware"
+       "github.com/valyala/fastjson"
 
        "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr"
-       "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/cm"
-       "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/helm"
        "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/resthooks"
 )
 
+type XappData struct {
+       httpendpoint   string
+       rmrendpoint    string
+       status         string
+       xappname       string
+       xappinstname   string
+       xappversion    string
+       xappconfigpath string
+       xappInstance   *models.XappInstance
+}
+
+var xappmap = map[string]map[string]*XappData{}
+
 func NewRestful() *Restful {
        r := &Restful{
-               helm:  helm.NewHelm(),
-               cm:    cm.NewCM(),
                rh:    resthooks.NewResthook(true),
                ready: false,
        }
@@ -57,10 +74,40 @@ func (r *Restful) Run() {
 
        appmgr.Logger.Info("Xapp manager started ... serving on %s:%d\n", server.Host, server.Port)
 
-       go r.NotifyClients()
+       go r.RetrieveApps()
        if err := server.Serve(); err != nil {
                log.Fatal(err.Error())
        }
+
+}
+
+func (r *Restful) RetrieveApps() {
+       time.Sleep(5 * time.Second)
+       var xlist models.RegisterRequest
+       applist := r.rh.GetAppsInSDL()
+       if applist != nil {
+               appmgr.Logger.Info("List obtained from GetAppsInSDL is %s", *applist)
+               newstring := strings.Split(*applist, " ")
+               for i, _ := range newstring {
+                       appmgr.Logger.Debug("Checking for xapp %s", newstring[i])
+                       if newstring[i] != "" {
+                               err := json.Unmarshal([]byte(newstring[i]), &xlist)
+                               if err != nil {
+                                       appmgr.Logger.Error("Error while unmarshalling")
+                                       continue
+                               }
+                       } else {
+                               continue //SDL may have empty item,so need to skip
+                       }
+
+                       xmodel, _ := r.PrepareConfig(xlist, false)
+                       if xmodel == nil {
+                               appmgr.Logger.Error("Xapp not found, deleting it from DB")
+                               r.rh.UpdateAppData(xlist, true)
+                       }
+               }
+       }
+
 }
 
 func (r *Restful) SetupHandler() *operations.AppManagerAPI {
@@ -77,7 +124,7 @@ func (r *Restful) SetupHandler() *operations.AppManagerAPI {
                        return health.NewGetHealthAliveOK()
                })
 
-               api.HealthGetHealthReadyHandler = health.GetHealthReadyHandlerFunc(
+       api.HealthGetHealthReadyHandler = health.GetHealthReadyHandlerFunc(
                func(params health.GetHealthReadyParams) middleware.Responder {
                        return health.NewGetHealthReadyOK()
                })
@@ -120,109 +167,202 @@ func (r *Restful) SetupHandler() *operations.AppManagerAPI {
        // URL: /ric/v1/xapp
        api.XappGetAllXappsHandler = xapp.GetAllXappsHandlerFunc(
                func(params xapp.GetAllXappsParams) middleware.Responder {
-                       if result, err := r.helm.StatusAll(); err == nil {
+                       if result, err := r.GetApps(); err == nil {
                                return xapp.NewGetAllXappsOK().WithPayload(result)
                        }
                        return xapp.NewGetAllXappsInternalServerError()
                })
 
-       api.XappListAllXappsHandler = xapp.ListAllXappsHandlerFunc(
-               func(params xapp.ListAllXappsParams) middleware.Responder {
-                       if result := r.helm.SearchAll(); err == nil {
-                               return xapp.NewListAllXappsOK().WithPayload(result)
+       api.RegisterXappHandler = operations.RegisterXappHandlerFunc(
+               func(params operations.RegisterXappParams) middleware.Responder {
+                       appmgr.Logger.Info("appname is %s", (*params.RegisterRequest.AppName))
+                       appmgr.Logger.Info("endpoint is %s", (*params.RegisterRequest.HTTPEndpoint))
+                       appmgr.Logger.Info("rmrendpoint is %s", (*params.RegisterRequest.RmrEndpoint))
+                       if result, err := r.RegisterXapp(*params.RegisterRequest); err == nil {
+                               go r.rh.PublishSubscription(*result, models.EventTypeDeployed)
+                               return operations.NewRegisterXappCreated()
                        }
-                       return xapp.NewListAllXappsInternalServerError()
+                       return operations.NewRegisterXappBadRequest()
                })
 
-       api.XappGetXappByNameHandler = xapp.GetXappByNameHandlerFunc(
-               func(params xapp.GetXappByNameParams) middleware.Responder {
-                       if result, err := r.helm.Status(params.XAppName); err == nil {
-                               return xapp.NewGetXappByNameOK().WithPayload(&result)
+       api.DeregisterXappHandler = operations.DeregisterXappHandlerFunc(
+               func(params operations.DeregisterXappParams) middleware.Responder {
+                       appmgr.Logger.Info("appname is %s", (*params.DeregisterRequest.AppName))
+                       if result, err := r.DeregisterXapp(*params.DeregisterRequest); err == nil {
+                               go r.rh.PublishSubscription(*result, models.EventTypeUndeployed)
+                               return operations.NewDeregisterXappNoContent()
                        }
-                       return xapp.NewGetXappByNameNotFound()
+                       return operations.NewDeregisterXappBadRequest()
                })
 
-       api.XappGetXappInstanceByNameHandler = xapp.GetXappInstanceByNameHandlerFunc(
-               func(params xapp.GetXappInstanceByNameParams) middleware.Responder {
-                       if result, err := r.helm.Status(params.XAppName); err == nil {
-                               for _, v := range result.Instances {
-                                       if *v.Name == params.XAppInstanceName {
-                                               return xapp.NewGetXappInstanceByNameOK().WithPayload(v)
-                                       }
-                               }
-                       }
-                       return xapp.NewGetXappInstanceByNameNotFound()
-               })
+       return api
+}
+
+func httpGetXAppsconfig(url string) (*appmgr.RtmData, error) {
+       appmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + url)
+       resp, err := http.Get(url)
+       if err != nil {
+               return nil, err
+       }
+       defer resp.Body.Close()
+
+       if resp.StatusCode == http.StatusOK {
+               var data XappConfigList
+               appmgr.Logger.Info("http client raw response: %v", resp)
+               if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
+                       appmgr.Logger.Error("Json decode failed: " + err.Error())
+                       return nil, err
+               }
+               //data[0] assuming only for one app
+               str := fmt.Sprintf("%v", data[0].Config)
+               appmgr.Logger.Info("HTTP BODY: %v", str)
+
+               resp.Body.Close()
+
+               var p fastjson.Parser
+               var msgs appmgr.RtmData
 
-       api.XappDeployXappHandler = xapp.DeployXappHandlerFunc(
-               func(params xapp.DeployXappParams) middleware.Responder {
-                       if result, err := r.helm.Install(*params.XappDescriptor); err == nil {
-                               go r.PublishXappCreateEvent(params)
-                               return xapp.NewDeployXappCreated().WithPayload(&result)
+               v, err := p.Parse(str)
+               if err != nil {
+                       appmgr.Logger.Info("fastjson.Parser for failed: %v", err)
+                       return nil, err
+               }
+
+               if v.Exists("rmr") {
+                       for _, m := range v.GetArray("rmr", "txMessages") {
+                               msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`))
                        }
-                       return xapp.NewUndeployXappInternalServerError()
-               })
 
-       api.XappUndeployXappHandler = xapp.UndeployXappHandlerFunc(
-               func(params xapp.UndeployXappParams) middleware.Responder {
-                       if result, err := r.helm.Delete(params.XAppName); err == nil {
-                               go r.PublishXappDeleteEvent(result)
-                               return xapp.NewUndeployXappNoContent()
+                       for _, m := range v.GetArray("rmr", "rxMessages") {
+                               msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`))
                        }
-                       return xapp.NewUndeployXappInternalServerError()
-               })
 
-       // URL: /ric/v1/config
-       api.XappGetAllXappConfigHandler = xapp.GetAllXappConfigHandlerFunc(
-               func(params xapp.GetAllXappConfigParams) middleware.Responder {
-                       return xapp.NewGetAllXappConfigOK().WithPayload(r.cm.UploadConfigAll())
-               })
+                       for _, m := range v.GetArray("rmr", "policies") {
+                               if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil {
+                                       msgs.Policies = append(msgs.Policies, int64(val))
+                               }
+                       }
+               } else {
+                       for _, p := range v.GetArray("messaging", "ports") {
+                               appmgr.Logger.Info("txMessages=%v, rxMessages=%v", p.GetArray("txMessages"), p.GetArray("rxMessages"))
+                               for _, m := range p.GetArray("txMessages") {
+                                       msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`))
+                               }
 
-       api.XappGetConfigElementHandler = xapp.GetConfigElementHandlerFunc(
-               func(params xapp.GetConfigElementParams) middleware.Responder {
-                       return xapp.NewGetConfigElementOK().WithPayload(r.cm.UploadConfigElement(params.Element))
-               })
+                               for _, m := range p.GetArray("rxMessages") {
+                                       msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`))
+                               }
 
-       api.XappModifyXappConfigHandler = xapp.ModifyXappConfigHandlerFunc(
-               func(params xapp.ModifyXappConfigParams) middleware.Responder {
-                       result, err := r.cm.UpdateConfigMap(*params.XAppConfig)
-                       if err != nil {
-                               if err.Error() != "Validation failed!" {
-                                       return xapp.NewModifyXappConfigInternalServerError()
-                               } else {
-                                       return xapp.NewModifyXappConfigUnprocessableEntity()
+                               for _, m := range p.GetArray("policies") {
+                                       if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil {
+                                               msgs.Policies = append(msgs.Policies, int64(val))
+                                       }
                                }
                        }
-                       r.rh.PublishSubscription(models.Xapp{}, models.EventTypeModified)
-                       return xapp.NewModifyXappConfigOK().WithPayload(result)
-               })
+               }
+               return &msgs, nil
+       }
+       appmgr.Logger.Info("httprestful got an unexpected http status code: %v", resp.StatusCode)
+       return nil, nil
+}
 
-       return api
+func (r *Restful) RegisterXapp(params models.RegisterRequest) (xapp *models.Xapp, err error) {
+       return r.PrepareConfig(params, true)
 }
 
-func (r *Restful) NotifyClients() {
-       r.helm.Initialize()
-       if xapps, err := r.helm.StatusAll(); err == nil {
-               r.rh.NotifyClients(xapps, models.EventTypeRestarted)
-               r.ready = true
+func (r *Restful) DeregisterXapp(params models.DeregisterRequest) (xapp *models.Xapp, err error) {
+       var registeredlist models.RegisterRequest
+       registeredlist.AppName = params.AppName
+       registeredlist.AppInstanceName = params.AppInstanceName
+       if _, found := xappmap[*params.AppName]; found {
+               var x models.Xapp
+               x.Instances = append(x.Instances, xappmap[*params.AppName][*params.AppInstanceName].xappInstance)
+               registeredlist.HTTPEndpoint = &xappmap[*params.AppName][*params.AppInstanceName].httpendpoint
+               delete(xappmap[*params.AppName], *params.AppInstanceName)
+               if len(xappmap[*params.AppName]) == 0 {
+                       delete(xappmap, *params.AppName)
+               }
+               r.rh.UpdateAppData(registeredlist, true)
+               return &x, nil
+       } else {
+               appmgr.Logger.Error("XApp Instance %v Not Found", *params.AppName)
+               return nil, errors.New("XApp Instance Not Found")
        }
 }
 
-func (r *Restful) PublishXappCreateEvent(params xapp.DeployXappParams) {
-       name := *params.XappDescriptor.XappName
-       if params.XappDescriptor.ReleaseName != "" {
-               name = params.XappDescriptor.ReleaseName
-       }
+func (r *Restful) PrepareConfig(params models.RegisterRequest, updateflag bool) (xapp *models.Xapp, err error) {
+       maxRetries := 5
+       //tmpString := strings.Split(*params.HTTPEndpoint, "//")
+       appmgr.Logger.Info("http endpoint is %s", *params.HTTPEndpoint)
+       for i := 1; i <= maxRetries; i++ {
+               data, err := httpGetXAppsconfig(fmt.Sprintf("http://%s%s", *params.HTTPEndpoint, params.ConfigPath))
+
+               if data != nil && err == nil {
+                       appmgr.Logger.Info("iRetry Count = %v", i)
+                       var xapp models.Xapp
 
-       for i := 0; i < 5; i++ {
-               time.Sleep(time.Duration(5) * time.Second)
-               if result, _ := r.helm.Status(name); result.Instances != nil {
-                       r.rh.PublishSubscription(result, models.EventTypeDeployed)
+                       xapp.Name = params.AppName
+                       xapp.Version = params.AppVersion
+                       //xapp.Status = params.Status
+
+                       r.rh.UpdateAppData(params, updateflag)
+                       return r.FillInstanceData(params, &xapp, *data)
                        break
+               } else if err == nil {
+                       appmgr.Logger.Error("Unexpected HTTP status code/JSON Parsing error")
+               } else {
+                       appmgr.Logger.Error("Couldn't get data due to" + err.Error())
                }
+               time.Sleep(2 * time.Second)
        }
+
+       return nil, errors.New("Unable to get configmap after 5 retries")
 }
 
-func (r *Restful) PublishXappDeleteEvent(xapp models.Xapp) {
-       r.rh.PublishSubscription(xapp, models.EventTypeUndeployed)
+func (r *Restful) FillInstanceData(params models.RegisterRequest, xapp *models.Xapp, rtData appmgr.RtmData) (xapps *models.Xapp, err error) {
+
+       //tmpString := strings.Split(*params.RmrEndpoint, "//")
+       endPointStr := strings.Split(*params.RmrEndpoint, ":")
+       var x models.XappInstance
+       x.Name = params.AppInstanceName
+       //x.Status = strings.ToLower(params.Status)
+       x.Status = "deployed"
+       x.IP = endPointStr[0]
+       x.Port, _ = strconv.ParseInt(endPointStr[1], 10, 64)
+       x.TxMessages = rtData.TxMessages
+       x.RxMessages = rtData.RxMessages
+       x.Policies = rtData.Policies
+       xapp.Instances = append(xapp.Instances, &x)
+
+       a := &XappData{httpendpoint: *params.HTTPEndpoint, rmrendpoint: *params.RmrEndpoint, status: "deployed", xappname: *params.AppName, xappversion: params.AppVersion, xappinstname: *params.AppInstanceName, xappconfigpath: params.ConfigPath, xappInstance: &x}
+
+       if _, ok := xappmap[*params.AppName]; ok {
+               xappmap[*params.AppName][*params.AppInstanceName] = a
+               appmgr.Logger.Info("appname already present, %v", xappmap[*params.AppName])
+       } else {
+               xappmap[*params.AppName] = make(map[string]*XappData)
+               xappmap[*params.AppName][*params.AppInstanceName] = a
+               appmgr.Logger.Info("Creating app instance, %v", xappmap[*params.AppName])
+       }
+
+       return xapp, nil
+
+}
+
+func (r *Restful) GetApps() (xapps models.AllDeployedXapps, err error) {
+       xapps = models.AllDeployedXapps{}
+       for _, v := range xappmap {
+               var x models.Xapp
+               for i, j := range v {
+                       x.Status = j.status
+                       x.Name = &j.xappname
+                       x.Version = j.xappversion
+                       appmgr.Logger.Info("Xapps details currently in map Appname = %v,rmrendpoint = %v,Status = %v", i, j.rmrendpoint, j.status)
+                       x.Instances = append(x.Instances, j.xappInstance)
+               }
+               xapps = append(xapps, &x)
+       }
+
+       return xapps, nil
+
 }