X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fxapp%2Fxapp.go;h=583d2889d6362b4260181e259d2a96ed6d03c4ee;hb=82651fe96c123b61252dfef755ce7b9882e7db97;hp=1fd7ad66f62cdd97136ef853f0f4d1946acf973f;hpb=192518d79ebcffe17bf569bf9037321a6fd26d44;p=ric-plt%2Fxapp-frame.git diff --git a/pkg/xapp/xapp.go b/pkg/xapp/xapp.go index 1fd7ad6..583d288 100755 --- a/pkg/xapp/xapp.go +++ b/pkg/xapp/xapp.go @@ -20,60 +20,304 @@ package xapp import ( + "bytes" + "encoding/json" "fmt" - "github.com/spf13/viper" + "net" "net/http" + "os" + "os/signal" + "strings" + "sync/atomic" + "syscall" + "testing" + "time" + + "github.com/spf13/viper" ) +// For testing purpose go version 1.13 -> + +var _ = func() bool { + testing.Init() + return true +}() + type ReadyCB func(interface{}) +type ShutdownCB func() var ( // XApp is an application instance - Rmr *RMRClient - Sdl *SDLClient - Rnib *RNIBClient - Resource *Router - Metric *Metrics - Logger *Log - Config Configurator + Rmr *RMRClient + Sdl *SDLClient + SdlStorage *SDLStorage + Rnib *RNIBClient + Resource *Router + Metric *Metrics + Logger *Log + Config Configurator + Subscription *Subscriber + Alarm *AlarmClient + Util *Utils + readyCb ReadyCB + readyCbParams interface{} + shutdownCb ShutdownCB + shutdownFlag int32 + shutdownCnt int32 ) func IsReady() bool { - return Rmr.IsReady() && Sdl.IsReady() + return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady() } func SetReadyCB(cb ReadyCB, params interface{}) { - Rmr.SetReadyCB(cb, params) + readyCb = cb + readyCbParams = params +} + +func XappReadyCb(params interface{}) { + Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name")) + if readyCb != nil { + readyCb(readyCbParams) + } +} + +func SetShutdownCB(cb ShutdownCB) { + shutdownCb = cb +} + +func XappShutdownCb() { + if err := doDeregister(); err != nil { + Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err) + } else { + Logger.Info("xApp deregistration successfull!") + } + + if shutdownCb != nil { + shutdownCb() + } +} + +func registerXapp() { + for { + time.Sleep(5 * time.Second) + if !IsHealthProbeReady() { + Logger.Debug("Application='%s' is not ready yet, waiting ...", viper.GetString("name")) + continue + } + + Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name")) + if err := doRegister(); err == nil { + Logger.Info("Registration done, proceeding with startup ...") + break + } + } +} + +func getService(host, service string) string { + appnamespace := os.Getenv("APP_NAMESPACE") + if appnamespace == "" { + appnamespace = DEFAULT_XAPP_NS + } + + svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host)) + url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//") + + Logger.Info("getService: %+v %+v", svc, url) + if len(url) > 1 { + return url[1] + } + return "" +} + +func getPltNamespace(envName, defVal string) string { + pltnamespace := os.Getenv("PLT_NAMESPACE") + if pltnamespace == "" { + pltnamespace = defVal + } + + return pltnamespace +} + +func doPost(pltNs, url string, msg []byte, status int) error { + resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg)) + if err != nil || resp == nil || resp.StatusCode != status { + Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err) + return err + } + Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status) + + return err +} + +func doRegister() error { + host, _ := os.Hostname() + xappname := viper.GetString("name") + xappversion := viper.GetString("version") + pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS) + + httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR) + if httpEp == "" || rmrEp == "" { + Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp) + return nil + } + + requestBody, err := json.Marshal(map[string]string{ + "appName": host, + "httpEndpoint": httpEp, + "rmrEndpoint": rmrEp, + "appInstanceName": xappname, + "appVersion": xappversion, + "configPath": CONFIG_PATH, + }) + + if err != nil { + Logger.Error("json.Marshal failed with error: %v", err) + return err + } + + return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated) +} + +func doDeregister() error { + if !IsHealthProbeReady() { + return nil + } + + name, _ := os.Hostname() + xappname := viper.GetString("name") + pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS) + + requestBody, err := json.Marshal(map[string]string{ + "appName": name, + "appInstanceName": xappname, + }) + + if err != nil { + Logger.Error("json.Marshal failed with error: %v", err) + return err + } + + return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent) +} + +func InstallSignalHandler() { + // + // Signal handlers to really exit program. + // shutdownCb can hang until application has + // made all needed gracefull shutdown actions + // hardcoded limit for shutdown is 20 seconds + // + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM) + //signal handler function + go func() { + for range interrupt { + if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) { + // close function + go func() { + timeout := int(20) + sentry := make(chan struct{}) + defer close(sentry) + + // close callback + go func() { + XappShutdownCb() + sentry <- struct{}{} + }() + select { + case <-time.After(time.Duration(timeout) * time.Second): + Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout) + case <-sentry: + Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout) + } + os.Exit(0) + }() + } else { + newCnt := atomic.AddInt32(&shutdownCnt, 1) + Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5) + if newCnt >= 5 { + Logger.Info("xapp-frame shutdown forced exit") + os.Exit(0) + } + continue + } + + } + }() } func init() { // Load xapp configuration Logger = LoadConfig() - Logger.SetLevel(viper.GetInt("logger.level")) + if viper.IsSet("controls.logger.level") { + Logger.SetLevel(viper.GetInt("controls.logger.level")) + } else { + Logger.SetLevel(viper.GetInt("logger.level")) + } + + if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") { + Logger.SetFormat(0) + } + Resource = NewRouter() Config = Configurator{} Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router) - Rmr = NewRMRClient() + Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout")) + SdlStorage = NewSdlStorage() + Sdl = NewSDLClient(viper.GetString("controls.db.namespace")) + Rnib = GetNewRnibClient(SdlStorage.db) + Util = NewUtils() - if viper.IsSet("db.namespaces") { - namespaces := viper.GetStringSlice("db.namespaces") - if namespaces[0] != "" { - Sdl = NewSDLClient(viper.GetStringSlice("db.namespaces")[0]) - } - if namespaces[1] != "" { - Rnib = NewRNIBClient(viper.GetStringSlice("db.namespaces")[1]) + InstallSignalHandler() +} + +func getIpAdress() string { + var ip net.IP + itf, err := net.InterfaceByName(os.Getenv("INTERFACE_NAME")) + if err != nil { + Logger.Info("Interface name is not able to resolve " + err.Error()) + return ip.String() + } + item, err := itf.Addrs() + if err != nil { + Logger.Info("IP address is not able to resolve " + err.Error()) + return ip.String() + } + for _, addr := range item { + switch v := addr.(type) { + case *net.IPNet: + if !v.IP.IsLinkLocalUnicast() { + ip = v.IP + } } - } else { - Sdl = NewSDLClient(viper.GetString("db.namespace")) } + return ip.String() } -func Run(c MessageConsumer) { - go http.ListenAndServe(viper.GetString("local.host"), Resource.router) +func RunWithParams(c MessageConsumer, sdlcheck bool) { + Rmr = NewRMRClient() - Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host"))) + Rmr.SetReadyCB(XappReadyCb, nil) + ipString := getIpAdress() + var host string + if ipString == "" { + host = fmt.Sprintf(":%d", GetPortData("http").Port) + } else { + host = fmt.Sprintf("[%s]:%d", ipString, GetPortData("http").Port) + } + go http.ListenAndServe(host, Resource.router) + Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host)) + + if sdlcheck { + SdlStorage.TestConnection(viper.GetString("controls.db.namespace")) + } + go registerXapp() - Sdl.TestConnection() Rmr.Start(c) } + +func Run(c MessageConsumer) { + RunWithParams(c, true) +}