Backward compatible with old xApp descriptor
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
old mode 100644 (file)
new mode 100755 (executable)
index 1fd7ad6..a9a130e
@@ -23,57 +23,134 @@ import (
        "fmt"
        "github.com/spf13/viper"
        "net/http"
+       "os"
+       "os/signal"
+       "sync/atomic"
+       "syscall"
+       "time"
 )
 
 type ReadyCB func(interface{})
+type ShutdownCB func()
 
 var (
        // XApp is an application instance
-       Rmr      *RMRClient
-       Sdl      *SDLClient
-       Rnib     *RNIBClient
-       Resource *Router
-       Metric   *Metrics
-       Logger   *Log
-       Config   Configurator
+       Rmr           *RMRClient
+       Sdl           *SDLClient
+       Rnib          *RNIBClient
+       Resource      *Router
+       Metric        *Metrics
+       Logger        *Log
+       Config        Configurator
+       Subscription  *Subscriber
+       Alarm         *AlarmClient
+       readyCb       ReadyCB
+       readyCbParams interface{}
+       shutdownCb    ShutdownCB
+       shutdownFlag  int32
+       shutdownCnt   int32
 )
 
 func IsReady() bool {
-       return Rmr.IsReady() && Sdl.IsReady()
+       return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
 }
 
 func SetReadyCB(cb ReadyCB, params interface{}) {
-       Rmr.SetReadyCB(cb, params)
+       readyCb = cb
+       readyCbParams = params
+}
+
+func XappReadyCb(params interface{}) {
+       Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
+       if readyCb != nil {
+               readyCb(readyCbParams)
+       }
+}
+
+func SetShutdownCB(cb ShutdownCB) {
+       shutdownCb = cb
+}
+
+func InstallSignalHandler() {
+       //
+       // Signal handlers to really exit program.
+       // shutdownCb can hang until application has
+       // made all needed gracefull shutdown actions
+       // hardcoded limit for shutdown is 20 seconds
+       //
+       interrupt := make(chan os.Signal, 1)
+       signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
+       //signal handler function
+       go func() {
+               for range interrupt {
+                       if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
+                               // close function
+                               go func() {
+                                       timeout := int(20)
+                                       sentry := make(chan struct{})
+                                       defer close(sentry)
+
+                                       // close callback
+                                       go func() {
+                                               if shutdownCb != nil {
+                                                       shutdownCb()
+                                               }
+                                               sentry <- struct{}{}
+                                       }()
+                                       select {
+                                       case <-time.After(time.Duration(timeout) * time.Second):
+                                               Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
+                                       case <-sentry:
+                                               Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
+                                       }
+                                       os.Exit(0)
+                               }()
+                       } else {
+                               newCnt := atomic.AddInt32(&shutdownCnt, 1)
+                               Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
+                               if newCnt >= 5 {
+                                       Logger.Info("xapp-frame shutdown forced exit")
+                                       os.Exit(0)
+                               }
+                               continue
+                       }
+
+               }
+       }()
 }
 
 func init() {
        // Load xapp configuration
        Logger = LoadConfig()
 
-       Logger.SetLevel(viper.GetInt("logger.level"))
+       if viper.IsSet("controls.logger.level") {
+               Logger.SetLevel(viper.GetInt("controls.logger.level"))
+       } else {
+               Logger.SetLevel(viper.GetInt("logger.level"))
+       }
        Resource = NewRouter()
        Config = Configurator{}
        Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
+       Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
+       Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
+       Rnib = NewRNIBClient()
+
+       InstallSignalHandler()
+}
+
+func RunWithParams(c MessageConsumer, sdlcheck bool) {
        Rmr = NewRMRClient()
+       Rmr.SetReadyCB(XappReadyCb, nil)
 
-       if viper.IsSet("db.namespaces") {
-               namespaces := viper.GetStringSlice("db.namespaces")
-               if namespaces[0] != "" {
-                       Sdl = NewSDLClient(viper.GetStringSlice("db.namespaces")[0])
-               }
-               if namespaces[1] != "" {
-                       Rnib = NewRNIBClient(viper.GetStringSlice("db.namespaces")[1])
-               }
-       } else {
-               Sdl = NewSDLClient(viper.GetString("db.namespace"))
+       host := fmt.Sprintf(":%d", GetPortData("http").Port)
+       go http.ListenAndServe(host, Resource.router)
+       Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
+       if sdlcheck {
+               Sdl.TestConnection()
        }
+       Rmr.Start(c)
 }
 
 func Run(c MessageConsumer) {
-       go http.ListenAndServe(viper.GetString("local.host"), Resource.router)
-
-       Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host")))
-
-       Sdl.TestConnection()
-       Rmr.Start(c)
+       RunWithParams(c, true)
 }