439c1778ab0834bf89037441ce75be2fa4266369
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "net"
27         "net/http"
28         "os"
29         "os/signal"
30         "strings"
31         "sync/atomic"
32         "syscall"
33         "testing"
34         "time"
35
36         "github.com/spf13/viper"
37 )
38
39 // For testing purpose go version 1.13 ->
40
41 var _ = func() bool {
42         testing.Init()
43         return true
44 }()
45
46 type ReadyCB func(interface{})
47 type ShutdownCB func()
48
49 var (
50         // XApp is an application instance
51         Rmr                *RMRClient
52         Sdl                *SDLClient
53         SdlStorage         *SDLStorage
54         Rnib               *RNIBClient
55         Resource           *Router
56         Metric             *Metrics
57         Logger             *Log
58         Config             Configurator
59         Subscription       *Subscriber
60         Alarm              *AlarmClient
61         Util               *Utils
62         readyCb            ReadyCB
63         readyCbParams      interface{}
64         shutdownCb         ShutdownCB
65         shutdownFlag       int32
66         shutdownCnt        int32
67         disableAlarmClient bool
68 )
69
70 var startTime time.Time
71
72 func XappUpTime() time.Duration {
73         return time.Since(startTime)
74 }
75
76 func init() {
77         startTime = time.Now()
78 }
79
80 func IsReady() bool {
81         return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady()
82 }
83
84 func SetReadyCB(cb ReadyCB, params interface{}) {
85         readyCb = cb
86         readyCbParams = params
87 }
88
89 func XappReadyCb(params interface{}) {
90         if disableAlarmClient == false {
91                 Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
92         }
93         if readyCb != nil {
94                 readyCb(readyCbParams)
95         }
96 }
97
98 func SetShutdownCB(cb ShutdownCB) {
99         shutdownCb = cb
100 }
101
102 func XappShutdownCb() {
103         if err := doDeregister(); err != nil {
104                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
105         } else {
106                 Logger.Info("xApp deregistration successfull!")
107         }
108
109         if shutdownCb != nil {
110                 shutdownCb()
111         }
112 }
113
114 func registerXapp() {
115         for {
116                 time.Sleep(5 * time.Second)
117                 if !IsHealthProbeReady() {
118                         Logger.Debug("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
119                         continue
120                 }
121
122                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
123                 if err := doRegister(); err == nil {
124                         Logger.Info("Registration done, proceeding with startup ...")
125                         break
126                 }
127         }
128 }
129
130 func getService(host, service string) string {
131         appnamespace := os.Getenv("APP_NAMESPACE")
132         if appnamespace == "" {
133                 appnamespace = DEFAULT_XAPP_NS
134         }
135
136         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
137         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
138
139         Logger.Info("getService: %+v %+v", svc, url)
140         if len(url) > 1 {
141                 return url[1]
142         }
143         return ""
144 }
145
146 func getPltNamespace(envName, defVal string) string {
147         pltnamespace := os.Getenv("PLT_NAMESPACE")
148         if pltnamespace == "" {
149                 pltnamespace = defVal
150         }
151
152         return pltnamespace
153 }
154
155 func doPost(pltNs, url string, msg []byte, status int) error {
156         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
157         if err != nil || resp == nil || resp.StatusCode != status {
158                 logdesc := fmt.Sprintf("http.Post to '%s' failed with", fmt.Sprintf(url, pltNs, pltNs))
159                 if resp != nil {
160                         logdesc += fmt.Sprintf(" status: %d != %d", resp.StatusCode, status)
161                 } else {
162                         logdesc += fmt.Sprintf(" resp: nil")
163                 }
164                 if err != nil {
165                         logdesc += fmt.Sprintf(" err: %s", err.Error())
166                 } else {
167                         logdesc += fmt.Sprintf(" err: nil")
168                 }
169                 Logger.Info(logdesc)
170                 return fmt.Errorf(logdesc)
171         }
172
173         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
174
175         return err
176 }
177
178 func doRegister() error {
179         host, _ := os.Hostname()
180         xappname := viper.GetString("name")
181         xappversion := viper.GetString("version")
182         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
183
184         //httpEp, rmrEp := getService(xappname, SERVICE_HTTP), getService(xappname, SERVICE_RMR)
185         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
186         if httpEp == "" || rmrEp == "" {
187                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
188                 return nil
189         }
190
191         requestBody, err := json.Marshal(map[string]string{
192                 "appName":         host,
193                 "httpEndpoint":    httpEp,
194                 "rmrEndpoint":     rmrEp,
195                 "appInstanceName": xappname,
196                 "appVersion":      xappversion,
197                 "configPath":      CONFIG_PATH,
198         })
199
200         if err != nil {
201                 Logger.Error("json.Marshal failed with error: %v", err)
202                 return err
203         }
204
205         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
206 }
207
208 func doDeregister() error {
209         if !IsHealthProbeReady() {
210                 return nil
211         }
212
213         name, _ := os.Hostname()
214         xappname := viper.GetString("name")
215         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
216
217         requestBody, err := json.Marshal(map[string]string{
218                 "appName":         name,
219                 "appInstanceName": xappname,
220         })
221
222         if err != nil {
223                 Logger.Error("json.Marshal failed with error: %v", err)
224                 return err
225         }
226
227         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
228 }
229
230 func InstallSignalHandler() {
231         //
232         // Signal handlers to really exit program.
233         // shutdownCb can hang until application has
234         // made all needed gracefull shutdown actions
235         // hardcoded limit for shutdown is 20 seconds
236         //
237         interrupt := make(chan os.Signal, 1)
238         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
239         //signal handler function
240         go func() {
241                 for range interrupt {
242                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
243                                 // close function
244                                 go func() {
245                                         timeout := int(20)
246                                         sentry := make(chan struct{})
247                                         defer close(sentry)
248
249                                         // close callback
250                                         go func() {
251                                                 XappShutdownCb()
252                                                 sentry <- struct{}{}
253                                         }()
254                                         select {
255                                         case <-time.After(time.Duration(timeout) * time.Second):
256                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
257                                         case <-sentry:
258                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
259                                         }
260                                         os.Exit(0)
261                                 }()
262                         } else {
263                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
264                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
265                                 if newCnt >= 5 {
266                                         Logger.Info("xapp-frame shutdown forced exit")
267                                         os.Exit(0)
268                                 }
269                                 continue
270                         }
271
272                 }
273         }()
274 }
275
276 func init() {
277         // Load xapp configuration
278         Logger = LoadConfig()
279
280         if viper.IsSet("controls.logger.level") {
281                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
282         } else {
283                 Logger.SetLevel(viper.GetInt("logger.level"))
284         }
285
286         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
287                 Logger.SetFormat(0)
288         }
289
290         Resource = NewRouter()
291         Config = Configurator{}
292         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
293         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
294         SdlStorage = NewSdlStorage()
295         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
296         Rnib = GetNewRnibClient(SdlStorage.db)
297         Util = NewUtils()
298
299         InstallSignalHandler()
300 }
301
302 func GetIpAddress() (string, error) {
303         ifname := os.Getenv("INTERFACE_NAME")
304         itf, err := net.InterfaceByName(ifname)
305         if err != nil {
306                 return "<nil>", fmt.Errorf("Interface (%s) %w", ifname, err)
307         }
308         item, err := itf.Addrs()
309         if err != nil {
310                 return "<nil>", fmt.Errorf("Interface (%s) %w", ifname, err)
311         }
312         for _, addr := range item {
313                 switch v := addr.(type) {
314                 case *net.IPNet:
315                         if !v.IP.IsLinkLocalUnicast() {
316                                 return v.IP.String(), nil
317                         }
318                 }
319         }
320         return "<nil>", fmt.Errorf("Interface (%s) couldn't find ip", ifname)
321 }
322
323 type RunParams struct {
324         SdlCheck           bool
325         DisableAlarmClient bool
326 }
327
328 func RunWithRunParams(c MessageConsumer, params RunParams) {
329
330         if params.DisableAlarmClient {
331                 disableAlarmClient = true
332         } else {
333                 disableAlarmClient = false
334         }
335
336         Rmr = NewRMRClient()
337
338         Rmr.SetReadyCB(XappReadyCb, nil)
339         ipString, err := GetIpAddress()
340         if err != nil {
341                 Logger.Info("IP address is not able to resolve " + err.Error())
342         }
343         var host string
344         if ipString == "<nil>" {
345                 host = fmt.Sprintf(":%d", GetPortData("http").Port)
346         } else {
347                 host = fmt.Sprintf("[%s]:%d", ipString, GetPortData("http").Port)
348         }
349         go http.ListenAndServe(host, Resource.router)
350         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
351
352         if params.SdlCheck {
353                 SdlStorage.TestConnection(viper.GetString("controls.db.namespace"))
354         }
355         go registerXapp()
356
357         Rmr.Start(c)
358 }
359
360 func RunWithParams(c MessageConsumer, sdlcheck bool) {
361         RunWithRunParams(c, RunParams{SdlCheck: sdlcheck, DisableAlarmClient: false})
362 }
363
364 func Run(c MessageConsumer) {
365         RunWithParams(c, true)
366 }