e832436500517b2ddb3500af406f75eba6c1a293
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "net"
27         "net/http"
28         "os"
29         "os/signal"
30         "strings"
31         "sync/atomic"
32         "syscall"
33         "testing"
34         "time"
35
36         "github.com/spf13/viper"
37 )
38
39 // For testing purpose go version 1.13 ->
40
41 var _ = func() bool {
42         testing.Init()
43         return true
44 }()
45
46 type ReadyCB func(interface{})
47 type ShutdownCB func()
48
49 var (
50         // XApp is an application instance
51         Rmr           *RMRClient
52         Sdl           *SDLClient
53         SdlStorage    *SDLStorage
54         Rnib          *RNIBClient
55         Resource      *Router
56         Metric        *Metrics
57         Logger        *Log
58         Config        Configurator
59         Subscription  *Subscriber
60         Alarm         *AlarmClient
61         Util          *Utils
62         readyCb       ReadyCB
63         readyCbParams interface{}
64         shutdownCb    ShutdownCB
65         shutdownFlag  int32
66         shutdownCnt   int32
67 )
68
69 func IsReady() bool {
70         return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady()
71 }
72
73 func SetReadyCB(cb ReadyCB, params interface{}) {
74         readyCb = cb
75         readyCbParams = params
76 }
77
78 func XappReadyCb(params interface{}) {
79         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
80         if readyCb != nil {
81                 readyCb(readyCbParams)
82         }
83 }
84
85 func SetShutdownCB(cb ShutdownCB) {
86         shutdownCb = cb
87 }
88
89 func XappShutdownCb() {
90         if err := doDeregister(); err != nil {
91                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
92         } else {
93                 Logger.Info("xApp deregistration successfull!")
94         }
95
96         if shutdownCb != nil {
97                 shutdownCb()
98         }
99 }
100
101 func registerXapp() {
102         for {
103                 time.Sleep(5 * time.Second)
104                 if !IsHealthProbeReady() {
105                         Logger.Info("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
106                         continue
107                 }
108
109                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
110                 if err := doRegister(); err == nil {
111                         Logger.Info("Registration done, proceeding with startup ...")
112                         break
113                 }
114         }
115 }
116
117 func getService(host, service string) string {
118         appnamespace := os.Getenv("APP_NAMESPACE")
119         if appnamespace == "" {
120                 appnamespace = DEFAULT_XAPP_NS
121         }
122
123         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
124         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
125
126         Logger.Info("getService: %+v %+v", svc, url)
127         if len(url) > 1 {
128                 return url[1]
129         }
130         return ""
131 }
132
133 func getPltNamespace(envName, defVal string) string {
134         pltnamespace := os.Getenv("PLT_NAMESPACE")
135         if pltnamespace == "" {
136                 pltnamespace = defVal
137         }
138
139         return pltnamespace
140 }
141
142 func doPost(pltNs, url string, msg []byte, status int) error {
143         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
144         if err != nil || resp == nil || resp.StatusCode != status {
145                 Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
146                 return err
147         }
148         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
149
150         return err
151 }
152
153 func doRegister() error {
154         host, _ := os.Hostname()
155         xappname := viper.GetString("name")
156         xappversion := viper.GetString("version")
157         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
158
159         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
160         if httpEp == "" || rmrEp == "" {
161                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
162                 return nil
163         }
164
165         requestBody, err := json.Marshal(map[string]string{
166                 "appName":         host,
167                 "httpEndpoint":    httpEp,
168                 "rmrEndpoint":     rmrEp,
169                 "appInstanceName": xappname,
170                 "appVersion":      xappversion,
171                 "configPath":      CONFIG_PATH,
172         })
173
174         if err != nil {
175                 Logger.Error("json.Marshal failed with error: %v", err)
176                 return err
177         }
178
179         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
180 }
181
182 func doDeregister() error {
183         if !IsHealthProbeReady() {
184                 return nil
185         }
186
187         name, _ := os.Hostname()
188         xappname := viper.GetString("name")
189         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
190
191         requestBody, err := json.Marshal(map[string]string{
192                 "appName":         name,
193                 "appInstanceName": xappname,
194         })
195
196         if err != nil {
197                 Logger.Error("json.Marshal failed with error: %v", err)
198                 return err
199         }
200
201         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
202 }
203
204 func InstallSignalHandler() {
205         //
206         // Signal handlers to really exit program.
207         // shutdownCb can hang until application has
208         // made all needed gracefull shutdown actions
209         // hardcoded limit for shutdown is 20 seconds
210         //
211         interrupt := make(chan os.Signal, 1)
212         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
213         //signal handler function
214         go func() {
215                 for range interrupt {
216                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
217                                 // close function
218                                 go func() {
219                                         timeout := int(20)
220                                         sentry := make(chan struct{})
221                                         defer close(sentry)
222
223                                         // close callback
224                                         go func() {
225                                                 XappShutdownCb()
226                                                 sentry <- struct{}{}
227                                         }()
228                                         select {
229                                         case <-time.After(time.Duration(timeout) * time.Second):
230                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
231                                         case <-sentry:
232                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
233                                         }
234                                         os.Exit(0)
235                                 }()
236                         } else {
237                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
238                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
239                                 if newCnt >= 5 {
240                                         Logger.Info("xapp-frame shutdown forced exit")
241                                         os.Exit(0)
242                                 }
243                                 continue
244                         }
245
246                 }
247         }()
248 }
249
250 func init() {
251         // Load xapp configuration
252         Logger = LoadConfig()
253
254         if viper.IsSet("controls.logger.level") {
255                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
256         } else {
257                 Logger.SetLevel(viper.GetInt("logger.level"))
258         }
259
260         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
261                 Logger.SetFormat(0)
262         }
263
264         Resource = NewRouter()
265         Config = Configurator{}
266         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
267         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
268         SdlStorage = NewSdlStorage()
269         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
270         Rnib = GetNewRnibClient(SdlStorage.db)
271         Util = NewUtils()
272
273         InstallSignalHandler()
274 }
275
276 func getIpAdress() string {
277         var ip net.IP
278         itf, err := net.InterfaceByName(os.Getenv("INTERFACE_NAME"))
279         if err != nil {
280                 Logger.Info("Interface name is not able to resolve " + err.Error())
281                 return ip.String()
282         }
283         item, err := itf.Addrs()
284         if err != nil {
285                 Logger.Info("IP address is not able to resolve " + err.Error())
286                 return ip.String()
287         }
288         for _, addr := range item {
289                 switch v := addr.(type) {
290                 case *net.IPNet:
291                         if !v.IP.IsLinkLocalUnicast() {
292                                 ip = v.IP
293                         }
294                 }
295         }
296         return ip.String()
297 }
298
299 func RunWithParams(c MessageConsumer, sdlcheck bool) {
300         Rmr = NewRMRClient()
301
302         Rmr.SetReadyCB(XappReadyCb, nil)
303         ipString := getIpAdress()
304         var host string
305         if ipString == "<nil>" {
306                 host = fmt.Sprintf(":%d", GetPortData("http").Port)
307         } else {
308                 host = fmt.Sprintf("%s:%d", ipString, GetPortData("http").Port)
309         }
310         go http.ListenAndServe(host, Resource.router)
311         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
312
313         if sdlcheck {
314                 SdlStorage.TestConnection(viper.GetString("controls.db.namespace"))
315         }
316         go registerXapp()
317
318         Rmr.Start(c)
319 }
320
321 func Run(c MessageConsumer) {
322         RunWithParams(c, true)
323 }