Change log level
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
index b63c8aa..583d288 100755 (executable)
 package xapp
 
 import (
+       "bytes"
+       "encoding/json"
        "fmt"
-       "github.com/spf13/viper"
+       "net"
        "net/http"
+       "os"
+       "os/signal"
+       "strings"
+       "sync/atomic"
+       "syscall"
+       "testing"
+       "time"
+
+       "github.com/spf13/viper"
 )
 
+// For testing purpose go version 1.13 ->
+
+var _ = func() bool {
+       testing.Init()
+       return true
+}()
+
 type ReadyCB func(interface{})
+type ShutdownCB func()
 
 var (
        // XApp is an application instance
        Rmr           *RMRClient
        Sdl           *SDLClient
+       SdlStorage    *SDLStorage
        Rnib          *RNIBClient
        Resource      *Router
        Metric        *Metrics
@@ -38,12 +58,16 @@ var (
        Config        Configurator
        Subscription  *Subscriber
        Alarm         *AlarmClient
+       Util          *Utils
        readyCb       ReadyCB
        readyCbParams interface{}
+       shutdownCb    ShutdownCB
+       shutdownFlag  int32
+       shutdownCnt   int32
 )
 
 func IsReady() bool {
-       return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
+       return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady()
 }
 
 func SetReadyCB(cb ReadyCB, params interface{}) {
@@ -51,44 +75,246 @@ func SetReadyCB(cb ReadyCB, params interface{}) {
        readyCbParams = params
 }
 
-func xappReadyCb(params interface{}) {
+func XappReadyCb(params interface{}) {
+       Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
        if readyCb != nil {
                readyCb(readyCbParams)
        }
 }
 
+func SetShutdownCB(cb ShutdownCB) {
+       shutdownCb = cb
+}
+
+func XappShutdownCb() {
+       if err := doDeregister(); err != nil {
+               Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
+       } else {
+               Logger.Info("xApp deregistration successfull!")
+       }
+
+       if shutdownCb != nil {
+               shutdownCb()
+       }
+}
+
+func registerXapp() {
+       for {
+               time.Sleep(5 * time.Second)
+               if !IsHealthProbeReady() {
+                       Logger.Debug("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
+                       continue
+               }
+
+               Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
+               if err := doRegister(); err == nil {
+                       Logger.Info("Registration done, proceeding with startup ...")
+                       break
+               }
+       }
+}
+
+func getService(host, service string) string {
+       appnamespace := os.Getenv("APP_NAMESPACE")
+       if appnamespace == "" {
+               appnamespace = DEFAULT_XAPP_NS
+       }
+
+       svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
+       url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
+
+       Logger.Info("getService: %+v %+v", svc, url)
+       if len(url) > 1 {
+               return url[1]
+       }
+       return ""
+}
+
+func getPltNamespace(envName, defVal string) string {
+       pltnamespace := os.Getenv("PLT_NAMESPACE")
+       if pltnamespace == "" {
+               pltnamespace = defVal
+       }
+
+       return pltnamespace
+}
+
+func doPost(pltNs, url string, msg []byte, status int) error {
+       resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
+       if err != nil || resp == nil || resp.StatusCode != status {
+               Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
+               return err
+       }
+       Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
+
+       return err
+}
+
+func doRegister() error {
+       host, _ := os.Hostname()
+       xappname := viper.GetString("name")
+       xappversion := viper.GetString("version")
+       pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
+
+       httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
+       if httpEp == "" || rmrEp == "" {
+               Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
+               return nil
+       }
+
+       requestBody, err := json.Marshal(map[string]string{
+               "appName":         host,
+               "httpEndpoint":    httpEp,
+               "rmrEndpoint":     rmrEp,
+               "appInstanceName": xappname,
+               "appVersion":      xappversion,
+               "configPath":      CONFIG_PATH,
+       })
+
+       if err != nil {
+               Logger.Error("json.Marshal failed with error: %v", err)
+               return err
+       }
+
+       return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
+}
+
+func doDeregister() error {
+       if !IsHealthProbeReady() {
+               return nil
+       }
+
+       name, _ := os.Hostname()
+       xappname := viper.GetString("name")
+       pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
+
+       requestBody, err := json.Marshal(map[string]string{
+               "appName":         name,
+               "appInstanceName": xappname,
+       })
+
+       if err != nil {
+               Logger.Error("json.Marshal failed with error: %v", err)
+               return err
+       }
+
+       return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
+}
+
+func InstallSignalHandler() {
+       //
+       // Signal handlers to really exit program.
+       // shutdownCb can hang until application has
+       // made all needed gracefull shutdown actions
+       // hardcoded limit for shutdown is 20 seconds
+       //
+       interrupt := make(chan os.Signal, 1)
+       signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
+       //signal handler function
+       go func() {
+               for range interrupt {
+                       if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
+                               // close function
+                               go func() {
+                                       timeout := int(20)
+                                       sentry := make(chan struct{})
+                                       defer close(sentry)
+
+                                       // close callback
+                                       go func() {
+                                               XappShutdownCb()
+                                               sentry <- struct{}{}
+                                       }()
+                                       select {
+                                       case <-time.After(time.Duration(timeout) * time.Second):
+                                               Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
+                                       case <-sentry:
+                                               Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
+                                       }
+                                       os.Exit(0)
+                               }()
+                       } else {
+                               newCnt := atomic.AddInt32(&shutdownCnt, 1)
+                               Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
+                               if newCnt >= 5 {
+                                       Logger.Info("xapp-frame shutdown forced exit")
+                                       os.Exit(0)
+                               }
+                               continue
+                       }
+
+               }
+       }()
+}
+
 func init() {
        // Load xapp configuration
        Logger = LoadConfig()
 
-       Logger.SetLevel(viper.GetInt("logger.level"))
+       if viper.IsSet("controls.logger.level") {
+               Logger.SetLevel(viper.GetInt("controls.logger.level"))
+       } else {
+               Logger.SetLevel(viper.GetInt("logger.level"))
+       }
+
+       if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
+               Logger.SetFormat(0)
+       }
+
        Resource = NewRouter()
        Config = Configurator{}
        Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
-       Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
-       Alarm = NewAlarmClient(viper.GetString("alarm.MOId"), viper.GetString("alarm.APPId"))
+       Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
+       SdlStorage = NewSdlStorage()
+       Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
+       Rnib = GetNewRnibClient(SdlStorage.db)
+       Util = NewUtils()
 
-       if viper.IsSet("db.namespaces") {
-               namespaces := viper.GetStringSlice("db.namespaces")
-               if namespaces[0] != "" {
-                       Sdl = NewSDLClient(viper.GetStringSlice("db.namespaces")[0])
-               }
-               if namespaces[1] != "" {
-                       Rnib = NewRNIBClient(viper.GetStringSlice("db.namespaces")[1])
+       InstallSignalHandler()
+}
+
+func getIpAdress() string {
+       var ip net.IP
+       itf, err := net.InterfaceByName(os.Getenv("INTERFACE_NAME"))
+       if err != nil {
+               Logger.Info("Interface name is not able to resolve " + err.Error())
+               return ip.String()
+       }
+       item, err := itf.Addrs()
+       if err != nil {
+               Logger.Info("IP address is not able to resolve " + err.Error())
+               return ip.String()
+       }
+       for _, addr := range item {
+               switch v := addr.(type) {
+               case *net.IPNet:
+                       if !v.IP.IsLinkLocalUnicast() {
+                               ip = v.IP
+                       }
                }
-       } else {
-               Sdl = NewSDLClient(viper.GetString("db.namespace"))
        }
+       return ip.String()
 }
 
 func RunWithParams(c MessageConsumer, sdlcheck bool) {
        Rmr = NewRMRClient()
-       Rmr.SetReadyCB(xappReadyCb, nil)
-       go http.ListenAndServe(viper.GetString("local.host"), Resource.router)
-       Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host")))
+
+       Rmr.SetReadyCB(XappReadyCb, nil)
+       ipString := getIpAdress()
+       var host string
+       if ipString == "<nil>" {
+               host = fmt.Sprintf(":%d", GetPortData("http").Port)
+       } else {
+               host = fmt.Sprintf("[%s]:%d", ipString, GetPortData("http").Port)
+       }
+       go http.ListenAndServe(host, Resource.router)
+       Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
+
        if sdlcheck {
-               Sdl.TestConnection()
+               SdlStorage.TestConnection(viper.GetString("controls.db.namespace"))
        }
+       go registerXapp()
+
        Rmr.Start(c)
 }