b87463bc6865702d2432be33251eda5875215575
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "net"
27         "net/http"
28         "os"
29         "os/signal"
30         "strings"
31         "sync/atomic"
32         "syscall"
33         "testing"
34         "time"
35
36         "github.com/spf13/viper"
37 )
38
39 // For testing purpose go version 1.13 ->
40 var _ = func() bool {
41         testing.Init()
42         return true
43 }()
44
45 type ReadyCB func(interface{})
46 type ShutdownCB func()
47
48 var (
49         // XApp is an application instance
50         Rmr           *RMRClient
51         Sdl           *SDLClient
52         SdlStorage    *SDLStorage
53         Rnib          *RNIBClient
54         Resource      *Router
55         Metric        *Metrics
56         Logger        *Log
57         Config        Configurator
58         Subscription  *Subscriber
59         Alarm         *AlarmClient
60         Util          *Utils
61         readyCb       ReadyCB
62         readyCbParams interface{}
63         shutdownCb    ShutdownCB
64         shutdownFlag  int32
65         shutdownCnt   int32
66 )
67
68 func IsReady() bool {
69         return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady()
70 }
71
72 func SetReadyCB(cb ReadyCB, params interface{}) {
73         readyCb = cb
74         readyCbParams = params
75 }
76
77 func XappReadyCb(params interface{}) {
78         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
79         if readyCb != nil {
80                 readyCb(readyCbParams)
81         }
82 }
83
84 func SetShutdownCB(cb ShutdownCB) {
85         shutdownCb = cb
86 }
87
88 func XappShutdownCb() {
89         if err := doDeregister(); err != nil {
90                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
91         } else {
92                 Logger.Info("xApp deregistration successfull!")
93         }
94
95         if shutdownCb != nil {
96                 shutdownCb()
97         }
98 }
99
100 func registerXapp() {
101         for {
102                 time.Sleep(5 * time.Second)
103                 if !IsHealthProbeReady() {
104                         Logger.Info("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
105                         continue
106                 }
107
108                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
109                 if err := doRegister(); err == nil {
110                         Logger.Info("Registration done, proceeding with startup ...")
111                         break
112                 }
113         }
114 }
115
116 func getService(host, service string) string {
117         appnamespace := os.Getenv("APP_NAMESPACE")
118         if appnamespace == "" {
119                 appnamespace = DEFAULT_XAPP_NS
120         }
121
122         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
123         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
124
125         Logger.Info("getService: %+v %+v", svc, url)
126         if len(url) > 1 {
127                 return url[1]
128         }
129         return ""
130 }
131
132 func getPltNamespace(envName, defVal string) string {
133         pltnamespace := os.Getenv("PLT_NAMESPACE")
134         if pltnamespace == "" {
135                 pltnamespace = defVal
136         }
137
138         return pltnamespace
139 }
140
141 func doPost(pltNs, url string, msg []byte, status int) error {
142         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
143         if err != nil || resp == nil || resp.StatusCode != status {
144                 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
145                 return err
146         }
147         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
148
149         return err
150 }
151
152 func doRegister() error {
153         host, _ := os.Hostname()
154         xappname := viper.GetString("name")
155         xappversion := viper.GetString("version")
156         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
157
158         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
159         if httpEp == "" || rmrEp == "" {
160                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
161                 return nil
162         }
163
164         requestBody, err := json.Marshal(map[string]string{
165                 "appName":         host,
166                 "httpEndpoint":    httpEp,
167                 "rmrEndpoint":     rmrEp,
168                 "appInstanceName": xappname,
169                 "appVersion":      xappversion,
170                 "configPath":      CONFIG_PATH,
171         })
172
173         if err != nil {
174                 Logger.Error("json.Marshal failed with error: %v", err)
175                 return err
176         }
177
178         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
179 }
180
181 func doDeregister() error {
182         if !IsHealthProbeReady() {
183                 return nil
184         }
185
186         name, _ := os.Hostname()
187         xappname := viper.GetString("name")
188         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
189
190         requestBody, err := json.Marshal(map[string]string{
191                 "appName":         name,
192                 "appInstanceName": xappname,
193         })
194
195         if err != nil {
196                 Logger.Error("json.Marshal failed with error: %v", err)
197                 return err
198         }
199
200         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
201 }
202
203 func InstallSignalHandler() {
204         //
205         // Signal handlers to really exit program.
206         // shutdownCb can hang until application has
207         // made all needed gracefull shutdown actions
208         // hardcoded limit for shutdown is 20 seconds
209         //
210         interrupt := make(chan os.Signal, 1)
211         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
212         //signal handler function
213         go func() {
214                 for range interrupt {
215                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
216                                 // close function
217                                 go func() {
218                                         timeout := int(20)
219                                         sentry := make(chan struct{})
220                                         defer close(sentry)
221
222                                         // close callback
223                                         go func() {
224                                                 XappShutdownCb()
225                                                 sentry <- struct{}{}
226                                         }()
227                                         select {
228                                         case <-time.After(time.Duration(timeout) * time.Second):
229                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
230                                         case <-sentry:
231                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
232                                         }
233                                         os.Exit(0)
234                                 }()
235                         } else {
236                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
237                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
238                                 if newCnt >= 5 {
239                                         Logger.Info("xapp-frame shutdown forced exit")
240                                         os.Exit(0)
241                                 }
242                                 continue
243                         }
244
245                 }
246         }()
247 }
248
249 func init() {
250         // Load xapp configuration
251         Logger = LoadConfig()
252
253         if viper.IsSet("controls.logger.level") {
254                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
255         } else {
256                 Logger.SetLevel(viper.GetInt("logger.level"))
257         }
258
259         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
260                 Logger.SetFormat(0)
261         }
262
263         Resource = NewRouter()
264         Config = Configurator{}
265         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
266         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
267         SdlStorage = NewSdlStorage()
268         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
269         Rnib = GetNewRnibClient(SdlStorage.db)
270         Util = NewUtils()
271
272         InstallSignalHandler()
273 }
274
275 func getIpAdress() string {
276         var ip net.IP
277         itf, err := net.InterfaceByName(os.Getenv("INTERFACE_NAME"))
278         if err != nil {
279                 Logger.Info("Interface name is not able to resolve " + err.Error())
280                 return ip.String()
281         }
282         item, err := itf.Addrs()
283         if err != nil {
284                 Logger.Info("IP address is not able to resolve " + err.Error())
285                 return ip.String()
286         }
287         for _, addr := range item {
288                 switch v := addr.(type) {
289                 case *net.IPNet:
290                         if !v.IP.IsLinkLocalUnicast() {
291                                 ip = v.IP
292                         }
293                 }
294         }
295         return ip.String()
296 }
297
298 func RunWithParams(c MessageConsumer, sdlcheck bool) {
299         Rmr = NewRMRClient()
300
301         Rmr.SetReadyCB(XappReadyCb, nil)
302         ipString := getIpAdress()
303         var host string
304         if ipString == "<nil>" {
305                 host = fmt.Sprintf(":%d", GetPortData("http").Port)
306         } else {
307                 host = fmt.Sprintf("%s:%d", ipString, GetPortData("http").Port)
308         }
309         go http.ListenAndServe(host, Resource.router)
310         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
311
312         if sdlcheck {
313                 SdlStorage.TestConnection(viper.GetString("controls.db.namespace"))
314         }
315         go registerXapp()
316
317         Rmr.Start(c)
318 }
319
320 func Run(c MessageConsumer) {
321         RunWithParams(c, true)
322 }