Newer go version 1.13 and above support
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "net/http"
27         "os"
28         "os/signal"
29         "strings"
30         "sync/atomic"
31         "syscall"
32         "testing"
33         "time"
34
35         "github.com/spf13/viper"
36 )
37
// Register the testing package's command-line flags during package variable
// initialization. Package-level variables are initialized before init() runs,
// so the flags exist before init() calls LoadConfig.
// NOTE(review): presumably LoadConfig parses the command line and would reject
// the -test.* flags otherwise; since Go 1.13 those flags are only registered
// by testing.Init — confirm against LoadConfig.
var _ = func() bool {
	testing.Init()
	return true
}()
43
// ReadyCB is the signature of the application callback invoked from
// XappReadyCb. It receives the params value registered through SetReadyCB.
type ReadyCB func(interface{})

// ShutdownCB is the signature of the application callback invoked from
// XappShutdownCb after the deregistration attempt has finished.
type ShutdownCB func()
46
var (
	// Package-level singletons shared by all users of the framework.
	// Rmr is created in RunWithParams and Alarm in XappReadyCb; the other
	// exported values are created in init().
	Rmr           *RMRClient
	Sdl           *SDLClient
	Rnib          *RNIBClient
	Resource      *Router
	Metric        *Metrics
	Logger        *Log
	Config        Configurator
	Subscription  *Subscriber
	Alarm         *AlarmClient
	Util          *Utils
	readyCb       ReadyCB     // set by SetReadyCB, invoked from XappReadyCb
	readyCbParams interface{} // argument handed to readyCb
	shutdownCb    ShutdownCB  // set by SetShutdownCB, invoked from XappShutdownCb
	shutdownFlag  int32       // switched 0 -> 1 atomically by the first SIGINT/SIGTERM
	shutdownCnt   int32       // incremented atomically for each signal received after the first
)
65
66 func IsReady() bool {
67         return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
68 }
69
70 func SetReadyCB(cb ReadyCB, params interface{}) {
71         readyCb = cb
72         readyCbParams = params
73 }
74
75 func XappReadyCb(params interface{}) {
76         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
77         if readyCb != nil {
78                 readyCb(readyCbParams)
79         }
80 }
81
// SetShutdownCB stores the callback that XappShutdownCb invokes after it has
// attempted to deregister the xApp.
func SetShutdownCB(cb ShutdownCB) {
	shutdownCb = cb
}
85
86 func XappShutdownCb() {
87         if err := doDeregister(); err != nil {
88                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
89         } else {
90                 Logger.Info("xApp deregistration successfull!")
91         }
92
93         if shutdownCb != nil {
94                 shutdownCb()
95         }
96 }
97
98 func registerXapp() {
99         for {
100                 time.Sleep(5 * time.Second)
101                 if !IsHealthProbeReady() {
102                         Logger.Info("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
103                         continue
104                 }
105
106                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
107                 if err := doRegister(); err == nil {
108                         Logger.Info("Registration done, proceeding with startup ...")
109                         break
110                 }
111         }
112 }
113
114 func getService(host, service string) string {
115         appnamespace := os.Getenv("APP_NAMESPACE")
116         if appnamespace == "" {
117                 appnamespace = DEFAULT_XAPP_NS
118         }
119
120         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
121         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
122
123         Logger.Info("getService: %+v %+v", svc, url)
124         if len(url) > 1 {
125                 return url[1]
126         }
127         return ""
128 }
129
// getPltNamespace returns the platform namespace read from the environment
// variable named envName, or defVal when that variable is unset or empty.
//
// The lookup previously ignored envName and always read "PLT_NAMESPACE";
// callers that pass "PLT_NAMESPACE" see no change in behavior.
func getPltNamespace(envName, defVal string) string {
	if ns := os.Getenv(envName); ns != "" {
		return ns
	}
	return defVal
}
138
139 func doPost(pltNs, url string, msg []byte, status int) error {
140         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
141         if err != nil || resp == nil || resp.StatusCode != status {
142                 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
143                 return err
144         }
145         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
146
147         return err
148 }
149
150 func doRegister() error {
151         host, _ := os.Hostname()
152         xappname := viper.GetString("name")
153         xappversion := viper.GetString("version")
154         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
155
156         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
157         if httpEp == "" || rmrEp == "" {
158                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
159                 return nil
160         }
161
162         requestBody, err := json.Marshal(map[string]string{
163                 "appName":         host,
164                 "httpEndpoint":    httpEp,
165                 "rmrEndpoint":     rmrEp,
166                 "appInstanceName": xappname,
167                 "appVersion":      xappversion,
168                 "configPath":      CONFIG_PATH,
169         })
170
171         if err != nil {
172                 Logger.Error("json.Marshal failed with error: %v", err)
173                 return err
174         }
175
176         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
177 }
178
179 func doDeregister() error {
180         if !IsHealthProbeReady() {
181                 return nil
182         }
183
184         name, _ := os.Hostname()
185         xappname := viper.GetString("name")
186         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
187
188         requestBody, err := json.Marshal(map[string]string{
189                 "appName":         name,
190                 "appInstanceName": xappname,
191         })
192
193         if err != nil {
194                 Logger.Error("json.Marshal failed with error: %v", err)
195                 return err
196         }
197
198         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
199 }
200
201 func InstallSignalHandler() {
202         //
203         // Signal handlers to really exit program.
204         // shutdownCb can hang until application has
205         // made all needed gracefull shutdown actions
206         // hardcoded limit for shutdown is 20 seconds
207         //
208         interrupt := make(chan os.Signal, 1)
209         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
210         //signal handler function
211         go func() {
212                 for range interrupt {
213                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
214                                 // close function
215                                 go func() {
216                                         timeout := int(20)
217                                         sentry := make(chan struct{})
218                                         defer close(sentry)
219
220                                         // close callback
221                                         go func() {
222                                                 XappShutdownCb()
223                                                 sentry <- struct{}{}
224                                         }()
225                                         select {
226                                         case <-time.After(time.Duration(timeout) * time.Second):
227                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
228                                         case <-sentry:
229                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
230                                         }
231                                         os.Exit(0)
232                                 }()
233                         } else {
234                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
235                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
236                                 if newCnt >= 5 {
237                                         Logger.Info("xapp-frame shutdown forced exit")
238                                         os.Exit(0)
239                                 }
240                                 continue
241                         }
242
243                 }
244         }()
245 }
246
247 func init() {
248         // Load xapp configuration
249         Logger = LoadConfig()
250
251         if viper.IsSet("controls.logger.level") {
252                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
253         } else {
254                 Logger.SetLevel(viper.GetInt("logger.level"))
255         }
256
257         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
258                 Logger.SetFormat(0)
259         }
260
261         Resource = NewRouter()
262         Config = Configurator{}
263         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
264         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
265         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
266         Rnib = NewRNIBClient()
267         Util = NewUtils()
268
269         InstallSignalHandler()
270 }
271
272 func RunWithParams(c MessageConsumer, sdlcheck bool) {
273         Rmr = NewRMRClient()
274
275         Rmr.SetReadyCB(XappReadyCb, nil)
276
277         host := fmt.Sprintf(":%d", GetPortData("http").Port)
278         go http.ListenAndServe(host, Resource.router)
279         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
280
281         if sdlcheck {
282                 Sdl.TestConnection()
283         }
284         go registerXapp()
285
286         Rmr.Start(c)
287 }
288
289 func Run(c MessageConsumer) {
290         RunWithParams(c, true)
291 }