INTERNAL: improve symptomdata
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "net"
27         "net/http"
28         "os"
29         "os/signal"
30         "strings"
31         "sync/atomic"
32         "syscall"
33         "testing"
34         "time"
35
36         "github.com/spf13/viper"
37 )
38
39 // For testing purpose go version 1.13 ->
40
41 var _ = func() bool {
42         testing.Init()
43         return true
44 }()
45
46 type ReadyCB func(interface{})
47 type ShutdownCB func()
48
49 var (
50         // XApp is an application instance
51         Rmr           *RMRClient
52         Sdl           *SDLClient
53         SdlStorage    *SDLStorage
54         Rnib          *RNIBClient
55         Resource      *Router
56         Metric        *Metrics
57         Logger        *Log
58         Config        Configurator
59         Subscription  *Subscriber
60         Alarm         *AlarmClient
61         Util          *Utils
62         readyCb       ReadyCB
63         readyCbParams interface{}
64         shutdownCb    ShutdownCB
65         shutdownFlag  int32
66         shutdownCnt   int32
67 )
68
69 func IsReady() bool {
70         return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady()
71 }
72
73 func SetReadyCB(cb ReadyCB, params interface{}) {
74         readyCb = cb
75         readyCbParams = params
76 }
77
78 func XappReadyCb(params interface{}) {
79         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
80         if readyCb != nil {
81                 readyCb(readyCbParams)
82         }
83 }
84
85 func SetShutdownCB(cb ShutdownCB) {
86         shutdownCb = cb
87 }
88
89 func XappShutdownCb() {
90         if err := doDeregister(); err != nil {
91                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
92         } else {
93                 Logger.Info("xApp deregistration successfull!")
94         }
95
96         if shutdownCb != nil {
97                 shutdownCb()
98         }
99 }
100
101 func registerXapp() {
102         for {
103                 time.Sleep(5 * time.Second)
104                 if !IsHealthProbeReady() {
105                         Logger.Debug("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
106                         continue
107                 }
108
109                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
110                 if err := doRegister(); err == nil {
111                         Logger.Info("Registration done, proceeding with startup ...")
112                         break
113                 }
114         }
115 }
116
117 func getService(host, service string) string {
118         appnamespace := os.Getenv("APP_NAMESPACE")
119         if appnamespace == "" {
120                 appnamespace = DEFAULT_XAPP_NS
121         }
122
123         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
124         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
125
126         Logger.Info("getService: %+v %+v", svc, url)
127         if len(url) > 1 {
128                 return url[1]
129         }
130         return ""
131 }
132
133 func getPltNamespace(envName, defVal string) string {
134         pltnamespace := os.Getenv("PLT_NAMESPACE")
135         if pltnamespace == "" {
136                 pltnamespace = defVal
137         }
138
139         return pltnamespace
140 }
141
142 func doPost(pltNs, url string, msg []byte, status int) error {
143         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
144         if err != nil || resp == nil || resp.StatusCode != status {
145                 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
146                 return err
147         }
148         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
149
150         return err
151 }
152
153 func doRegister() error {
154         host, _ := os.Hostname()
155         xappname := viper.GetString("name")
156         xappversion := viper.GetString("version")
157         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
158
159         //httpEp, rmrEp := getService(xappname, SERVICE_HTTP), getService(xappname, SERVICE_RMR)
160         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
161         if httpEp == "" || rmrEp == "" {
162                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
163                 return nil
164         }
165
166         requestBody, err := json.Marshal(map[string]string{
167                 "appName":         host,
168                 "httpEndpoint":    httpEp,
169                 "rmrEndpoint":     rmrEp,
170                 "appInstanceName": xappname,
171                 "appVersion":      xappversion,
172                 "configPath":      CONFIG_PATH,
173         })
174
175         if err != nil {
176                 Logger.Error("json.Marshal failed with error: %v", err)
177                 return err
178         }
179
180         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
181 }
182
183 func doDeregister() error {
184         if !IsHealthProbeReady() {
185                 return nil
186         }
187
188         name, _ := os.Hostname()
189         xappname := viper.GetString("name")
190         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
191
192         requestBody, err := json.Marshal(map[string]string{
193                 "appName":         name,
194                 "appInstanceName": xappname,
195         })
196
197         if err != nil {
198                 Logger.Error("json.Marshal failed with error: %v", err)
199                 return err
200         }
201
202         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
203 }
204
205 func InstallSignalHandler() {
206         //
207         // Signal handlers to really exit program.
208         // shutdownCb can hang until application has
209         // made all needed gracefull shutdown actions
210         // hardcoded limit for shutdown is 20 seconds
211         //
212         interrupt := make(chan os.Signal, 1)
213         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
214         //signal handler function
215         go func() {
216                 for range interrupt {
217                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
218                                 // close function
219                                 go func() {
220                                         timeout := int(20)
221                                         sentry := make(chan struct{})
222                                         defer close(sentry)
223
224                                         // close callback
225                                         go func() {
226                                                 XappShutdownCb()
227                                                 sentry <- struct{}{}
228                                         }()
229                                         select {
230                                         case <-time.After(time.Duration(timeout) * time.Second):
231                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
232                                         case <-sentry:
233                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
234                                         }
235                                         os.Exit(0)
236                                 }()
237                         } else {
238                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
239                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
240                                 if newCnt >= 5 {
241                                         Logger.Info("xapp-frame shutdown forced exit")
242                                         os.Exit(0)
243                                 }
244                                 continue
245                         }
246
247                 }
248         }()
249 }
250
251 func init() {
252         // Load xapp configuration
253         Logger = LoadConfig()
254
255         if viper.IsSet("controls.logger.level") {
256                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
257         } else {
258                 Logger.SetLevel(viper.GetInt("logger.level"))
259         }
260
261         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
262                 Logger.SetFormat(0)
263         }
264
265         Resource = NewRouter()
266         Config = Configurator{}
267         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
268         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
269         SdlStorage = NewSdlStorage()
270         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
271         Rnib = GetNewRnibClient(SdlStorage.db)
272         Util = NewUtils()
273
274         InstallSignalHandler()
275 }
276
277 func getIpAdress() string {
278         var ip net.IP
279         itf, err := net.InterfaceByName(os.Getenv("INTERFACE_NAME"))
280         if err != nil {
281                 Logger.Info("Interface name is not able to resolve " + err.Error())
282                 return ip.String()
283         }
284         item, err := itf.Addrs()
285         if err != nil {
286                 Logger.Info("IP address is not able to resolve " + err.Error())
287                 return ip.String()
288         }
289         for _, addr := range item {
290                 switch v := addr.(type) {
291                 case *net.IPNet:
292                         if !v.IP.IsLinkLocalUnicast() {
293                                 ip = v.IP
294                         }
295                 }
296         }
297         return ip.String()
298 }
299
300 func RunWithParams(c MessageConsumer, sdlcheck bool) {
301         Rmr = NewRMRClient()
302
303         Rmr.SetReadyCB(XappReadyCb, nil)
304         ipString := getIpAdress()
305         var host string
306         if ipString == "<nil>" {
307                 host = fmt.Sprintf(":%d", GetPortData("http").Port)
308         } else {
309                 host = fmt.Sprintf("[%s]:%d", ipString, GetPortData("http").Port)
310         }
311         go http.ListenAndServe(host, Resource.router)
312         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
313
314         if sdlcheck {
315                 SdlStorage.TestConnection(viper.GetString("controls.db.namespace"))
316         }
317         go registerXapp()
318
319         Rmr.Start(c)
320 }
321
322 func Run(c MessageConsumer) {
323         RunWithParams(c, true)
324 }