Further UT improvements
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "net/http"
27         "os"
28         "os/signal"
29         "strings"
30         "sync/atomic"
31         "syscall"
32         "time"
33
34         "github.com/spf13/viper"
35 )
36
type (
	// ReadyCB is the signature of the application callback registered with
	// SetReadyCB. It receives the opaque value that was supplied alongside
	// the callback at registration time.
	ReadyCB func(interface{})

	// ShutdownCB is the signature of the application callback registered
	// with SetShutdownCB. It takes no arguments and returns nothing.
	ShutdownCB func()
)
39
40 var (
41         // XApp is an application instance
42         Rmr           *RMRClient
43         Sdl           *SDLClient
44         Rnib          *RNIBClient
45         Resource      *Router
46         Metric        *Metrics
47         Logger        *Log
48         Config        Configurator
49         Subscription  *Subscriber
50         Alarm         *AlarmClient
51         Util          *Utils
52         readyCb       ReadyCB
53         readyCbParams interface{}
54         shutdownCb    ShutdownCB
55         shutdownFlag  int32
56         shutdownCnt   int32
57 )
58
59 func IsReady() bool {
60         return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
61 }
62
63 func SetReadyCB(cb ReadyCB, params interface{}) {
64         readyCb = cb
65         readyCbParams = params
66 }
67
68 func XappReadyCb(params interface{}) {
69         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
70         if readyCb != nil {
71                 readyCb(readyCbParams)
72         }
73 }
74
75 func SetShutdownCB(cb ShutdownCB) {
76         shutdownCb = cb
77 }
78
79 func XappShutdownCb() {
80         if err := doDeregister(); err != nil {
81                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
82         } else {
83                 Logger.Info("xApp deregistration successfull!")
84         }
85
86         if shutdownCb != nil {
87                 shutdownCb()
88         }
89 }
90
91 func registerXapp() {
92         for {
93                 time.Sleep(5 * time.Second)
94                 if !IsHealthProbeReady() {
95                         Logger.Info("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
96                         continue
97                 }
98
99                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
100                 if err := doRegister(); err == nil {
101                         Logger.Info("Registration done, proceeding with startup ...")
102                         break
103                 }
104         }
105 }
106
107 func getService(host, service string) string {
108         appnamespace := os.Getenv("APP_NAMESPACE")
109         if appnamespace == "" {
110                 appnamespace = DEFAULT_XAPP_NS
111         }
112
113         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
114         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
115
116         Logger.Info("getService: %+v %+v", svc, url)
117         if len(url) > 1 {
118                 return url[1]
119         }
120         return ""
121 }
122
// getPltNamespace returns the platform namespace read from the environment
// variable named envName, or defVal when that variable is unset or empty.
//
// Previously the envName argument was ignored and "PLT_NAMESPACE" was always
// read; callers in this file pass "PLT_NAMESPACE", so their behavior is
// unchanged.
func getPltNamespace(envName, defVal string) string {
	if ns := os.Getenv(envName); ns != "" {
		return ns
	}
	return defVal
}
131
132 func doPost(pltNs, url string, msg []byte, status int) error {
133         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
134         if err != nil || resp == nil || resp.StatusCode != status {
135                 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
136                 return err
137         }
138         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
139
140         return err
141 }
142
143 func doRegister() error {
144         host, _ := os.Hostname()
145         xappname := viper.GetString("name")
146         xappversion := viper.GetString("version")
147         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
148
149         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
150         if httpEp == "" || rmrEp == "" {
151                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
152                 return nil
153         }
154
155         requestBody, err := json.Marshal(map[string]string{
156                 "appName":         host,
157                 "httpEndpoint":    httpEp,
158                 "rmrEndpoint":     rmrEp,
159                 "appInstanceName": xappname,
160                 "appVersion":      xappversion,
161                 "configPath":      CONFIG_PATH,
162         })
163
164         if err != nil {
165                 Logger.Error("json.Marshal failed with error: %v", err)
166                 return err
167         }
168
169         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
170 }
171
172 func doDeregister() error {
173         if !IsHealthProbeReady() {
174                 return nil
175         }
176
177         name, _ := os.Hostname()
178         xappname := viper.GetString("name")
179         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
180
181         requestBody, err := json.Marshal(map[string]string{
182                 "appName":         name,
183                 "appInstanceName": xappname,
184         })
185
186         if err != nil {
187                 Logger.Error("json.Marshal failed with error: %v", err)
188                 return err
189         }
190
191         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
192 }
193
194 func InstallSignalHandler() {
195         //
196         // Signal handlers to really exit program.
197         // shutdownCb can hang until application has
198         // made all needed gracefull shutdown actions
199         // hardcoded limit for shutdown is 20 seconds
200         //
201         interrupt := make(chan os.Signal, 1)
202         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
203         //signal handler function
204         go func() {
205                 for range interrupt {
206                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
207                                 // close function
208                                 go func() {
209                                         timeout := int(20)
210                                         sentry := make(chan struct{})
211                                         defer close(sentry)
212
213                                         // close callback
214                                         go func() {
215                                                 XappShutdownCb()
216                                                 sentry <- struct{}{}
217                                         }()
218                                         select {
219                                         case <-time.After(time.Duration(timeout) * time.Second):
220                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
221                                         case <-sentry:
222                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
223                                         }
224                                         os.Exit(0)
225                                 }()
226                         } else {
227                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
228                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
229                                 if newCnt >= 5 {
230                                         Logger.Info("xapp-frame shutdown forced exit")
231                                         os.Exit(0)
232                                 }
233                                 continue
234                         }
235
236                 }
237         }()
238 }
239
240 func init() {
241         // Load xapp configuration
242         Logger = LoadConfig()
243
244         if viper.IsSet("controls.logger.level") {
245                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
246         } else {
247                 Logger.SetLevel(viper.GetInt("logger.level"))
248         }
249
250         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
251                 Logger.SetFormat(0)
252         }
253
254         Resource = NewRouter()
255         Config = Configurator{}
256         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
257         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
258         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
259         Rnib = NewRNIBClient()
260         Util = NewUtils()
261
262         InstallSignalHandler()
263 }
264
265 func RunWithParams(c MessageConsumer, sdlcheck bool) {
266         Rmr = NewRMRClient()
267
268         Rmr.SetReadyCB(XappReadyCb, nil)
269
270         host := fmt.Sprintf(":%d", GetPortData("http").Port)
271         go http.ListenAndServe(host, Resource.router)
272         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
273
274         if sdlcheck {
275                 Sdl.TestConnection()
276         }
277         go registerXapp()
278
279         Rmr.Start(c)
280 }
281
282 func Run(c MessageConsumer) {
283         RunWithParams(c, true)
284 }