"bytes"
"encoding/json"
"fmt"
- "github.com/spf13/viper"
- "io/ioutil"
+ "net"
"net/http"
"os"
"os/signal"
"strings"
"sync/atomic"
"syscall"
+ "testing"
"time"
+
+ "github.com/spf13/viper"
)
+// For testing purpose go version 1.13 ->
+
+var _ = func() bool {
+ testing.Init()
+ return true
+}()
+
type ReadyCB func(interface{})
type ShutdownCB func()
var (
// XApp is an application instance
- Rmr *RMRClient
- Sdl *SDLClient
- Rnib *RNIBClient
- Resource *Router
- Metric *Metrics
- Logger *Log
- Config Configurator
- Subscription *Subscriber
- Alarm *AlarmClient
- readyCb ReadyCB
- readyCbParams interface{}
- shutdownCb ShutdownCB
- shutdownFlag int32
- shutdownCnt int32
+ Rmr *RMRClient
+ Sdl *SDLClient
+ SdlStorage *SDLStorage
+ Rnib *RNIBClient
+ Resource *Router
+ Metric *Metrics
+ Logger *Log
+ Config Configurator
+ Subscription *Subscriber
+ Alarm *AlarmClient
+ Util *Utils
+ readyCb ReadyCB
+ readyCbParams interface{}
+ shutdownCb ShutdownCB
+ shutdownFlag int32
+ shutdownCnt int32
+ disableAlarmClient bool
+ isRegistered bool
)
+var startTime time.Time
+
+func XappUpTime() time.Duration {
+ return time.Since(startTime)
+}
+
+func init() {
+ startTime = time.Now()
+}
+
func IsReady() bool {
- return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
+ return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady()
+}
+
+func IsRegistered() bool {
+ return isRegistered
}
func SetReadyCB(cb ReadyCB, params interface{}) {
}
func XappReadyCb(params interface{}) {
- Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
+ if disableAlarmClient == false {
+ Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
+ }
if readyCb != nil {
readyCb(readyCbParams)
}
}
-func xappShutdownCb() {
- SendDeregistermsg()
- Logger.Info("Wait for xapp to get unregistered")
- time.Sleep(10 * time.Second)
+func SetShutdownCB(cb ShutdownCB) {
+ shutdownCb = cb
}
-func registerxapp() {
- var (
- retries int = 10
- )
- for retries > 0 {
- name, _ := os.Hostname()
- httpservicename := "SERVICE_RICXAPP_" + strings.ToUpper(name) + "_HTTP_PORT"
- httpendpoint := os.Getenv(strings.Replace(httpservicename, "-", "_", -1))
- urlString := strings.Split(httpendpoint, "//")
- // Added this check to make UT pass
- if urlString[0] == "" {
- return
- }
- resp, err := http.Get(fmt.Sprintf("http://%s/ric/v1/health/ready", urlString[1]))
- retries -= 1
+func XappShutdownCb() {
+ if err := doDeregister(); err != nil {
+ Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
+ } else {
+ Logger.Info("xApp deregistration successfull!")
+ }
+
+ if shutdownCb != nil {
+ shutdownCb()
+ }
+
+ isRegistered = false
+}
+
+func registerXapp() {
+ for {
time.Sleep(5 * time.Second)
- if err != nil {
- Logger.Error("Error in health check: %v", err)
+ if !IsHealthProbeReady() {
+ Logger.Debug("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
+ continue
}
- if err == nil {
- retries -= 10
- Logger.Info("Health Probe Success with resp.StatusCode is %v", resp.StatusCode)
- if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
- go SendRegistermsg()
- }
- } else {
- Logger.Info("Health Probe failed, retrying...")
+
+ Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
+ if err := doRegister(); err == nil {
+ isRegistered = true
+ Logger.Info("Registration done, proceeding with startup ...")
+ break
}
}
}
-func SendRegistermsg() {
- name, _ := os.Hostname()
- xappname := viper.GetString("name")
- xappversion := viper.GetString("version")
+func getService(host, service string) string {
appnamespace := os.Getenv("APP_NAMESPACE")
if appnamespace == "" {
- appnamespace = "ricxapp"
+ appnamespace = DEFAULT_XAPP_NS
}
- httpservicename := "SERVICE_" + strings.ToUpper(appnamespace) + "_" + strings.ToUpper(name) + "_HTTP_PORT"
- rmrservicename := "SERVICE_" + strings.ToUpper(appnamespace) + "_" + strings.ToUpper(name) + "_RMR_PORT"
- httpendpointstr := os.Getenv(strings.Replace(httpservicename, "-", "_", -1))
- rmrendpointstr := os.Getenv(strings.Replace(rmrservicename, "-", "_", -1))
- httpendpoint := strings.Split(httpendpointstr, "//")
- rmrendpoint := strings.Split(rmrendpointstr, "//")
- if httpendpoint[0] == "" || rmrendpoint[0] == "" {
- return
+
+ svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
+ url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
+
+ Logger.Info("getService: %+v %+v", svc, url)
+ if len(url) > 1 {
+ return url[1]
}
+ return ""
+}
+func getPltNamespace(envName, defVal string) string {
pltnamespace := os.Getenv("PLT_NAMESPACE")
if pltnamespace == "" {
- pltnamespace = "ricplt"
+ pltnamespace = defVal
+ }
+
+ return pltnamespace
+}
+
+func doPost(pltNs, url string, msg []byte, status int) error {
+ resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
+ if err != nil || resp == nil || resp.StatusCode != status {
+ logdesc := fmt.Sprintf("http.Post to '%s' failed with", fmt.Sprintf(url, pltNs, pltNs))
+ if resp != nil {
+ logdesc += fmt.Sprintf(" status: %d != %d", resp.StatusCode, status)
+ } else {
+ logdesc += fmt.Sprintf(" resp: nil")
+ }
+ if err != nil {
+ logdesc += fmt.Sprintf(" err: %s", err.Error())
+ } else {
+ logdesc += fmt.Sprintf(" err: nil")
+ }
+ Logger.Info(logdesc)
+ return fmt.Errorf(logdesc)
}
- configpath := "/ric/v1/config"
+ Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
+
+ return err
+}
+
+func doRegister() error {
+ host, _ := os.Hostname()
+ xappname := viper.GetString("name")
+ xappversion := viper.GetString("version")
+ pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
+
+ //httpEp, rmrEp := getService(xappname, SERVICE_HTTP), getService(xappname, SERVICE_RMR)
+ httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
+ if httpEp == "" || rmrEp == "" {
+ Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
+ return nil
+ }
requestBody, err := json.Marshal(map[string]string{
- "appName": name,
- "httpEndpoint": httpendpoint[1],
- "rmrEndpoint": rmrendpoint[1],
+ "appName": host,
+ "httpEndpoint": httpEp,
+ "rmrEndpoint": rmrEp,
"appInstanceName": xappname,
"appVersion": xappversion,
- "configPath": configpath,
+ "configPath": CONFIG_PATH,
})
if err != nil {
- Logger.Info("Error while compiling request to appmgr: %v", err)
- } else {
- url := fmt.Sprintf("http://service-%v-appmgr-http.%v:8080/ric/v1/register", pltnamespace, pltnamespace)
- resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
- Logger.Info(" Resp is %v", resp)
- if err != nil {
- Logger.Info("Error compiling request to appmgr: %v", err)
- }
- Logger.Info("Registering request sent. Response received is :%v", resp)
-
- if resp != nil {
- body, err := ioutil.ReadAll(resp.Body)
- Logger.Info("Post body is %v", resp.Body)
- if err != nil {
- Logger.Info("rsp: Error compiling request to appmgr: %v", string(body))
- }
- defer resp.Body.Close()
- }
+ Logger.Error("json.Marshal failed with error: %v", err)
+ return err
}
+
+ return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
}
-func SendDeregistermsg() {
+func doDeregister() error {
+ if !IsHealthProbeReady() {
+ return nil
+ }
name, _ := os.Hostname()
xappname := viper.GetString("name")
-
- appnamespace := os.Getenv("APP_NAMESPACE")
- if appnamespace == "" {
- appnamespace = "ricxapp"
- }
- pltnamespace := os.Getenv("PLT_NAMESPACE")
- if pltnamespace == "" {
- pltnamespace = "ricplt"
- }
+ pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
requestBody, err := json.Marshal(map[string]string{
"appName": name,
})
if err != nil {
- Logger.Info("Error while compiling request to appmgr: %v", err)
- } else {
- url := fmt.Sprintf("http://service-%v-appmgr-http.%v:8080/ric/v1/deregister", pltnamespace, pltnamespace)
- resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
- Logger.Info(" Resp is %v", resp)
- if err != nil {
- Logger.Info("Error compiling request to appmgr: %v", err)
- }
- Logger.Info("Deregistering request sent. Response received is :%v", resp)
-
- if resp != nil {
- body, err := ioutil.ReadAll(resp.Body)
- Logger.Info("Post body is %v", resp.Body)
- if err != nil {
- Logger.Info("rsp: Error compiling request to appmgr: %v", string(body))
- }
- defer resp.Body.Close()
- }
+ Logger.Error("json.Marshal failed with error: %v", err)
+ return err
}
-}
-func SetShutdownCB(cb ShutdownCB) {
- shutdownCb = cb
+ return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
}
func InstallSignalHandler() {
// close callback
go func() {
- if shutdownCb != nil {
- shutdownCb()
- }
+ XappShutdownCb()
sentry <- struct{}{}
}()
select {
} else {
Logger.SetLevel(viper.GetInt("logger.level"))
}
- Logger.SetFormat(0)
+
+ if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
+ Logger.SetFormat(0)
+ }
Resource = NewRouter()
Config = Configurator{}
Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
- Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
+ Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
+ SdlStorage = NewSdlStorage()
Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
- Rnib = NewRNIBClient()
+ Rnib = GetNewRnibClient(SdlStorage.db)
+ Util = NewUtils()
InstallSignalHandler()
}
-func RunWithParams(c MessageConsumer, sdlcheck bool) {
+func GetIpAddress() (string, error) {
+ ifname := os.Getenv("INTERFACE_NAME")
+ itf, err := net.InterfaceByName(ifname)
+ if err != nil {
+ return "<nil>", fmt.Errorf("Interface (%s) %w", ifname, err)
+ }
+ item, err := itf.Addrs()
+ if err != nil {
+ return "<nil>", fmt.Errorf("Interface (%s) %w", ifname, err)
+ }
+ for _, addr := range item {
+ switch v := addr.(type) {
+ case *net.IPNet:
+ if !v.IP.IsLinkLocalUnicast() {
+ return v.IP.String(), nil
+ }
+ }
+ }
+ return "<nil>", fmt.Errorf("Interface (%s) couldn't find ip", ifname)
+}
+
+type RunParams struct {
+ SdlCheck bool
+ DisableAlarmClient bool
+}
+
+func RunWithRunParams(c MessageConsumer, params RunParams) {
+
+ if params.DisableAlarmClient {
+ disableAlarmClient = true
+ } else {
+ disableAlarmClient = false
+ }
+
Rmr = NewRMRClient()
+
Rmr.SetReadyCB(XappReadyCb, nil)
- SetShutdownCB(xappShutdownCb)
- host := fmt.Sprintf(":%d", GetPortData("http").Port)
+ ipString, err := GetIpAddress()
+ if err != nil {
+ Logger.Info("IP address is not able to resolve " + err.Error())
+ }
+ var host string
+ if ipString == "<nil>" {
+ host = fmt.Sprintf(":%d", GetPortData("http").Port)
+ } else {
+ host = fmt.Sprintf("[%s]:%d", ipString, GetPortData("http").Port)
+ }
go http.ListenAndServe(host, Resource.router)
Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
- if sdlcheck {
- Sdl.TestConnection()
+
+ if params.SdlCheck {
+ SdlStorage.TestConnection(viper.GetString("controls.db.namespace"))
}
- go registerxapp()
+ go registerXapp()
+
Rmr.Start(c)
}
+func RunWithParams(c MessageConsumer, sdlcheck bool) {
+ RunWithRunParams(c, RunParams{SdlCheck: sdlcheck, DisableAlarmClient: false})
+}
+
func Run(c MessageConsumer) {
RunWithParams(c, true)
}