2f5f4643f6b4217c481ce32946e67144a9ee8430
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "net/http"
27         "os"
28         "os/signal"
29         "strings"
30         "sync/atomic"
31         "syscall"
32         "time"
33
34         "github.com/spf13/viper"
35 )
36
37 type ReadyCB func(interface{})
38 type ShutdownCB func()
39
40 var (
41         // XApp is an application instance
42         Rmr           *RMRClient
43         Sdl           *SDLClient
44         Rnib          *RNIBClient
45         Resource      *Router
46         Metric        *Metrics
47         Logger        *Log
48         Config        Configurator
49         Subscription  *Subscriber
50         Util          *Utils
51         readyCb       ReadyCB
52         readyCbParams interface{}
53         shutdownCb    ShutdownCB
54         shutdownFlag  int32
55         shutdownCnt   int32
56 )
57
58 func IsReady() bool {
59         return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
60 }
61
62 func SetReadyCB(cb ReadyCB, params interface{}) {
63         readyCb = cb
64         readyCbParams = params
65 }
66
67 func XappReadyCb(params interface{}) {
68         if readyCb != nil {
69                 readyCb(readyCbParams)
70         }
71 }
72
73 func SetShutdownCB(cb ShutdownCB) {
74         shutdownCb = cb
75 }
76
77 func XappShutdownCb() {
78         if err := doDeregister(); err != nil {
79                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
80         } else {
81                 Logger.Info("xApp deregistration successfull!")
82         }
83
84         if shutdownCb != nil {
85                 shutdownCb()
86         }
87 }
88
89 func registerXapp() {
90         for {
91                 time.Sleep(5 * time.Second)
92                 if !IsHealthProbeReady() {
93                         Logger.Info("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
94                         continue
95                 }
96
97                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
98                 if err := doRegister(); err == nil {
99                         Logger.Info("Registration done, proceeding with startup ...")
100                         break
101                 }
102         }
103 }
104
105 func getService(host, service string) string {
106         appnamespace := os.Getenv("APP_NAMESPACE")
107         if appnamespace == "" {
108                 appnamespace = DEFAULT_XAPP_NS
109         }
110
111         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
112         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
113         if len(url) > 1 {
114                 return url[1]
115         }
116         return ""
117 }
118
119 func getPltNamespace(envName, defVal string) string {
120         pltnamespace := os.Getenv("PLT_NAMESPACE")
121         if pltnamespace == "" {
122                 pltnamespace = defVal
123         }
124
125         return pltnamespace
126 }
127
128 func doPost(pltNs, url string, msg []byte, status int) error {
129         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
130         if err != nil || resp == nil || resp.StatusCode != status {
131                 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
132                 return err
133         }
134         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
135
136         return err
137 }
138
139 func doRegister() error {
140         host, _ := os.Hostname()
141         xappname := viper.GetString("name")
142         xappversion := viper.GetString("version")
143         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
144
145         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
146         if httpEp == "" || rmrEp == "" {
147                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
148                 return nil
149         }
150
151         requestBody, err := json.Marshal(map[string]string{
152                 "appName":         host,
153                 "httpEndpoint":    httpEp,
154                 "rmrEndpoint":     rmrEp,
155                 "appInstanceName": xappname,
156                 "appVersion":      xappversion,
157                 "configPath":      CONFIG_PATH,
158         })
159
160         if err != nil {
161                 Logger.Error("json.Marshal failed with error: %v", err)
162                 return err
163         }
164
165         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
166 }
167
168 func doDeregister() error {
169         if !IsHealthProbeReady() {
170                 return nil
171         }
172
173         name, _ := os.Hostname()
174         xappname := viper.GetString("name")
175         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
176
177         requestBody, err := json.Marshal(map[string]string{
178                 "appName":         name,
179                 "appInstanceName": xappname,
180         })
181
182         if err != nil {
183                 Logger.Error("json.Marshal failed with error: %v", err)
184                 return err
185         }
186
187         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
188 }
189
190 func InstallSignalHandler() {
191         //
192         // Signal handlers to really exit program.
193         // shutdownCb can hang until application has
194         // made all needed gracefull shutdown actions
195         // hardcoded limit for shutdown is 20 seconds
196         //
197         interrupt := make(chan os.Signal, 1)
198         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
199         //signal handler function
200         go func() {
201                 for range interrupt {
202                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
203                                 // close function
204                                 go func() {
205                                         timeout := int(20)
206                                         sentry := make(chan struct{})
207                                         defer close(sentry)
208
209                                         // close callback
210                                         go func() {
211                                                 XappShutdownCb()
212                                                 sentry <- struct{}{}
213                                         }()
214                                         select {
215                                         case <-time.After(time.Duration(timeout) * time.Second):
216                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
217                                         case <-sentry:
218                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
219                                         }
220                                         os.Exit(0)
221                                 }()
222                         } else {
223                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
224                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
225                                 if newCnt >= 5 {
226                                         Logger.Info("xapp-frame shutdown forced exit")
227                                         os.Exit(0)
228                                 }
229                                 continue
230                         }
231
232                 }
233         }()
234 }
235
236 func init() {
237         // Load xapp configuration
238         Logger = LoadConfig()
239
240         if viper.IsSet("controls.logger.level") {
241                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
242         } else {
243                 Logger.SetLevel(viper.GetInt("logger.level"))
244         }
245
246         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
247                 Logger.SetFormat(0)
248         }
249
250         Resource = NewRouter()
251         Config = Configurator{}
252         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
253         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
254         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
255         Rnib = NewRNIBClient()
256         Util = NewUtils()
257
258         InstallSignalHandler()
259 }
260
261 func RunWithParams(c MessageConsumer, sdlcheck bool) {
262         Rmr = NewRMRClient()
263
264         Rmr.SetReadyCB(XappReadyCb, nil)
265
266         host := fmt.Sprintf(":%d", GetPortData("http").Port)
267         go http.ListenAndServe(host, Resource.router)
268         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
269
270         if sdlcheck {
271                 Sdl.TestConnection()
272         }
273         go registerXapp()
274
275         Rmr.Start(c)
276 }
277
278 func Run(c MessageConsumer) {
279         RunWithParams(c, true)
280 }