Some code refactoring, etc.
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "github.com/spf13/viper"
27         "net/http"
28         "os"
29         "os/signal"
30         "strings"
31         "sync/atomic"
32         "syscall"
33         "time"
34 )
35
// Callback signatures that an xApp can register with the framework.
type (
	// ReadyCB is the signature of the callback registered with SetReadyCB.
	// XappReadyCb invokes it with the params value that was registered
	// together with the callback.
	ReadyCB func(interface{})

	// ShutdownCB is the signature of the callback registered with
	// SetShutdownCB. XappShutdownCb invokes it after the deregistration
	// attempt has been made.
	ShutdownCB func()
)
38
var (
	// Package-level state shared by the whole framework.
	//
	// Logger, Resource, Config, Metric, Subscription, Sdl and Rnib are
	// created in init(); Rmr is created by RunWithParams and Alarm by
	// XappReadyCb.
	//
	// readyCb/readyCbParams and shutdownCb hold the callbacks registered
	// through SetReadyCB and SetShutdownCB.
	//
	// shutdownFlag and shutdownCnt are accessed atomically by the signal
	// handler: the flag is flipped from 0 to 1 by the first termination
	// signal, the counter tracks signals received after that.
	Rmr           *RMRClient
	Sdl           *SDLClient
	Rnib          *RNIBClient
	Resource      *Router
	Metric        *Metrics
	Logger        *Log
	Config        Configurator
	Subscription  *Subscriber
	Alarm         *AlarmClient
	readyCb       ReadyCB
	readyCbParams interface{}
	shutdownCb    ShutdownCB
	shutdownFlag  int32
	shutdownCnt   int32
)
56
57 func IsReady() bool {
58         return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
59 }
60
61 func SetReadyCB(cb ReadyCB, params interface{}) {
62         readyCb = cb
63         readyCbParams = params
64 }
65
66 func XappReadyCb(params interface{}) {
67         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
68         if readyCb != nil {
69                 readyCb(readyCbParams)
70         }
71 }
72
73 func SetShutdownCB(cb ShutdownCB) {
74         shutdownCb = cb
75 }
76
77 func XappShutdownCb() {
78         if err := doDeregister(); err != nil {
79                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
80         } else {
81                 Logger.Info("xApp deregistration successfull!")
82         }
83
84         if shutdownCb != nil {
85                 shutdownCb()
86         }
87 }
88
89 func registerXapp() {
90         for {
91                 time.Sleep(5 * time.Second)
92                 if !IsHealthProbeReady() {
93                         Logger.Info("xApp is not ready yet, waiting ...")
94                         continue
95                 }
96
97                 Logger.Info("xApp is now up and ready, continue with registration ...")
98                 if err := doRegister(); err == nil {
99                         Logger.Info("xApp registration done, proceeding with startup ...")
100                         break
101                 }
102         }
103 }
104
105 func getService(host, service string) string {
106         appnamespace := os.Getenv("APP_NAMESPACE")
107         if appnamespace == "" {
108                 appnamespace = DEFAULT_XAPP_NS
109         }
110
111         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
112         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
113         if len(url) > 1 {
114                 return url[1]
115         }
116         return ""
117 }
118
// getPltNamespace returns the platform namespace read from the environment
// variable envName, or defVal when that variable is unset or empty.
//
// The variable name used to be hard-coded to "PLT_NAMESPACE" and envName was
// ignored; it is now honoured. Callers in this file pass "PLT_NAMESPACE", so
// their behaviour is unchanged.
func getPltNamespace(envName, defVal string) string {
	if ns := os.Getenv(envName); ns != "" {
		return ns
	}
	return defVal
}
127
128 func doPost(pltNs, url string, msg []byte, status int) error {
129         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
130         if err != nil || resp == nil || resp.StatusCode != status {
131                 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
132                 return err
133         }
134         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
135
136         return err
137 }
138
139 func doRegister() error {
140         host, _ := os.Hostname()
141         xappname := viper.GetString("name")
142         xappversion := viper.GetString("version")
143         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
144
145         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
146         if httpEp == "" || rmrEp == "" {
147                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
148                 return nil
149         }
150
151         requestBody, err := json.Marshal(map[string]string{
152                 "appName":         host,
153                 "httpEndpoint":    httpEp,
154                 "rmrEndpoint":     rmrEp,
155                 "appInstanceName": xappname,
156                 "appVersion":      xappversion,
157                 "configPath":      CONFIG_PATH,
158         })
159
160         if err != nil {
161                 Logger.Error("json.Marshal failed with error: %v", err)
162                 return err
163         }
164
165         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
166 }
167
168 func doDeregister() error {
169         if !IsHealthProbeReady() {
170                 return nil
171         }
172
173         name, _ := os.Hostname()
174         xappname := viper.GetString("name")
175         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
176
177         requestBody, err := json.Marshal(map[string]string{
178                 "appName":         name,
179                 "appInstanceName": xappname,
180         })
181
182         if err != nil {
183                 Logger.Error("json.Marshal failed with error: %v", err)
184                 return err
185         }
186
187         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
188 }
189
190 func InstallSignalHandler() {
191         //
192         // Signal handlers to really exit program.
193         // shutdownCb can hang until application has
194         // made all needed gracefull shutdown actions
195         // hardcoded limit for shutdown is 20 seconds
196         //
197         interrupt := make(chan os.Signal, 1)
198         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
199         //signal handler function
200         go func() {
201                 for range interrupt {
202                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
203                                 // close function
204                                 go func() {
205                                         timeout := int(20)
206                                         sentry := make(chan struct{})
207                                         defer close(sentry)
208
209                                         // close callback
210                                         go func() {
211                                                 XappShutdownCb()
212                                                 sentry <- struct{}{}
213                                         }()
214                                         select {
215                                         case <-time.After(time.Duration(timeout) * time.Second):
216                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
217                                         case <-sentry:
218                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
219                                         }
220                                         os.Exit(0)
221                                 }()
222                         } else {
223                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
224                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
225                                 if newCnt >= 5 {
226                                         Logger.Info("xapp-frame shutdown forced exit")
227                                         os.Exit(0)
228                                 }
229                                 continue
230                         }
231
232                 }
233         }()
234 }
235
236 func init() {
237         // Load xapp configuration
238         Logger = LoadConfig()
239
240         if viper.IsSet("controls.logger.level") {
241                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
242         } else {
243                 Logger.SetLevel(viper.GetInt("logger.level"))
244         }
245         Logger.SetFormat(0)
246
247         Resource = NewRouter()
248         Config = Configurator{}
249         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
250         Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
251         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
252         Rnib = NewRNIBClient()
253
254         InstallSignalHandler()
255 }
256
257 func RunWithParams(c MessageConsumer, sdlcheck bool) {
258         Rmr = NewRMRClient()
259
260         Rmr.SetReadyCB(XappReadyCb, nil)
261
262         host := fmt.Sprintf(":%d", GetPortData("http").Port)
263         go http.ListenAndServe(host, Resource.router)
264         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
265
266         if sdlcheck {
267                 Sdl.TestConnection()
268         }
269         go registerXapp()
270
271         Rmr.Start(c)
272 }
273
274 func Run(c MessageConsumer) {
275         RunWithParams(c, true)
276 }