Update APIs
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "github.com/spf13/viper"
27         "net/http"
28         "os"
29         "os/signal"
30         "strings"
31         "sync/atomic"
32         "syscall"
33         "testing"
34         "time"
35 )
36
// Package-level hook evaluated during variable initialisation: it calls
// testing.Init so the standard "testing" flags are registered before any
// init() function of this package runs. The resulting value is discarded.
var _ = func() struct{} {
	testing.Init()
	return struct{}{}
}()
42
// Callback signatures an application can register with the framework.
type (
	// ReadyCB is invoked from XappReadyCb with the parameter value that was
	// stored through SetReadyCB.
	ReadyCB func(interface{})

	// ShutdownCB is invoked from XappShutdownCb while the process is
	// shutting down.
	ShutdownCB func()
)
45
46 var (
47         // XApp is an application instance
48         Rmr           *RMRClient
49         Sdl           *SDLClient
50         Rnib          *RNIBClient
51         Resource      *Router
52         Metric        *Metrics
53         Logger        *Log
54         Config        Configurator
55         Subscription  *Subscriber
56         Alarm         *AlarmClient
57         Util          *Utils
58         readyCb       ReadyCB
59         readyCbParams interface{}
60         shutdownCb    ShutdownCB
61         shutdownFlag  int32
62         shutdownCnt   int32
63 )
64
65 func IsReady() bool {
66         return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
67 }
68
69 func SetReadyCB(cb ReadyCB, params interface{}) {
70         readyCb = cb
71         readyCbParams = params
72 }
73
74 func XappReadyCb(params interface{}) {
75         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
76         if readyCb != nil {
77                 readyCb(readyCbParams)
78         }
79 }
80
81 func SetShutdownCB(cb ShutdownCB) {
82         shutdownCb = cb
83 }
84
85 func XappShutdownCb() {
86         if err := doDeregister(); err != nil {
87                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
88         } else {
89                 Logger.Info("xApp deregistration successfull!")
90         }
91
92         if shutdownCb != nil {
93                 shutdownCb()
94         }
95 }
96
97 func registerXapp() {
98         for {
99                 time.Sleep(5 * time.Second)
100                 if !IsHealthProbeReady() {
101                         Logger.Info("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
102                         continue
103                 }
104
105                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
106                 if err := doRegister(); err == nil {
107                         Logger.Info("Registration done, proceeding with startup ...")
108                         break
109                 }
110         }
111 }
112
113 func getService(host, service string) string {
114         appnamespace := os.Getenv("APP_NAMESPACE")
115         if appnamespace == "" {
116                 appnamespace = DEFAULT_XAPP_NS
117         }
118
119         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
120         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
121         if len(url) > 1 {
122                 return url[1]
123         }
124         return ""
125 }
126
// getPltNamespace returns the platform namespace read from the environment
// variable named envName, falling back to defVal when that variable is unset
// or empty.
//
// The variable name is taken from envName rather than being hard-coded, so
// the parameter actually controls the lookup. Callers that pass
// "PLT_NAMESPACE" get the same result as before.
func getPltNamespace(envName, defVal string) string {
	if ns := os.Getenv(envName); ns != "" {
		return ns
	}
	return defVal
}
135
136 func doPost(pltNs, url string, msg []byte, status int) error {
137         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
138         if err != nil || resp == nil || resp.StatusCode != status {
139                 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
140                 return err
141         }
142         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
143
144         return err
145 }
146
147 func doRegister() error {
148         host, _ := os.Hostname()
149         xappname := viper.GetString("name")
150         xappversion := viper.GetString("version")
151         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
152
153         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
154         if httpEp == "" || rmrEp == "" {
155                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
156                 return nil
157         }
158
159         requestBody, err := json.Marshal(map[string]string{
160                 "appName":         host,
161                 "httpEndpoint":    httpEp,
162                 "rmrEndpoint":     rmrEp,
163                 "appInstanceName": xappname,
164                 "appVersion":      xappversion,
165                 "configPath":      CONFIG_PATH,
166         })
167
168         if err != nil {
169                 Logger.Error("json.Marshal failed with error: %v", err)
170                 return err
171         }
172
173         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
174 }
175
176 func doDeregister() error {
177         if !IsHealthProbeReady() {
178                 return nil
179         }
180
181         name, _ := os.Hostname()
182         xappname := viper.GetString("name")
183         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
184
185         requestBody, err := json.Marshal(map[string]string{
186                 "appName":         name,
187                 "appInstanceName": xappname,
188         })
189
190         if err != nil {
191                 Logger.Error("json.Marshal failed with error: %v", err)
192                 return err
193         }
194
195         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
196 }
197
198 func InstallSignalHandler() {
199         //
200         // Signal handlers to really exit program.
201         // shutdownCb can hang until application has
202         // made all needed gracefull shutdown actions
203         // hardcoded limit for shutdown is 20 seconds
204         //
205         interrupt := make(chan os.Signal, 1)
206         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
207         //signal handler function
208         go func() {
209                 for range interrupt {
210                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
211                                 // close function
212                                 go func() {
213                                         timeout := int(20)
214                                         sentry := make(chan struct{})
215                                         defer close(sentry)
216
217                                         // close callback
218                                         go func() {
219                                                 XappShutdownCb()
220                                                 sentry <- struct{}{}
221                                         }()
222                                         select {
223                                         case <-time.After(time.Duration(timeout) * time.Second):
224                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
225                                         case <-sentry:
226                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
227                                         }
228                                         os.Exit(0)
229                                 }()
230                         } else {
231                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
232                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
233                                 if newCnt >= 5 {
234                                         Logger.Info("xapp-frame shutdown forced exit")
235                                         os.Exit(0)
236                                 }
237                                 continue
238                         }
239
240                 }
241         }()
242 }
243
244 func init() {
245         // Load xapp configuration
246         Logger = LoadConfig()
247
248         if viper.IsSet("controls.logger.level") {
249                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
250         } else {
251                 Logger.SetLevel(viper.GetInt("logger.level"))
252         }
253
254         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
255                 Logger.SetFormat(0)
256         }
257
258         Resource = NewRouter()
259         Config = Configurator{}
260         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
261         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
262         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
263         Rnib = NewRNIBClient()
264         Util = NewUtils()
265
266         InstallSignalHandler()
267 }
268
269 func RunWithParams(c MessageConsumer, sdlcheck bool) {
270         Rmr = NewRMRClient()
271
272         Rmr.SetReadyCB(XappReadyCb, nil)
273
274         host := fmt.Sprintf(":%d", GetPortData("http").Port)
275         go http.ListenAndServe(host, Resource.router)
276         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
277
278         if sdlcheck {
279                 Sdl.TestConnection()
280         }
281         go registerXapp()
282
283         Rmr.Start(c)
284 }
285
286 func Run(c MessageConsumer) {
287         RunWithParams(c, true)
288 }