f778265edee516733e3ef6affa0c93bdac93dda8
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "github.com/spf13/viper"
27         "net/http"
28         "os"
29         "os/signal"
30         "strings"
31         "sync/atomic"
32         "syscall"
33         "time"
34 )
35
// Callback signatures an xApp can register with the framework.
type (
	// ReadyCB is the callback stored by SetReadyCB and invoked from
	// XappReadyCb; it receives the params value given to SetReadyCB.
	ReadyCB func(interface{})

	// ShutdownCB is the callback stored by SetShutdownCB and invoked from
	// XappShutdownCb after the deregistration attempt.
	ShutdownCB func()
)
38
39 var (
40         // XApp is an application instance
41         Rmr           *RMRClient
42         Sdl           *SDLClient
43         Rnib          *RNIBClient
44         Resource      *Router
45         Metric        *Metrics
46         Logger        *Log
47         Config        Configurator
48         Subscription  *Subscriber
49         Alarm         *AlarmClient
50         Util          *Utils
51         readyCb       ReadyCB
52         readyCbParams interface{}
53         shutdownCb    ShutdownCB
54         shutdownFlag  int32
55         shutdownCnt   int32
56 )
57
58 func IsReady() bool {
59         return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
60 }
61
62 func SetReadyCB(cb ReadyCB, params interface{}) {
63         readyCb = cb
64         readyCbParams = params
65 }
66
67 func XappReadyCb(params interface{}) {
68         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
69         if readyCb != nil {
70                 readyCb(readyCbParams)
71         }
72 }
73
74 func SetShutdownCB(cb ShutdownCB) {
75         shutdownCb = cb
76 }
77
78 func XappShutdownCb() {
79         if err := doDeregister(); err != nil {
80                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
81         } else {
82                 Logger.Info("xApp deregistration successfull!")
83         }
84
85         if shutdownCb != nil {
86                 shutdownCb()
87         }
88 }
89
90 func registerXapp() {
91         for {
92                 time.Sleep(5 * time.Second)
93                 if !IsHealthProbeReady() {
94                         Logger.Info("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
95                         continue
96                 }
97
98                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
99                 if err := doRegister(); err == nil {
100                         Logger.Info("Registration done, proceeding with startup ...")
101                         break
102                 }
103         }
104 }
105
106 func getService(host, service string) string {
107         appnamespace := os.Getenv("APP_NAMESPACE")
108         if appnamespace == "" {
109                 appnamespace = DEFAULT_XAPP_NS
110         }
111
112         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
113         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
114         if len(url) > 1 {
115                 return url[1]
116         }
117         return ""
118 }
119
// getPltNamespace returns the value of the environment variable named by
// envName, or defVal when that variable is unset or empty.
func getPltNamespace(envName, defVal string) string {
	// Honour the envName argument instead of a hard-coded variable name.
	if ns := os.Getenv(envName); ns != "" {
		return ns
	}
	return defVal
}
128
129 func doPost(pltNs, url string, msg []byte, status int) error {
130         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
131         if err != nil || resp == nil || resp.StatusCode != status {
132                 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
133                 return err
134         }
135         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
136
137         return err
138 }
139
140 func doRegister() error {
141         host, _ := os.Hostname()
142         xappname := viper.GetString("name")
143         xappversion := viper.GetString("version")
144         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
145
146         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
147         if httpEp == "" || rmrEp == "" {
148                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
149                 return nil
150         }
151
152         requestBody, err := json.Marshal(map[string]string{
153                 "appName":         host,
154                 "httpEndpoint":    httpEp,
155                 "rmrEndpoint":     rmrEp,
156                 "appInstanceName": xappname,
157                 "appVersion":      xappversion,
158                 "configPath":      CONFIG_PATH,
159         })
160
161         if err != nil {
162                 Logger.Error("json.Marshal failed with error: %v", err)
163                 return err
164         }
165
166         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
167 }
168
169 func doDeregister() error {
170         if !IsHealthProbeReady() {
171                 return nil
172         }
173
174         name, _ := os.Hostname()
175         xappname := viper.GetString("name")
176         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
177
178         requestBody, err := json.Marshal(map[string]string{
179                 "appName":         name,
180                 "appInstanceName": xappname,
181         })
182
183         if err != nil {
184                 Logger.Error("json.Marshal failed with error: %v", err)
185                 return err
186         }
187
188         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
189 }
190
191 func InstallSignalHandler() {
192         //
193         // Signal handlers to really exit program.
194         // shutdownCb can hang until application has
195         // made all needed gracefull shutdown actions
196         // hardcoded limit for shutdown is 20 seconds
197         //
198         interrupt := make(chan os.Signal, 1)
199         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
200         //signal handler function
201         go func() {
202                 for range interrupt {
203                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
204                                 // close function
205                                 go func() {
206                                         timeout := int(20)
207                                         sentry := make(chan struct{})
208                                         defer close(sentry)
209
210                                         // close callback
211                                         go func() {
212                                                 XappShutdownCb()
213                                                 sentry <- struct{}{}
214                                         }()
215                                         select {
216                                         case <-time.After(time.Duration(timeout) * time.Second):
217                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
218                                         case <-sentry:
219                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
220                                         }
221                                         os.Exit(0)
222                                 }()
223                         } else {
224                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
225                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
226                                 if newCnt >= 5 {
227                                         Logger.Info("xapp-frame shutdown forced exit")
228                                         os.Exit(0)
229                                 }
230                                 continue
231                         }
232
233                 }
234         }()
235 }
236
237 func init() {
238         // Load xapp configuration
239         Logger = LoadConfig()
240
241         if viper.IsSet("controls.logger.level") {
242                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
243         } else {
244                 Logger.SetLevel(viper.GetInt("logger.level"))
245         }
246
247         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
248                 Logger.SetFormat(0)
249         }
250
251         Resource = NewRouter()
252         Config = Configurator{}
253         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
254         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
255         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
256         Rnib = NewRNIBClient()
257         Util = NewUtils()
258
259         InstallSignalHandler()
260 }
261
262 func RunWithParams(c MessageConsumer, sdlcheck bool) {
263         Rmr = NewRMRClient()
264
265         Rmr.SetReadyCB(XappReadyCb, nil)
266
267         host := fmt.Sprintf(":%d", GetPortData("http").Port)
268         go http.ListenAndServe(host, Resource.router)
269         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
270
271         if sdlcheck {
272                 Sdl.TestConnection()
273         }
274         go registerXapp()
275
276         Rmr.Start(c)
277 }
278
279 func Run(c MessageConsumer) {
280         RunWithParams(c, true)
281 }