RIC-1017: xapp-frame may not be able to successfully subscribe to messages
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "net"
27         "net/http"
28         "os"
29         "os/signal"
30         "strings"
31         "sync/atomic"
32         "syscall"
33         "testing"
34         "time"
35
36         "github.com/spf13/viper"
37 )
38
39 // For testing purpose go version 1.13 ->
40
41 var _ = func() bool {
42         testing.Init()
43         return true
44 }()
45
46 type ReadyCB func(interface{})
47 type ShutdownCB func()
48
49 var (
50         // XApp is an application instance
51         Rmr                *RMRClient
52         Sdl                *SDLClient
53         SdlStorage         *SDLStorage
54         Rnib               *RNIBClient
55         Resource           *Router
56         Metric             *Metrics
57         Logger             *Log
58         Config             Configurator
59         Subscription       *Subscriber
60         Alarm              *AlarmClient
61         Util               *Utils
62         readyCb            ReadyCB
63         readyCbParams      interface{}
64         shutdownCb         ShutdownCB
65         shutdownFlag       int32
66         shutdownCnt        int32
67         disableAlarmClient bool
68         isRegistered       bool
69 )
70
71 var startTime time.Time
72
73 func XappUpTime() time.Duration {
74         return time.Since(startTime)
75 }
76
77 func init() {
78         startTime = time.Now()
79 }
80
81 func IsReady() bool {
82         return Rmr != nil && Rmr.IsReady() && SdlStorage != nil && SdlStorage.IsReady()
83 }
84
85 func IsRegistered() bool {
86         return isRegistered
87 }
88
89 func SetReadyCB(cb ReadyCB, params interface{}) {
90         readyCb = cb
91         readyCbParams = params
92 }
93
94 func XappReadyCb(params interface{}) {
95         if disableAlarmClient == false {
96                 Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
97         }
98         if readyCb != nil {
99                 readyCb(readyCbParams)
100         }
101 }
102
103 func SetShutdownCB(cb ShutdownCB) {
104         shutdownCb = cb
105 }
106
107 func XappShutdownCb() {
108         if err := doDeregister(); err != nil {
109                 Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
110         } else {
111                 Logger.Info("xApp deregistration successfull!")
112         }
113
114         if shutdownCb != nil {
115                 shutdownCb()
116         }
117
118         isRegistered = false
119 }
120
121 func registerXapp() {
122         for {
123                 time.Sleep(5 * time.Second)
124                 if !IsHealthProbeReady() {
125                         Logger.Debug("Application='%s' is not ready yet, waiting ...", viper.GetString("name"))
126                         continue
127                 }
128
129                 Logger.Debug("Application='%s' is now up and ready, continue with registration ...", viper.GetString("name"))
130                 if err := doRegister(); err == nil {
131                         isRegistered = true
132                         Logger.Info("Registration done, proceeding with startup ...")
133                         break
134                 }
135         }
136 }
137
138 func getService(host, service string) string {
139         appnamespace := os.Getenv("APP_NAMESPACE")
140         if appnamespace == "" {
141                 appnamespace = DEFAULT_XAPP_NS
142         }
143
144         svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
145         url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
146
147         Logger.Info("getService: %+v %+v", svc, url)
148         if len(url) > 1 {
149                 return url[1]
150         }
151         return ""
152 }
153
154 func getPltNamespace(envName, defVal string) string {
155         pltnamespace := os.Getenv("PLT_NAMESPACE")
156         if pltnamespace == "" {
157                 pltnamespace = defVal
158         }
159
160         return pltnamespace
161 }
162
163 func doPost(pltNs, url string, msg []byte, status int) error {
164         resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
165         if err != nil || resp == nil || resp.StatusCode != status {
166                 logdesc := fmt.Sprintf("http.Post to '%s' failed with", fmt.Sprintf(url, pltNs, pltNs))
167                 if resp != nil {
168                         logdesc += fmt.Sprintf(" status: %d != %d", resp.StatusCode, status)
169                 } else {
170                         logdesc += fmt.Sprintf(" resp: nil")
171                 }
172                 if err != nil {
173                         logdesc += fmt.Sprintf(" err: %s", err.Error())
174                 } else {
175                         logdesc += fmt.Sprintf(" err: nil")
176                 }
177                 Logger.Info(logdesc)
178                 return fmt.Errorf(logdesc)
179         }
180
181         Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
182
183         return err
184 }
185
186 func doRegister() error {
187         host, _ := os.Hostname()
188         xappname := viper.GetString("name")
189         xappversion := viper.GetString("version")
190         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
191
192         //httpEp, rmrEp := getService(xappname, SERVICE_HTTP), getService(xappname, SERVICE_RMR)
193         httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
194         if httpEp == "" || rmrEp == "" {
195                 Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
196                 return nil
197         }
198
199         requestBody, err := json.Marshal(map[string]string{
200                 "appName":         host,
201                 "httpEndpoint":    httpEp,
202                 "rmrEndpoint":     rmrEp,
203                 "appInstanceName": xappname,
204                 "appVersion":      xappversion,
205                 "configPath":      CONFIG_PATH,
206         })
207
208         if err != nil {
209                 Logger.Error("json.Marshal failed with error: %v", err)
210                 return err
211         }
212
213         return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
214 }
215
216 func doDeregister() error {
217         if !IsHealthProbeReady() {
218                 return nil
219         }
220
221         name, _ := os.Hostname()
222         xappname := viper.GetString("name")
223         pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
224
225         requestBody, err := json.Marshal(map[string]string{
226                 "appName":         name,
227                 "appInstanceName": xappname,
228         })
229
230         if err != nil {
231                 Logger.Error("json.Marshal failed with error: %v", err)
232                 return err
233         }
234
235         return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
236 }
237
238 func InstallSignalHandler() {
239         //
240         // Signal handlers to really exit program.
241         // shutdownCb can hang until application has
242         // made all needed gracefull shutdown actions
243         // hardcoded limit for shutdown is 20 seconds
244         //
245         interrupt := make(chan os.Signal, 1)
246         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
247         //signal handler function
248         go func() {
249                 for range interrupt {
250                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
251                                 // close function
252                                 go func() {
253                                         timeout := int(20)
254                                         sentry := make(chan struct{})
255                                         defer close(sentry)
256
257                                         // close callback
258                                         go func() {
259                                                 XappShutdownCb()
260                                                 sentry <- struct{}{}
261                                         }()
262                                         select {
263                                         case <-time.After(time.Duration(timeout) * time.Second):
264                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
265                                         case <-sentry:
266                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
267                                         }
268                                         os.Exit(0)
269                                 }()
270                         } else {
271                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
272                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
273                                 if newCnt >= 5 {
274                                         Logger.Info("xapp-frame shutdown forced exit")
275                                         os.Exit(0)
276                                 }
277                                 continue
278                         }
279
280                 }
281         }()
282 }
283
284 func init() {
285         // Load xapp configuration
286         Logger = LoadConfig()
287
288         if viper.IsSet("controls.logger.level") {
289                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
290         } else {
291                 Logger.SetLevel(viper.GetInt("logger.level"))
292         }
293
294         if !viper.IsSet("controls.logger.noFormat") || !viper.GetBool("controls.logger.noFormat") {
295                 Logger.SetFormat(0)
296         }
297
298         Resource = NewRouter()
299         Config = Configurator{}
300         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
301         Subscription = NewSubscriber(viper.GetString("controls.subscription.host"), viper.GetInt("controls.subscription.timeout"))
302         SdlStorage = NewSdlStorage()
303         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
304         Rnib = GetNewRnibClient(SdlStorage.db)
305         Util = NewUtils()
306
307         InstallSignalHandler()
308 }
309
310 func GetIpAddress() (string, error) {
311         ifname := os.Getenv("INTERFACE_NAME")
312         itf, err := net.InterfaceByName(ifname)
313         if err != nil {
314                 return "<nil>", fmt.Errorf("Interface (%s) %w", ifname, err)
315         }
316         item, err := itf.Addrs()
317         if err != nil {
318                 return "<nil>", fmt.Errorf("Interface (%s) %w", ifname, err)
319         }
320         for _, addr := range item {
321                 switch v := addr.(type) {
322                 case *net.IPNet:
323                         if !v.IP.IsLinkLocalUnicast() {
324                                 return v.IP.String(), nil
325                         }
326                 }
327         }
328         return "<nil>", fmt.Errorf("Interface (%s) couldn't find ip", ifname)
329 }
330
331 type RunParams struct {
332         SdlCheck           bool
333         DisableAlarmClient bool
334 }
335
336 func RunWithRunParams(c MessageConsumer, params RunParams) {
337
338         if params.DisableAlarmClient {
339                 disableAlarmClient = true
340         } else {
341                 disableAlarmClient = false
342         }
343
344         Rmr = NewRMRClient()
345
346         Rmr.SetReadyCB(XappReadyCb, nil)
347         ipString, err := GetIpAddress()
348         if err != nil {
349                 Logger.Info("IP address is not able to resolve " + err.Error())
350         }
351         var host string
352         if ipString == "<nil>" {
353                 host = fmt.Sprintf(":%d", GetPortData("http").Port)
354         } else {
355                 host = fmt.Sprintf("[%s]:%d", ipString, GetPortData("http").Port)
356         }
357         go http.ListenAndServe(host, Resource.router)
358         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
359
360         if params.SdlCheck {
361                 SdlStorage.TestConnection(viper.GetString("controls.db.namespace"))
362         }
363         go registerXapp()
364
365         Rmr.Start(c)
366 }
367
368 func RunWithParams(c MessageConsumer, sdlcheck bool) {
369         RunWithRunParams(c, RunParams{SdlCheck: sdlcheck, DisableAlarmClient: false})
370 }
371
372 func Run(c MessageConsumer) {
373         RunWithParams(c, true)
374 }