package xapp
import (
+ "bytes"
+ "encoding/json"
"fmt"
"github.com/spf13/viper"
"net/http"
"os"
"os/signal"
+ "strings"
"sync/atomic"
"syscall"
"time"
readyCbParams = params
}
-func xappReadyCb(params interface{}) {
- Alarm = NewAlarmClient(viper.GetString("alarm.MOId"), viper.GetString("alarm.APPId"))
+func XappReadyCb(params interface{}) {
+ Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
if readyCb != nil {
readyCb(readyCbParams)
}
shutdownCb = cb
}
-func init() {
- // Load xapp configuration
- Logger = LoadConfig()
+func XappShutdownCb() {
+ if err := doDeregister(); err != nil {
+ Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
+ } else {
+ Logger.Info("xApp deregistration successfull!")
+ }
- Logger.SetLevel(viper.GetInt("logger.level"))
- Resource = NewRouter()
- Config = Configurator{}
- Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
- Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
+ if shutdownCb != nil {
+ shutdownCb()
+ }
+}
- if viper.IsSet("db.namespaces") {
- namespaces := viper.GetStringSlice("db.namespaces")
- if len(namespaces) > 0 && namespaces[0] != "" {
- Sdl = NewSDLClient(viper.GetStringSlice("db.namespaces")[0])
+func registerXapp() {
+ for {
+ time.Sleep(5 * time.Second)
+ if !IsHealthProbeReady() {
+ Logger.Info("xApp is not ready yet, waiting ...")
+ continue
}
- if len(namespaces) > 1 && namespaces[1] != "" {
- Rnib = NewRNIBClient(viper.GetStringSlice("db.namespaces")[1])
+
+ Logger.Info("xApp is now up and ready, continue with registration ...")
+ if err := doRegister(); err == nil {
+ Logger.Info("xApp registration done, proceeding with startup ...")
+ break
}
- } else {
- Sdl = NewSDLClient(viper.GetString("db.namespace"))
}
+}
+
+func getService(host, service string) string {
+ appnamespace := os.Getenv("APP_NAMESPACE")
+ if appnamespace == "" {
+ appnamespace = DEFAULT_XAPP_NS
+ }
+
+ svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
+ url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
+ if len(url) > 1 {
+ return url[1]
+ }
+ return ""
+}
+
+func getPltNamespace(envName, defVal string) string {
+ pltnamespace := os.Getenv("PLT_NAMESPACE")
+ if pltnamespace == "" {
+ pltnamespace = defVal
+ }
+
+ return pltnamespace
+}
+
+func doPost(pltNs, url string, msg []byte, status int) error {
+ resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
+ if err != nil || resp == nil || resp.StatusCode != status {
+ Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
+ return err
+ }
+ Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
+
+ return err
+}
+
+func doRegister() error {
+ host, _ := os.Hostname()
+ xappname := viper.GetString("name")
+ xappversion := viper.GetString("version")
+ pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
+
+ httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
+ if httpEp == "" || rmrEp == "" {
+ Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
+ return nil
+ }
+
+ requestBody, err := json.Marshal(map[string]string{
+ "appName": host,
+ "httpEndpoint": httpEp,
+ "rmrEndpoint": rmrEp,
+ "appInstanceName": xappname,
+ "appVersion": xappversion,
+ "configPath": CONFIG_PATH,
+ })
+ if err != nil {
+ Logger.Error("json.Marshal failed with error: %v", err)
+ return err
+ }
+
+ return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
+}
+
+func doDeregister() error {
+ if !IsHealthProbeReady() {
+ return nil
+ }
+
+ name, _ := os.Hostname()
+ xappname := viper.GetString("name")
+ pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
+
+ requestBody, err := json.Marshal(map[string]string{
+ "appName": name,
+ "appInstanceName": xappname,
+ })
+
+ if err != nil {
+ Logger.Error("json.Marshal failed with error: %v", err)
+ return err
+ }
+
+ return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
+}
+
+func InstallSignalHandler() {
//
// Signal handlers to really exit program.
// shutdownCb can hang until application has
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
//signal handler function
go func() {
- for _ = range interrupt {
+ for range interrupt {
if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
// close function
go func() {
// close callback
go func() {
- if shutdownCb != nil {
- shutdownCb()
- }
+ XappShutdownCb()
sentry <- struct{}{}
}()
select {
}()
}
+func init() {
+ // Load xapp configuration
+ Logger = LoadConfig()
+
+ if viper.IsSet("controls.logger.level") {
+ Logger.SetLevel(viper.GetInt("controls.logger.level"))
+ } else {
+ Logger.SetLevel(viper.GetInt("logger.level"))
+ }
+ Logger.SetFormat(0)
+
+ Resource = NewRouter()
+ Config = Configurator{}
+ Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
+ Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
+ Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
+ Rnib = NewRNIBClient()
+
+ InstallSignalHandler()
+}
+
func RunWithParams(c MessageConsumer, sdlcheck bool) {
Rmr = NewRMRClient()
- Rmr.SetReadyCB(xappReadyCb, nil)
- go http.ListenAndServe(viper.GetString("local.host"), Resource.router)
- Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host")))
+
+ Rmr.SetReadyCB(XappReadyCb, nil)
+
+ host := fmt.Sprintf(":%d", GetPortData("http").Port)
+ go http.ListenAndServe(host, Resource.router)
+ Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
+
if sdlcheck {
Sdl.TestConnection()
}
+ go registerXapp()
+
Rmr.Start(c)
}