INTERNAL: Symptom improvements
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "net"
27         "net/http"
28         "os"
29         "os/signal"
30         "strings"
31         "sync/atomic"
32         "syscall"
33         "testing"
34         "time"
35
36         "github.com/spf13/viper"
37 )
38
39 // For testing purpose go version 1.13 ->
40
41 var _ = func() bool {
42         testing.Init()
43         return true
44 }()
45
46 type ReadyCB func(interface{})
47 type ShutdownCB func()
48
49 var (
50         // XApp is an application instance
51         Rmr           *RMRClient
52         Sdl           *SDLClient
53         SdlStorage    *SDLStorage
54         Rnib          *RNIBClient
55         Resource      *Router
56         Metric        *Metrics
57         Logger        *Log
58         Config        Configurator
59         Subscription  *Subscriber
60         Alarm         *AlarmClient
61         Util          *Utils
62         readyCb       ReadyCB
63         readyCbParams interface{}
64         shutdownCb    ShutdownCB
65         shutdownFlag  int32
66         shutdownCnt   int32
67 )
68
69 var startTime time.Time
70
71 func XappUpTime() time.Duration {
72         return time.Since(startTime)
73 }
74
75 func init() {
76         startTime = time.Now()
77 }
78
79 func IsReady() bool {
80         return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady()
81 }
82
83 func SetReadyCB(cb ReadyCB, params interface{}) {
84         readyCb = cb
85         readyCbParams = params
86 }
87
88 func XappReadyCb(params interface{}) {
89         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
90         if readyCb != nil {
91                 readyCb(readyCbParams)
92         }
93 }
94
95 func SetShutdownCB(cb ShutdownCB) {
96         shutdownCb = cb
97 }
98
99 func XappShutdownCb() {
100         if err := doDeregister(); err != nil {
101                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
102         } else {
103                 Logger.Info("xApp deregistration successfull!")
104         }
105
106         if shutdownCb != nil {
107                 shutdownCb()
108         }
109 }
110
111 func registerXapp() {
112         for {
113                 time.Sleep(5 * time.Second)
114                 if !IsHealthProbeReady() {
115                         Logger.Debug("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
116                         continue
117                 }
118
119                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
120                 if err := doRegister(); err == nil {
121                         Logger.Info("Registration done, proceeding with startup ...")
122                         break
123                 }
124         }
125 }
126
127 func getService(host, service string) string {
128         appnamespace := os.Getenv("APP_NAMESPACE")
129         if appnamespace == "" {
130                 appnamespace = DEFAULT_XAPP_NS
131         }
132
133         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
134         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
135
136         Logger.Info("getService: %+v %+v", svc, url)
137         if len(url) > 1 {
138                 return url[1]
139         }
140         return ""
141 }
142
143 func getPltNamespace(envName, defVal string) string {
144         pltnamespace := os.Getenv("PLT_NAMESPACE")
145         if pltnamespace == "" {
146                 pltnamespace = defVal
147         }
148
149         return pltnamespace
150 }
151
152 func doPost(pltNs, url string, msg []byte, status int) error {
153         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
154         if err != nil || resp == nil || resp.StatusCode != status {
155                 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
156                 return err
157         }
158         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
159
160         return err
161 }
162
163 func doRegister() error {
164         host, _ := os.Hostname()
165         xappname := viper.GetString("name")
166         xappversion := viper.GetString("version")
167         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
168
169         //httpEp, rmrEp := getService(xappname, SERVICE_HTTP), getService(xappname, SERVICE_RMR)
170         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
171         if httpEp == "" || rmrEp == "" {
172                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
173                 return nil
174         }
175
176         requestBody, err := json.Marshal(map[string]string{
177                 "appName":         host,
178                 "httpEndpoint":    httpEp,
179                 "rmrEndpoint":     rmrEp,
180                 "appInstanceName": xappname,
181                 "appVersion":      xappversion,
182                 "configPath":      CONFIG_PATH,
183         })
184
185         if err != nil {
186                 Logger.Error("json.Marshal failed with error: %v", err)
187                 return err
188         }
189
190         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
191 }
192
193 func doDeregister() error {
194         if !IsHealthProbeReady() {
195                 return nil
196         }
197
198         name, _ := os.Hostname()
199         xappname := viper.GetString("name")
200         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
201
202         requestBody, err := json.Marshal(map[string]string{
203                 "appName":         name,
204                 "appInstanceName": xappname,
205         })
206
207         if err != nil {
208                 Logger.Error("json.Marshal failed with error: %v", err)
209                 return err
210         }
211
212         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
213 }
214
215 func InstallSignalHandler() {
216         //
217         // Signal handlers to really exit program.
218         // shutdownCb can hang until application has
219         // made all needed gracefull shutdown actions
220         // hardcoded limit for shutdown is 20 seconds
221         //
222         interrupt := make(chan os.Signal, 1)
223         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
224         //signal handler function
225         go func() {
226                 for range interrupt {
227                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
228                                 // close function
229                                 go func() {
230                                         timeout := int(20)
231                                         sentry := make(chan struct{})
232                                         defer close(sentry)
233
234                                         // close callback
235                                         go func() {
236                                                 XappShutdownCb()
237                                                 sentry <- struct{}{}
238                                         }()
239                                         select {
240                                         case <-time.After(time.Duration(timeout) * time.Second):
241                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
242                                         case <-sentry:
243                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
244                                         }
245                                         os.Exit(0)
246                                 }()
247                         } else {
248                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
249                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
250                                 if newCnt >= 5 {
251                                         Logger.Info("xapp-frame shutdown forced exit")
252                                         os.Exit(0)
253                                 }
254                                 continue
255                         }
256
257                 }
258         }()
259 }
260
261 func init() {
262         // Load xapp configuration
263         Logger = LoadConfig()
264
265         if viper.IsSet("controls.logger.level") {
266                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
267         } else {
268                 Logger.SetLevel(viper.GetInt("logger.level"))
269         }
270
271         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
272                 Logger.SetFormat(0)
273         }
274
275         Resource = NewRouter()
276         Config = Configurator{}
277         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
278         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
279         SdlStorage = NewSdlStorage()
280         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
281         Rnib = GetNewRnibClient(SdlStorage.db)
282         Util = NewUtils()
283
284         InstallSignalHandler()
285 }
286
287 func GetIpAddress() (string, error) {
288         ifname := os.Getenv("INTERFACE_NAME")
289         itf, err := net.InterfaceByName(ifname)
290         if err != nil {
291                 return "<nil>", fmt.Errorf("Interface (%s) %w", ifname, err)
292         }
293         item, err := itf.Addrs()
294         if err != nil {
295                 return "<nil>", fmt.Errorf("Interface (%s) %w", ifname, err)
296         }
297         for _, addr := range item {
298                 switch v := addr.(type) {
299                 case *net.IPNet:
300                         if !v.IP.IsLinkLocalUnicast() {
301                                 return v.IP.String(), nil
302                         }
303                 }
304         }
305         return "<nil>", fmt.Errorf("Interface (%s) couldn't find ip", ifname)
306 }
307
308 func RunWithParams(c MessageConsumer, sdlcheck bool) {
309         Rmr = NewRMRClient()
310
311         Rmr.SetReadyCB(XappReadyCb, nil)
312         ipString, err := GetIpAddress()
313         if err != nil {
314                 Logger.Info("IP address is not able to resolve " + err.Error())
315         }
316         var host string
317         if ipString == "<nil>" {
318                 host = fmt.Sprintf(":%d", GetPortData("http").Port)
319         } else {
320                 host = fmt.Sprintf("[%s]:%d", ipString, GetPortData("http").Port)
321         }
322         go http.ListenAndServe(host, Resource.router)
323         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
324
325         if sdlcheck {
326                 SdlStorage.TestConnection(viper.GetString("controls.db.namespace"))
327         }
328         go registerXapp()
329
330         Rmr.Start(c)
331 }
332
333 func Run(c MessageConsumer) {
334         RunWithParams(c, true)
335 }