2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
34 "github.com/spf13/viper"
37 type ReadyCB func(interface{})
38 type ShutdownCB func()
41 // XApp is an application instance
49 Subscription *Subscriber
52 readyCbParams interface{}
59 return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
62 func SetReadyCB(cb ReadyCB, params interface{}) {
64 readyCbParams = params
67 func XappReadyCb(params interface{}) {
69 readyCb(readyCbParams)
73 func SetShutdownCB(cb ShutdownCB) {
77 func XappShutdownCb() {
78 if err := doDeregister(); err != nil {
79 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
81 Logger.Info("xApp deregistration successfull!")
84 if shutdownCb != nil {
91 time.Sleep(5 * time.Second)
92 if !IsHealthProbeReady() {
93 Logger.Info("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
97 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
98 if err := doRegister(); err == nil {
99 Logger.Info("Registration done, proceeding with startup ...")
105 func getService(host, service string) string {
106 appnamespace := os.Getenv("APP_NAMESPACE")
107 if appnamespace == "" {
108 appnamespace = DEFAULT_XAPP_NS
111 svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
112 url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
119 func getPltNamespace(envName, defVal string) string {
120 pltnamespace := os.Getenv("PLT_NAMESPACE")
121 if pltnamespace == "" {
122 pltnamespace = defVal
128 func doPost(pltNs, url string, msg []byte, status int) error {
129 resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
130 if err != nil || resp == nil || resp.StatusCode != status {
131 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
134 Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
139 func doRegister() error {
140 host, _ := os.Hostname()
141 xappname := viper.GetString("name")
142 xappversion := viper.GetString("version")
143 pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
145 httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
146 if httpEp == "" || rmrEp == "" {
147 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
151 requestBody, err := json.Marshal(map[string]string{
153 "httpEndpoint": httpEp,
154 "rmrEndpoint": rmrEp,
155 "appInstanceName": xappname,
156 "appVersion": xappversion,
157 "configPath": CONFIG_PATH,
161 Logger.Error("json.Marshal failed with error: %v", err)
165 return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
168 func doDeregister() error {
169 if !IsHealthProbeReady() {
173 name, _ := os.Hostname()
174 xappname := viper.GetString("name")
175 pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
177 requestBody, err := json.Marshal(map[string]string{
179 "appInstanceName": xappname,
183 Logger.Error("json.Marshal failed with error: %v", err)
187 return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
190 func InstallSignalHandler() {
192 // Signal handlers to really exit program.
193 // shutdownCb can hang until application has
194 // made all needed gracefull shutdown actions
195 // hardcoded limit for shutdown is 20 seconds
197 interrupt := make(chan os.Signal, 1)
198 signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
199 //signal handler function
201 for range interrupt {
202 if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
206 sentry := make(chan struct{})
215 case <-time.After(time.Duration(timeout) * time.Second):
216 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
218 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
223 newCnt := atomic.AddInt32(&shutdownCnt, 1)
224 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
226 Logger.Info("xapp-frame shutdown forced exit")
237 // Load xapp configuration
238 Logger = LoadConfig()
240 if viper.IsSet("controls.logger.level") {
241 Logger.SetLevel(viper.GetInt("controls.logger.level"))
243 Logger.SetLevel(viper.GetInt("logger.level"))
246 if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
250 Resource = NewRouter()
251 Config = Configurator{}
252 Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
253 Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
254 Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
255 Rnib = NewRNIBClient()
258 InstallSignalHandler()
261 func RunWithParams(c MessageConsumer, sdlcheck bool) {
264 Rmr.SetReadyCB(XappReadyCb, nil)
266 host := fmt.Sprintf(":%d", GetPortData("http").Port)
267 go http.ListenAndServe(host, Resource.router)
268 Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
278 func Run(c MessageConsumer) {
279 RunWithParams(c, true)