Prepare for SdlInstance usage removal in xapp-frame
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "net/http"
27         "os"
28         "os/signal"
29         "strings"
30         "sync/atomic"
31         "syscall"
32         "testing"
33         "time"
34
35         "github.com/spf13/viper"
36 )
37
// Register the testing package's command-line flags while package-level
// variables are being initialised, i.e. before this package's init() runs.
// Required for test runs from Go 1.13 onwards.
var _ = func() struct{} {
	testing.Init()
	return struct{}{}
}()
43
type (
	// ReadyCB is the signature of the callback invoked from XappReadyCb.
	// It receives the opaque params value stored by SetReadyCB.
	ReadyCB func(interface{})

	// ShutdownCB is the signature of the callback invoked from
	// XappShutdownCb while the xApp is shutting down.
	ShutdownCB func()
)
46
47 var (
48         // XApp is an application instance
49         Rmr           *RMRClient
50         Sdl           *SDLClient
51         SdlStorage    *SDLStorage
52         Rnib          *RNIBClient
53         Resource      *Router
54         Metric        *Metrics
55         Logger        *Log
56         Config        Configurator
57         Subscription  *Subscriber
58         Alarm         *AlarmClient
59         Util          *Utils
60         readyCb       ReadyCB
61         readyCbParams interface{}
62         shutdownCb    ShutdownCB
63         shutdownFlag  int32
64         shutdownCnt   int32
65 )
66
67 func IsReady() bool {
68         return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady()
69 }
70
71 func SetReadyCB(cb ReadyCB, params interface{}) {
72         readyCb = cb
73         readyCbParams = params
74 }
75
76 func XappReadyCb(params interface{}) {
77         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
78         if readyCb != nil {
79                 readyCb(readyCbParams)
80         }
81 }
82
83 func SetShutdownCB(cb ShutdownCB) {
84         shutdownCb = cb
85 }
86
87 func XappShutdownCb() {
88         if err := doDeregister(); err != nil {
89                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
90         } else {
91                 Logger.Info("xApp deregistration successfull!")
92         }
93
94         if shutdownCb != nil {
95                 shutdownCb()
96         }
97 }
98
99 func registerXapp() {
100         for {
101                 time.Sleep(5 * time.Second)
102                 if !IsHealthProbeReady() {
103                         Logger.Info("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
104                         continue
105                 }
106
107                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
108                 if err := doRegister(); err == nil {
109                         Logger.Info("Registration done, proceeding with startup ...")
110                         break
111                 }
112         }
113 }
114
115 func getService(host, service string) string {
116         appnamespace := os.Getenv("APP_NAMESPACE")
117         if appnamespace == "" {
118                 appnamespace = DEFAULT_XAPP_NS
119         }
120
121         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
122         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
123
124         Logger.Info("getService: %+v %+v", svc, url)
125         if len(url) > 1 {
126                 return url[1]
127         }
128         return ""
129 }
130
// getPltNamespace returns the platform namespace read from the environment
// variable named envName, or defVal when that variable is unset or empty.
//
// An empty envName falls back to "PLT_NAMESPACE", the variable this function
// historically always read.
func getPltNamespace(envName, defVal string) string {
	if envName == "" {
		envName = "PLT_NAMESPACE"
	}

	if ns := os.Getenv(envName); ns != "" {
		return ns
	}
	return defVal
}
139
140 func doPost(pltNs, url string, msg []byte, status int) error {
141         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
142         if err != nil || resp == nil || resp.StatusCode != status {
143                 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
144                 return err
145         }
146         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
147
148         return err
149 }
150
151 func doRegister() error {
152         host, _ := os.Hostname()
153         xappname := viper.GetString("name")
154         xappversion := viper.GetString("version")
155         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
156
157         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
158         if httpEp == "" || rmrEp == "" {
159                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
160                 return nil
161         }
162
163         requestBody, err := json.Marshal(map[string]string{
164                 "appName":         host,
165                 "httpEndpoint":    httpEp,
166                 "rmrEndpoint":     rmrEp,
167                 "appInstanceName": xappname,
168                 "appVersion":      xappversion,
169                 "configPath":      CONFIG_PATH,
170         })
171
172         if err != nil {
173                 Logger.Error("json.Marshal failed with error: %v", err)
174                 return err
175         }
176
177         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
178 }
179
180 func doDeregister() error {
181         if !IsHealthProbeReady() {
182                 return nil
183         }
184
185         name, _ := os.Hostname()
186         xappname := viper.GetString("name")
187         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
188
189         requestBody, err := json.Marshal(map[string]string{
190                 "appName":         name,
191                 "appInstanceName": xappname,
192         })
193
194         if err != nil {
195                 Logger.Error("json.Marshal failed with error: %v", err)
196                 return err
197         }
198
199         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
200 }
201
202 func InstallSignalHandler() {
203         //
204         // Signal handlers to really exit program.
205         // shutdownCb can hang until application has
206         // made all needed gracefull shutdown actions
207         // hardcoded limit for shutdown is 20 seconds
208         //
209         interrupt := make(chan os.Signal, 1)
210         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
211         //signal handler function
212         go func() {
213                 for range interrupt {
214                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
215                                 // close function
216                                 go func() {
217                                         timeout := int(20)
218                                         sentry := make(chan struct{})
219                                         defer close(sentry)
220
221                                         // close callback
222                                         go func() {
223                                                 XappShutdownCb()
224                                                 sentry <- struct{}{}
225                                         }()
226                                         select {
227                                         case <-time.After(time.Duration(timeout) * time.Second):
228                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
229                                         case <-sentry:
230                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
231                                         }
232                                         os.Exit(0)
233                                 }()
234                         } else {
235                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
236                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
237                                 if newCnt >= 5 {
238                                         Logger.Info("xapp-frame shutdown forced exit")
239                                         os.Exit(0)
240                                 }
241                                 continue
242                         }
243
244                 }
245         }()
246 }
247
248 func init() {
249         // Load xapp configuration
250         Logger = LoadConfig()
251
252         if viper.IsSet("controls.logger.level") {
253                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
254         } else {
255                 Logger.SetLevel(viper.GetInt("logger.level"))
256         }
257
258         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
259                 Logger.SetFormat(0)
260         }
261
262         Resource = NewRouter()
263         Config = Configurator{}
264         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
265         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
266         SdlStorage = NewSdlStorage()
267         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
268         Rnib = NewRNIBClient()
269         Util = NewUtils()
270
271         InstallSignalHandler()
272 }
273
274 func RunWithParams(c MessageConsumer, sdlcheck bool) {
275         Rmr = NewRMRClient()
276
277         Rmr.SetReadyCB(XappReadyCb, nil)
278
279         host := fmt.Sprintf(":%d", GetPortData("http").Port)
280         go http.ListenAndServe(host, Resource.router)
281         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
282
283         if sdlcheck {
284                 SdlStorage.TestConnection(viper.GetString("controls.db.namespace"))
285         }
286         go registerXapp()
287
288         Rmr.Start(c)
289 }
290
291 func Run(c MessageConsumer) {
292         RunWithParams(c, true)
293 }