xapp-frame go imrpovements
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "net"
27         "net/http"
28         "os"
29         "os/signal"
30         "strings"
31         "sync/atomic"
32         "syscall"
33         "testing"
34         "time"
35
36         "github.com/spf13/viper"
37 )
38
39 // For testing purpose go version 1.13 ->
40
41 var _ = func() bool {
42         testing.Init()
43         return true
44 }()
45
46 type ReadyCB func(interface{})
47 type ShutdownCB func()
48
49 var (
50         // XApp is an application instance
51         Rmr           *RMRClient
52         Sdl           *SDLClient
53         SdlStorage    *SDLStorage
54         Rnib          *RNIBClient
55         Resource      *Router
56         Metric        *Metrics
57         Logger        *Log
58         Config        Configurator
59         Subscription  *Subscriber
60         Alarm         *AlarmClient
61         Util          *Utils
62         readyCb       ReadyCB
63         readyCbParams interface{}
64         shutdownCb    ShutdownCB
65         shutdownFlag  int32
66         shutdownCnt   int32
67 )
68
69 var startTime time.Time
70
71 func XappUpTime() time.Duration {
72         return time.Since(startTime)
73 }
74
75 func init() {
76         startTime = time.Now()
77 }
78
79 func IsReady() bool {
80         return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady()
81 }
82
83 func SetReadyCB(cb ReadyCB, params interface{}) {
84         readyCb = cb
85         readyCbParams = params
86 }
87
88 func XappReadyCb(params interface{}) {
89         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
90         if readyCb != nil {
91                 readyCb(readyCbParams)
92         }
93 }
94
95 func SetShutdownCB(cb ShutdownCB) {
96         shutdownCb = cb
97 }
98
99 func XappShutdownCb() {
100         if err := doDeregister(); err != nil {
101                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
102         } else {
103                 Logger.Info("xApp deregistration successfull!")
104         }
105
106         if shutdownCb != nil {
107                 shutdownCb()
108         }
109 }
110
111 func registerXapp() {
112         for {
113                 time.Sleep(5 * time.Second)
114                 if !IsHealthProbeReady() {
115                         Logger.Debug("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
116                         continue
117                 }
118
119                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
120                 if err := doRegister(); err == nil {
121                         Logger.Info("Registration done, proceeding with startup ...")
122                         break
123                 }
124         }
125 }
126
127 func getService(host, service string) string {
128         appnamespace := os.Getenv("APP_NAMESPACE")
129         if appnamespace == "" {
130                 appnamespace = DEFAULT_XAPP_NS
131         }
132
133         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
134         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
135
136         Logger.Info("getService: %+v %+v", svc, url)
137         if len(url) > 1 {
138                 return url[1]
139         }
140         return ""
141 }
142
143 func getPltNamespace(envName, defVal string) string {
144         pltnamespace := os.Getenv("PLT_NAMESPACE")
145         if pltnamespace == "" {
146                 pltnamespace = defVal
147         }
148
149         return pltnamespace
150 }
151
152 func doPost(pltNs, url string, msg []byte, status int) error {
153         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
154         if err != nil || resp == nil || resp.StatusCode != status {
155                 logdesc := fmt.Sprintf("http.Post to '%s' failed with", fmt.Sprintf(url, pltNs, pltNs))
156                 if resp != nil {
157                         logdesc += fmt.Sprintf(" status: %d != %d", resp.StatusCode, status)
158                 } else {
159                         logdesc += fmt.Sprintf(" resp: nil")
160                 }
161                 if err != nil {
162                         logdesc += fmt.Sprintf(" err: %s", err.Error())
163                 } else {
164                         logdesc += fmt.Sprintf(" err: nil")
165                 }
166                 Logger.Info(logdesc)
167                 return fmt.Errorf(logdesc)
168         }
169
170         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
171
172         return err
173 }
174
175 func doRegister() error {
176         host, _ := os.Hostname()
177         xappname := viper.GetString("name")
178         xappversion := viper.GetString("version")
179         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
180
181         //httpEp, rmrEp := getService(xappname, SERVICE_HTTP), getService(xappname, SERVICE_RMR)
182         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
183         if httpEp == "" || rmrEp == "" {
184                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
185                 return nil
186         }
187
188         requestBody, err := json.Marshal(map[string]string{
189                 "appName":         host,
190                 "httpEndpoint":    httpEp,
191                 "rmrEndpoint":     rmrEp,
192                 "appInstanceName": xappname,
193                 "appVersion":      xappversion,
194                 "configPath":      CONFIG_PATH,
195         })
196
197         if err != nil {
198                 Logger.Error("json.Marshal failed with error: %v", err)
199                 return err
200         }
201
202         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
203 }
204
205 func doDeregister() error {
206         if !IsHealthProbeReady() {
207                 return nil
208         }
209
210         name, _ := os.Hostname()
211         xappname := viper.GetString("name")
212         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
213
214         requestBody, err := json.Marshal(map[string]string{
215                 "appName":         name,
216                 "appInstanceName": xappname,
217         })
218
219         if err != nil {
220                 Logger.Error("json.Marshal failed with error: %v", err)
221                 return err
222         }
223
224         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
225 }
226
227 func InstallSignalHandler() {
228         //
229         // Signal handlers to really exit program.
230         // shutdownCb can hang until application has
231         // made all needed gracefull shutdown actions
232         // hardcoded limit for shutdown is 20 seconds
233         //
234         interrupt := make(chan os.Signal, 1)
235         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
236         //signal handler function
237         go func() {
238                 for range interrupt {
239                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
240                                 // close function
241                                 go func() {
242                                         timeout := int(20)
243                                         sentry := make(chan struct{})
244                                         defer close(sentry)
245
246                                         // close callback
247                                         go func() {
248                                                 XappShutdownCb()
249                                                 sentry <- struct{}{}
250                                         }()
251                                         select {
252                                         case <-time.After(time.Duration(timeout) * time.Second):
253                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
254                                         case <-sentry:
255                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
256                                         }
257                                         os.Exit(0)
258                                 }()
259                         } else {
260                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
261                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
262                                 if newCnt >= 5 {
263                                         Logger.Info("xapp-frame shutdown forced exit")
264                                         os.Exit(0)
265                                 }
266                                 continue
267                         }
268
269                 }
270         }()
271 }
272
273 func init() {
274         // Load xapp configuration
275         Logger = LoadConfig()
276
277         if viper.IsSet("controls.logger.level") {
278                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
279         } else {
280                 Logger.SetLevel(viper.GetInt("logger.level"))
281         }
282
283         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
284                 Logger.SetFormat(0)
285         }
286
287         Resource = NewRouter()
288         Config = Configurator{}
289         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
290         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
291         SdlStorage = NewSdlStorage()
292         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
293         Rnib = GetNewRnibClient(SdlStorage.db)
294         Util = NewUtils()
295
296         InstallSignalHandler()
297 }
298
299 func GetIpAddress() (string, error) {
300         ifname := os.Getenv("INTERFACE_NAME")
301         itf, err := net.InterfaceByName(ifname)
302         if err != nil {
303                 return "<nil>", fmt.Errorf("Interface (%s) %w", ifname, err)
304         }
305         item, err := itf.Addrs()
306         if err != nil {
307                 return "<nil>", fmt.Errorf("Interface (%s) %w", ifname, err)
308         }
309         for _, addr := range item {
310                 switch v := addr.(type) {
311                 case *net.IPNet:
312                         if !v.IP.IsLinkLocalUnicast() {
313                                 return v.IP.String(), nil
314                         }
315                 }
316         }
317         return "<nil>", fmt.Errorf("Interface (%s) couldn't find ip", ifname)
318 }
319
320 func RunWithParams(c MessageConsumer, sdlcheck bool) {
321         Rmr = NewRMRClient()
322
323         Rmr.SetReadyCB(XappReadyCb, nil)
324         ipString, err := GetIpAddress()
325         if err != nil {
326                 Logger.Info("IP address is not able to resolve " + err.Error())
327         }
328         var host string
329         if ipString == "<nil>" {
330                 host = fmt.Sprintf(":%d", GetPortData("http").Port)
331         } else {
332                 host = fmt.Sprintf("[%s]:%d", ipString, GetPortData("http").Port)
333         }
334         go http.ListenAndServe(host, Resource.router)
335         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
336
337         if sdlcheck {
338                 SdlStorage.TestConnection(viper.GetString("controls.db.namespace"))
339         }
340         go registerXapp()
341
342         Rmr.Start(c)
343 }
344
345 func Run(c MessageConsumer) {
346         RunWithParams(c, true)
347 }