2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
36 "github.com/spf13/viper"
39 // For testing purpose go version 1.13 ->
// Callback signatures accepted by SetReadyCB and SetShutdownCB.
type (
	// ReadyCB is the signature of the callback handed to SetReadyCB.
	// It receives one opaque, caller-chosen argument.
	ReadyCB func(interface{})

	// ShutdownCB is the signature of the callback handed to SetShutdownCB.
	// It takes no arguments and returns nothing.
	ShutdownCB func()
)
50 // XApp is an application instance
53 SdlStorage *SDLStorage
59 Subscription *Subscriber
63 readyCbParams interface{}
// startTime is the reference instant XappUpTime measures from. It is
// assigned from time.Now() elsewhere in this file.
var startTime time.Time

// XappUpTime reports how much time has elapsed since startTime.
func XappUpTime() time.Duration {
	now := time.Now()
	return now.Sub(startTime)
}
76 startTime = time.Now()
80 return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady()
// SetReadyCB records params in readyCbParams, the value XappReadyCb later
// passes to the ready callback.
// NOTE(review): cb is presumably what gets stored as that callback (readyCb) -
// confirm against the full function body.
83 func SetReadyCB(cb ReadyCB, params interface{}) {
85 readyCbParams = params
88 func XappReadyCb(params interface{}) {
89 Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
91 readyCb(readyCbParams)
// SetShutdownCB takes the application's shutdown callback.
// NOTE(review): cb presumably becomes the shutdownCb that XappShutdownCb
// invokes - confirm against the full function body.
95 func SetShutdownCB(cb ShutdownCB) {
99 func XappShutdownCb() {
100 if err := doDeregister(); err != nil {
101 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
103 Logger.Info("xApp deregistration successfull!")
106 if shutdownCb != nil {
111 func registerXapp() {
113 time.Sleep(5 * time.Second)
114 if !IsHealthProbeReady() {
115 Logger.Debug("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
119 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
120 if err := doRegister(); err == nil {
121 Logger.Info("Registration done, proceeding with startup ...")
// getService looks up a service endpoint in the process environment.
//
// The environment variable name is produced by formatting the service
// template with the upper-cased application namespace and the upper-cased
// host, then replacing every "-" with "_". The variable's value is split on
// "//" and both the name and the split result are logged.
// NOTE(review): presumably the part after "//" is returned, and "" when it
// is absent (callers compare the result with "") - confirm.
127 func getService(host, service string) string {
128 appnamespace := os.Getenv("APP_NAMESPACE")
// Fall back to the default xApp namespace when APP_NAMESPACE is unset or empty.
129 if appnamespace == "" {
130 appnamespace = DEFAULT_XAPP_NS
// service is used as a format string that takes (namespace, host).
133 svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
// The lookup key uses "_" wherever the formatted name contains "-".
134 url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
136 Logger.Info("getService: %+v %+v", svc, url)
// getPltNamespace returns the platform namespace read from the environment
// variable named envName, or defVal when that variable is unset or empty.
//
// An empty envName falls back to "PLT_NAMESPACE", the variable this function
// previously read regardless of its argument.
func getPltNamespace(envName, defVal string) string {
	if envName == "" {
		envName = "PLT_NAMESPACE"
	}
	if ns := os.Getenv(envName); ns != "" {
		return ns
	}
	return defVal
}
152 func doPost(pltNs, url string, msg []byte, status int) error {
153 resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
154 if err != nil || resp == nil || resp.StatusCode != status {
155 logdesc := fmt.Sprintf("http.Post to '%s' failed with", fmt.Sprintf(url, pltNs, pltNs))
157 logdesc += fmt.Sprintf(" status: %d != %d", resp.StatusCode, status)
159 logdesc += fmt.Sprintf(" resp: nil")
162 logdesc += fmt.Sprintf(" err: %s", err.Error())
164 logdesc += fmt.Sprintf(" err: nil")
167 return fmt.Errorf(logdesc)
170 Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
// doRegister sends the xApp registration request.
//
// It resolves the HTTP and RMR endpoints with getService (keyed by the OS
// hostname), builds a JSON body carrying those endpoints together with the
// configured name, version and CONFIG_PATH, and posts it to REGISTER_PATH
// for the platform namespace, expecting 201 Created. The result of doPost is
// returned unchanged.
175 func doRegister() error {
// The error from the hostname lookup is discarded.
176 host, _ := os.Hostname()
177 xappname := viper.GetString("name")
178 xappversion := viper.GetString("version")
179 pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
181 //httpEp, rmrEp := getService(xappname, SERVICE_HTTP), getService(xappname, SERVICE_RMR)
182 httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
// NOTE(review): confirm what is returned after this warning - registerXapp
// treats a nil error as a completed registration.
183 if httpEp == "" || rmrEp == "" {
184 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
188 requestBody, err := json.Marshal(map[string]string{
190 "httpEndpoint": httpEp,
191 "rmrEndpoint": rmrEp,
192 "appInstanceName": xappname,
193 "appVersion": xappversion,
194 "configPath": CONFIG_PATH,
198 Logger.Error("json.Marshal failed with error: %v", err)
// Registration is acknowledged with 201 Created.
202 return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
// doDeregister sends the xApp deregistration request.
//
// It first consults IsHealthProbeReady, then builds a JSON body containing
// the configured application name and posts it to DEREGISTER_PATH for the
// platform namespace, expecting 204 No Content. The result of doPost is
// returned unchanged.
205 func doDeregister() error {
// NOTE(review): presumably nothing is sent when the health probe is not
// ready - confirm the return value on this path.
206 if !IsHealthProbeReady() {
// The error from the hostname lookup is discarded.
210 name, _ := os.Hostname()
211 xappname := viper.GetString("name")
212 pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
214 requestBody, err := json.Marshal(map[string]string{
216 "appInstanceName": xappname,
220 Logger.Error("json.Marshal failed with error: %v", err)
// Deregistration is acknowledged with 204 No Content.
224 return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
// InstallSignalHandler subscribes to SIGINT and SIGTERM and drives the
// shutdown sequence from the signals it receives.
//
// The first signal flips shutdownFlag from 0 to 1 and starts the shutdown
// path, which waits at most timeout seconds and logs whether the shutdown
// callback finished within that limit or overran it. Every further signal
// increments shutdownCnt and logs it against a limit of 5; that path also
// has a "forced exit" log line.
// NOTE(review): presumably the process exits after either outcome and once
// the counter reaches 5 - confirm against the full function body.
227 func InstallSignalHandler() {
229 // Signal handlers to really exit program.
230 // shutdownCb can hang until application has
231 // made all needed graceful shutdown actions
232 // hardcoded limit for shutdown is 20 seconds
// signal.Notify never blocks when delivering, so the channel is buffered to
// avoid missing a signal that arrives while the receiver is busy.
234 interrupt := make(chan os.Signal, 1)
235 signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
236 //signal handler function
238 for range interrupt {
// The compare-and-swap succeeds only for the first signal; later signals
// take the counter path below.
239 if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
243 sentry := make(chan struct{})
// Bound the wait for the shutdown callback to timeout seconds.
252 case <-time.After(time.Duration(timeout) * time.Second):
253 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
255 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
// Shutdown is already in progress: count the repeated signal.
260 newCnt := atomic.AddInt32(&shutdownCnt, 1)
261 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
263 Logger.Info("xapp-frame shutdown forced exit")
274 // Load xapp configuration
275 Logger = LoadConfig()
277 if viper.IsSet("controls.logger.level") {
278 Logger.SetLevel(viper.GetInt("controls.logger.level"))
280 Logger.SetLevel(viper.GetInt("logger.level"))
283 if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
287 Resource = NewRouter()
288 Config = Configurator{}
289 Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
290 Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
291 SdlStorage = NewSdlStorage()
292 Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
293 Rnib = GetNewRnibClient(SdlStorage.db)
296 InstallSignalHandler()
// GetIpAddress returns the first address of the network interface named by
// the INTERFACE_NAME environment variable that is not link-local unicast.
//
// On any failure (interface lookup, address enumeration, or no suitable
// address) it returns the placeholder string "<nil>" together with a
// non-nil error; callers may test for that placeholder.
func GetIpAddress() (string, error) {
	const unresolved = "<nil>"

	ifname := os.Getenv("INTERFACE_NAME")

	itf, err := net.InterfaceByName(ifname)
	if err != nil {
		return unresolved, fmt.Errorf("Interface (%s) %w", ifname, err)
	}

	addrs, err := itf.Addrs()
	if err != nil {
		return unresolved, fmt.Errorf("Interface (%s) %w", ifname, err)
	}

	for _, addr := range addrs {
		// Only IP network addresses are considered; link-local ones are skipped.
		ipnet, ok := addr.(*net.IPNet)
		if !ok || ipnet.IP.IsLinkLocalUnicast() {
			continue
		}
		return ipnet.IP.String(), nil
	}

	return unresolved, fmt.Errorf("Interface (%s) couldn't find ip", ifname)
}
// RunWithParams brings up the xApp runtime: it registers XappReadyCb as the
// RMR ready callback, starts the HTTP server on the configured "http" port,
// and tests the SDL storage connection for the configured database namespace.
// NOTE(review): presumably sdlcheck gates the SDL connection test and c is
// handed to the RMR client - confirm against the full function body.
320 func RunWithParams(c MessageConsumer, sdlcheck bool) {
// XappReadyCb ignores its argument, so nil is passed as the parameter.
323 Rmr.SetReadyCB(XappReadyCb, nil)
324 ipString, err := GetIpAddress()
326 Logger.Info("IP address is not able to resolve " + err.Error())
// GetIpAddress returns "<nil>" on failure; in that case listen on all
// interfaces, otherwise bind to the resolved address.
329 if ipString == "<nil>" {
330 host = fmt.Sprintf(":%d", GetPortData("http").Port)
332 host = fmt.Sprintf("[%s]:%d", ipString, GetPortData("http").Port)
// The server runs in its own goroutine; the error it returns is not checked.
334 go http.ListenAndServe(host, Resource.router)
335 Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
338 SdlStorage.TestConnection(viper.GetString("controls.db.namespace"))
// Run starts the xApp with message consumer c. It is shorthand for
// RunWithParams(c, true).
345 func Run(c MessageConsumer) {
346 RunWithParams(c, true)