2bf77999bb9fed08e1e3de5bca1f133562bc22e6
[ric-plt/xapp-frame.git] / pkg / xapp / xapp.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "github.com/spf13/viper"
27         "io/ioutil"
28         "net/http"
29         "os"
30         "os/signal"
31         "strings"
32         "sync/atomic"
33         "syscall"
34         "time"
35 )
36
37 type ReadyCB func(interface{})
38 type ShutdownCB func()
39
40 var (
41         // XApp is an application instance
42         Rmr           *RMRClient
43         Sdl           *SDLClient
44         Rnib          *RNIBClient
45         Resource      *Router
46         Metric        *Metrics
47         Logger        *Log
48         Config        Configurator
49         Subscription  *Subscriber
50         Alarm         *AlarmClient
51         readyCb       ReadyCB
52         readyCbParams interface{}
53         shutdownCb    ShutdownCB
54         shutdownFlag  int32
55         shutdownCnt   int32
56 )
57
58 func IsReady() bool {
59         return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
60 }
61
62 func SetReadyCB(cb ReadyCB, params interface{}) {
63         readyCb = cb
64         readyCbParams = params
65 }
66
67 func XappReadyCb(params interface{}) {
68         Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
69         if readyCb != nil {
70                 readyCb(readyCbParams)
71         }
72 }
73
74 func xappShutdownCb() {
75         SendDeregistermsg()
76         Logger.Info("Wait for xapp to get unregistered")
77         time.Sleep(10 * time.Second)
78 }
79
80 func registerxapp() {
81         var (
82                 retries int = 10
83         )
84         for retries > 0 {
85                 name, _ := os.Hostname()
86                 httpservicename := "SERVICE_RICXAPP_" + strings.ToUpper(name) + "_HTTP_PORT"
87                 httpendpoint := os.Getenv(strings.Replace(httpservicename, "-", "_", -1))
88                 urlString := strings.Split(httpendpoint, "//")
89                 // Added this check to make UT pass
90                 if urlString[0] == "" {
91                         return
92                 }
93                 resp, err := http.Get(fmt.Sprintf("http://%s/ric/v1/health/ready", urlString[1]))
94                 retries -= 1
95                 time.Sleep(5 * time.Second)
96                 if err != nil {
97                         Logger.Error("Error in health check: %v", err)
98                 }
99                 if err == nil {
100                         retries -= 10
101                         Logger.Info("Health Probe Success with resp.StatusCode is %v", resp.StatusCode)
102                         if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
103                                 go SendRegistermsg()
104                         }
105                 } else {
106                         Logger.Info("Health Probe failed, retrying...")
107                 }
108         }
109 }
110
111 func SendRegistermsg() {
112         name, _ := os.Hostname()
113         xappname := viper.GetString("name")
114         xappversion := viper.GetString("version")
115         appnamespace := os.Getenv("APP_NAMESPACE")
116         if appnamespace == "" {
117                 appnamespace = "ricxapp"
118         }
119         httpservicename := "SERVICE_" + strings.ToUpper(appnamespace) + "_" + strings.ToUpper(name) + "_HTTP_PORT"
120         rmrservicename := "SERVICE_" + strings.ToUpper(appnamespace) + "_" + strings.ToUpper(name) + "_RMR_PORT"
121         httpendpointstr := os.Getenv(strings.Replace(httpservicename, "-", "_", -1))
122         rmrendpointstr := os.Getenv(strings.Replace(rmrservicename, "-", "_", -1))
123         httpendpoint := strings.Split(httpendpointstr, "//")
124         rmrendpoint := strings.Split(rmrendpointstr, "//")
125         if httpendpoint[0] == "" || rmrendpoint[0] == "" {
126                 return
127         }
128
129         pltnamespace := os.Getenv("PLT_NAMESPACE")
130         if pltnamespace == "" {
131                 pltnamespace = "ricplt"
132         }
133
134         configpath := "/ric/v1/config"
135
136         requestBody, err := json.Marshal(map[string]string{
137                 "appName":         name,
138                 "httpEndpoint":    httpendpoint[1],
139                 "rmrEndpoint":     rmrendpoint[1],
140                 "appInstanceName": xappname,
141                 "appVersion":      xappversion,
142                 "configPath":      configpath,
143         })
144
145         if err != nil {
146                 Logger.Info("Error while compiling request to appmgr: %v", err)
147         } else {
148                 url := fmt.Sprintf("http://service-%v-appmgr-http.%v:8080/ric/v1/register", pltnamespace, pltnamespace)
149                 resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
150                 Logger.Info(" Resp is %v", resp)
151                 if err != nil {
152                         Logger.Info("Error  compiling request to appmgr: %v", err)
153                 }
154                 Logger.Info("Registering request sent. Response received is :%v", resp)
155
156                 if resp != nil {
157                         body, err := ioutil.ReadAll(resp.Body)
158                         Logger.Info("Post body is %v", resp.Body)
159                         if err != nil {
160                                 Logger.Info("rsp: Error  compiling request to appmgr: %v", string(body))
161                         }
162                         defer resp.Body.Close()
163                 }
164         }
165 }
166
167 func SendDeregistermsg() {
168
169         name, _ := os.Hostname()
170         xappname := viper.GetString("name")
171
172         appnamespace := os.Getenv("APP_NAMESPACE")
173         if appnamespace == "" {
174                 appnamespace = "ricxapp"
175         }
176         pltnamespace := os.Getenv("PLT_NAMESPACE")
177         if pltnamespace == "" {
178                 pltnamespace = "ricplt"
179         }
180
181         requestBody, err := json.Marshal(map[string]string{
182                 "appName":         name,
183                 "appInstanceName": xappname,
184         })
185
186         if err != nil {
187                 Logger.Info("Error while compiling request to appmgr: %v", err)
188         } else {
189                 url := fmt.Sprintf("http://service-%v-appmgr-http.%v:8080/ric/v1/deregister", pltnamespace, pltnamespace)
190                 resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
191                 Logger.Info(" Resp is %v", resp)
192                 if err != nil {
193                         Logger.Info("Error  compiling request to appmgr: %v", err)
194                 }
195                 Logger.Info("Deregistering request sent. Response received is :%v", resp)
196
197                 if resp != nil {
198                         body, err := ioutil.ReadAll(resp.Body)
199                         Logger.Info("Post body is %v", resp.Body)
200                         if err != nil {
201                                 Logger.Info("rsp: Error  compiling request to appmgr: %v", string(body))
202                         }
203                         defer resp.Body.Close()
204                 }
205         }
206 }
207
208 func SetShutdownCB(cb ShutdownCB) {
209         shutdownCb = cb
210 }
211
212 func InstallSignalHandler() {
213         //
214         // Signal handlers to really exit program.
215         // shutdownCb can hang until application has
216         // made all needed gracefull shutdown actions
217         // hardcoded limit for shutdown is 20 seconds
218         //
219         interrupt := make(chan os.Signal, 1)
220         signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
221         //signal handler function
222         go func() {
223                 for range interrupt {
224                         if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
225                                 // close function
226                                 go func() {
227                                         timeout := int(20)
228                                         sentry := make(chan struct{})
229                                         defer close(sentry)
230
231                                         // close callback
232                                         go func() {
233                                                 if shutdownCb != nil {
234                                                         shutdownCb()
235                                                 }
236                                                 sentry <- struct{}{}
237                                         }()
238                                         select {
239                                         case <-time.After(time.Duration(timeout) * time.Second):
240                                                 Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
241                                         case <-sentry:
242                                                 Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
243                                         }
244                                         os.Exit(0)
245                                 }()
246                         } else {
247                                 newCnt := atomic.AddInt32(&shutdownCnt, 1)
248                                 Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
249                                 if newCnt >= 5 {
250                                         Logger.Info("xapp-frame shutdown forced exit")
251                                         os.Exit(0)
252                                 }
253                                 continue
254                         }
255
256                 }
257         }()
258 }
259
260 func init() {
261         // Load xapp configuration
262         Logger = LoadConfig()
263
264         if viper.IsSet("controls.logger.level") {
265                 Logger.SetLevel(viper.GetInt("controls.logger.level"))
266         } else {
267                 Logger.SetLevel(viper.GetInt("logger.level"))
268         }
269         Logger.SetFormat(0)
270
271         Resource = NewRouter()
272         Config = Configurator{}
273         Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
274         Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
275         Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
276         Rnib = NewRNIBClient()
277
278         InstallSignalHandler()
279 }
280
281 func RunWithParams(c MessageConsumer, sdlcheck bool) {
282         Rmr = NewRMRClient()
283         Rmr.SetReadyCB(XappReadyCb, nil)
284         SetShutdownCB(xappShutdownCb)
285         host := fmt.Sprintf(":%d", GetPortData("http").Port)
286         go http.ListenAndServe(host, Resource.router)
287         Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
288         if sdlcheck {
289                 Sdl.TestConnection()
290         }
291         go registerxapp()
292         Rmr.Start(c)
293 }
294
295 func Run(c MessageConsumer) {
296         RunWithParams(c, true)
297 }