2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 "github.com/spf13/viper"
36 type ReadyCB func(interface{})
37 type ShutdownCB func()
40 // XApp is an application instance
48 Subscription *Subscriber
52 readyCbParams interface{}
59 return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
62 func SetReadyCB(cb ReadyCB, params interface{}) {
64 readyCbParams = params
67 func XappReadyCb(params interface{}) {
68 Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
70 readyCb(readyCbParams)
74 func SetShutdownCB(cb ShutdownCB) {
78 func XappShutdownCb() {
79 if err := doDeregister(); err != nil {
80 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
82 Logger.Info("xApp deregistration successfull!")
85 if shutdownCb != nil {
92 time.Sleep(5 * time.Second)
93 if !IsHealthProbeReady() {
94 Logger.Info("xApp is not ready yet, waiting ...")
98 Logger.Info("xApp is now up and ready, continue with registration ...")
99 if err := doRegister(); err == nil {
100 Logger.Info("xApp registration done, proceeding with startup ...")
106 func getService(host, service string) string {
107 appnamespace := os.Getenv("APP_NAMESPACE")
108 if appnamespace == "" {
109 appnamespace = DEFAULT_XAPP_NS
112 svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
113 url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
120 func getPltNamespace(envName, defVal string) string {
121 pltnamespace := os.Getenv("PLT_NAMESPACE")
122 if pltnamespace == "" {
123 pltnamespace = defVal
129 func doPost(pltNs, url string, msg []byte, status int) error {
130 resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
131 if err != nil || resp == nil || resp.StatusCode != status {
132 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
135 Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
140 func doRegister() error {
141 host, _ := os.Hostname()
142 xappname := viper.GetString("name")
143 xappversion := viper.GetString("version")
144 pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
146 httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
147 if httpEp == "" || rmrEp == "" {
148 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
152 requestBody, err := json.Marshal(map[string]string{
154 "httpEndpoint": httpEp,
155 "rmrEndpoint": rmrEp,
156 "appInstanceName": xappname,
157 "appVersion": xappversion,
158 "configPath": CONFIG_PATH,
162 Logger.Error("json.Marshal failed with error: %v", err)
166 return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
169 func doDeregister() error {
170 if !IsHealthProbeReady() {
174 name, _ := os.Hostname()
175 xappname := viper.GetString("name")
176 pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
178 requestBody, err := json.Marshal(map[string]string{
180 "appInstanceName": xappname,
184 Logger.Error("json.Marshal failed with error: %v", err)
188 return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
191 func InstallSignalHandler() {
193 // Signal handlers to really exit program.
194 // shutdownCb can hang until application has
195 // made all needed gracefull shutdown actions
196 // hardcoded limit for shutdown is 20 seconds
198 interrupt := make(chan os.Signal, 1)
199 signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
200 //signal handler function
202 for range interrupt {
203 if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
207 sentry := make(chan struct{})
216 case <-time.After(time.Duration(timeout) * time.Second):
217 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
219 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
224 newCnt := atomic.AddInt32(&shutdownCnt, 1)
225 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
227 Logger.Info("xapp-frame shutdown forced exit")
238 // Load xapp configuration
239 Logger = LoadConfig()
241 if viper.IsSet("controls.logger.level") {
242 Logger.SetLevel(viper.GetInt("controls.logger.level"))
244 Logger.SetLevel(viper.GetInt("logger.level"))
248 Resource = NewRouter()
249 Config = Configurator{}
250 Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
251 Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
252 Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
253 Rnib = NewRNIBClient()
256 InstallSignalHandler()
259 func RunWithParams(c MessageConsumer, sdlcheck bool) {
262 Rmr.SetReadyCB(XappReadyCb, nil)
264 host := fmt.Sprintf(":%d", GetPortData("http").Port)
265 go http.ListenAndServe(host, Resource.router)
266 Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
276 func Run(c MessageConsumer) {
277 RunWithParams(c, true)