Refactor the code
[ric-plt/vespamgr.git] / cmd / vesmgr / vesmgr.go
1 /*
2  *  Copyright (c) 2019 AT&T Intellectual Property.
3  *  Copyright (c) 2018-2019 Nokia.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package main
19
20 import (
21         "errors"
22         "io/ioutil"
23         "net"
24         "net/http"
25         "os"
26         "time"
27
28         mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git"
29 )
30
31 var appmgrDomain string
32
33 const appmgrXAppConfigPath = "/ric/v1/config"
34 const appmgrPort = "8080"
35
36 // VesMgr contains runtime information of the vesmgr process
37 type VesMgr struct {
38         myIPAddress         string
39         chXAppSubscriptions chan subscriptionNotification
40         chXAppNotifications chan []byte
41         chSupervision       chan chan string
42         chVesagent          chan error
43         vesagent            cmdRunner
44         httpServer          HTTPServer
45 }
46
47 type subscriptionNotification struct {
48         subscribed bool
49         err        error
50         subsID     string
51 }
52
53 var logger *mdcloggo.MdcLogger
54
55 const vesmgrXappNotifPort = "8080"
56 const vesmgrXappNotifPath = "/vesmgr_xappnotif/"
57 const timeoutPostXAppSubscriptions = 5
58 const vespaConfigFile = "/etc/ves-agent/ves-agent.yaml"
59
60 func init() {
61         logger, _ = mdcloggo.InitLogger("vesmgr")
62 }
63
64 func getMyIP() (myIP string, retErr error) {
65         addrs, err := net.InterfaceAddrs()
66         if err != nil {
67                 logger.Error("net.InterfaceAddrs failed: %s", err.Error())
68                 return "", err
69         }
70         for _, addr := range addrs {
71                 // check the address type and if it is not a loopback take it
72                 if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
73                         if ipnet.IP.To4() != nil {
74                                 logger.Info("My IP Address: %s", ipnet.IP.String())
75                                 return ipnet.IP.String(), nil
76                         }
77                 }
78         }
79         return "", nil
80 }
81
82 func createConf(fname string, xappMetrics []byte) {
83         f, err := os.Create(fname)
84         if err != nil {
85                 logger.Error("Cannot create vespa conf file: %s", err.Error())
86                 os.Exit(1)
87         }
88         defer f.Close()
89
90         createVespaConfig(f, xappMetrics)
91         logger.Info("Vespa config created")
92 }
93
94 func (vesmgr *VesMgr) subscribeXAppNotifications() {
95         xappNotifURL := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
96         subsURL := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
97         go subscribexAppNotifications(xappNotifURL, vesmgr.chXAppSubscriptions, timeoutPostXAppSubscriptions, subsURL)
98         logger.Info("xApp notifications subscribed from %s", subsURL)
99 }
100
101 // Init initializes the vesmgr
102 func (vesmgr *VesMgr) Init(listenPort string) *VesMgr {
103         logger.Info("vesmgrInit")
104         var err error
105         if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" {
106                 logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error())
107                 panic("Cannot get my IP address")
108         }
109
110         var ok bool
111         appmgrDomain, ok = os.LookupEnv("VESMGR_APPMGRDOMAIN")
112         if ok {
113                 logger.Info("Using appmgrdomain %s", appmgrDomain)
114         } else {
115                 appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local"
116                 logger.Info("Using default appmgrdomain %s", appmgrDomain)
117         }
118         vesmgr.chXAppSubscriptions = make(chan subscriptionNotification)
119         // Create notifications as buffered channel so that
120         // xappmgr does not block if we are stuck somewhere
121         vesmgr.chXAppNotifications = make(chan []byte, 10)
122         vesmgr.chSupervision = make(chan chan string)
123         vesmgr.chVesagent = make(chan error)
124         vesmgr.httpServer = HTTPServer{}
125         vesmgr.httpServer.init(vesmgr.myIPAddress + ":" + listenPort)
126         vesmgr.vesagent = makeRunner("ves-agent", "-i", os.Getenv("VESMGR_HB_INTERVAL"),
127                 "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address",
128                 os.Getenv("VESMGR_PROMETHEUS_ADDR"))
129         return vesmgr
130 }
131
132 func (vesmgr *VesMgr) startVesagent() {
133         vesmgr.vesagent.run(vesmgr.chVesagent)
134 }
135
136 func (vesmgr *VesMgr) killVespa() error {
137         logger.Info("Killing vespa")
138         err := vesmgr.vesagent.kill()
139         if err != nil {
140                 logger.Error("Cannot kill vespa: %s", err.Error())
141                 return err
142         }
143         return <-vesmgr.chVesagent // wait vespa exit
144 }
145
146 func queryXAppsConfig(appmgrURL string, timeout time.Duration) ([]byte, error) {
147         emptyConfig := []byte("{}")
148         logger.Info("query xAppConfig started, url %s", appmgrURL)
149         req, err := http.NewRequest("GET", appmgrURL, nil)
150         if err != nil {
151                 logger.Error("Failed to create a HTTP request: %s", err)
152                 return emptyConfig, err
153         }
154         req.Header.Set("Content-Type", "application/json")
155         client := &http.Client{}
156         client.Timeout = time.Second * timeout
157         resp, err := client.Do(req)
158         if err != nil {
159                 logger.Error("Query xApp config failed: %s", err)
160                 return emptyConfig, err
161         }
162         defer resp.Body.Close()
163         if resp.StatusCode == http.StatusOK {
164                 body, err := ioutil.ReadAll(resp.Body)
165                 if err != nil {
166                         logger.Error("Failed to read xApp config body: %s", err)
167                         return emptyConfig, err
168                 }
169                 logger.Info("query xAppConfig completed")
170                 return body, nil
171         }
172         logger.Error("Error from xApp config query: %s", resp.Status)
173         return emptyConfig, errors.New(resp.Status)
174 }
175
176 func queryConf() ([]byte, error) {
177         return queryXAppsConfig("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath,
178                 10*time.Second)
179 }
180
181 func (vesmgr *VesMgr) emptyNotificationsChannel() {
182         for {
183                 select {
184                 case <-vesmgr.chXAppNotifications:
185                         // we don't care the content
186                 default:
187                         return
188                 }
189         }
190 }
191
192 func (vesmgr *VesMgr) servRequest() {
193         select {
194         case supervision := <-vesmgr.chSupervision:
195                 logger.Info("vesmgr: supervision")
196                 supervision <- "OK"
197         case xAppNotif := <-vesmgr.chXAppNotifications:
198                 logger.Info("vesmgr: xApp notification")
199                 logger.Info(string(xAppNotif))
200                 vesmgr.emptyNotificationsChannel()
201                 /*
202                 * If xapp config query fails then we cannot create
203                 * a new configuration and kill vespa.
204                 * In that case we assume that
205                 * the situation is fixed when the next
206                 * xapp notif comes
207                  */
208                 xappConfig, err := queryConf()
209                 if err == nil {
210                         vesmgr.killVespa()
211                         createConf(vespaConfigFile, xappConfig)
212                         vesmgr.startVesagent()
213                 }
214         case err := <-vesmgr.chVesagent:
215                 logger.Error("Vesagent exited: " + err.Error())
216                 os.Exit(1)
217         }
218 }
219
220 func (vesmgr *VesMgr) waitSubscriptionLoop() {
221         for {
222                 select {
223                 case supervision := <-vesmgr.chSupervision:
224                         logger.Info("vesmgr: supervision")
225                         supervision <- "OK"
226                 case isSubscribed := <-vesmgr.chXAppSubscriptions:
227                         if isSubscribed.err != nil {
228                                 logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err)
229                                 os.Exit(1)
230                         }
231                         return
232                 }
233         }
234 }
235
236 // Run the vesmgr process main loop
237 func (vesmgr *VesMgr) Run() {
238         logger.Info("vesmgr main loop ready")
239         vesmgr.httpServer.start(vesmgrXappNotifPath, vesmgr.chXAppNotifications, vesmgr.chSupervision)
240         vesmgr.subscribeXAppNotifications()
241         vesmgr.waitSubscriptionLoop()
242         xappConfig, _ := queryConf()
243         createConf(vespaConfigFile, xappConfig)
244         vesmgr.startVesagent()
245         for {
246                 vesmgr.servRequest()
247         }
248 }