0e278386f0226fc30e3bfd82eb9bdeb2d9f2ebc3
[ric-plt/vespamgr.git] / cmd / vesmgr / vesmgr.go
1 /*
2  *  Copyright (c) 2019 AT&T Intellectual Property.
3  *  Copyright (c) 2018-2019 Nokia.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package main
19
20 import (
21         "errors"
22         "io/ioutil"
23         "net"
24         "net/http"
25         "os"
26         "os/exec"
27         "time"
28
29         mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git"
30 )
31
32 var appmgrDomain string
33
34 const appmgrXAppConfigPath = "/ric/v1/config"
35 const appmgrPort = "8080"
36
37 type VesAgent struct {
38         Pid     int
39         name    string
40         process *os.Process
41 }
42
43 type VesMgr struct {
44         myIPAddress  string
45         appmgrSubsId string
46 }
47
48 type subsChannel struct {
49         subscribed bool
50         err        error
51 }
52
53 var vesagent VesAgent
54 var vesmgr VesMgr
55 var logger *mdcloggo.MdcLogger
56
57 const vesmgrXappNotifPort = "8080"
58 const vesmgrXappNotifPath = "/vesmgr_xappnotif/"
59 const timeoutPostXAppSubscriptions = 5
60
61 func init() {
62         logger, _ = mdcloggo.InitLogger("vesmgr")
63 }
64
65 func getMyIP() (myIP string, retErr error) {
66         addrs, err := net.InterfaceAddrs()
67         if err != nil {
68                 logger.Error("net.InterfaceAddrs failed: %s", err.Error())
69                 return "", err
70         }
71         for _, addr := range addrs {
72                 // check the address type and if it is not a loopback take it
73                 if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
74                         if ipnet.IP.To4() != nil {
75                                 logger.Info("My IP Address: %s", ipnet.IP.String())
76                                 return ipnet.IP.String(), nil
77                         }
78                 }
79         }
80         return "", nil
81 }
82
83 func createConf(xappMetrics []byte) {
84         f, err := os.Create("/etc/ves-agent/ves-agent.yaml")
85         if err != nil {
86                 logger.Error("Cannot create vespa conf file: %s", err.Error())
87                 os.Exit(1)
88         }
89         defer f.Close()
90
91         createVespaConfig(f, xappMetrics)
92         logger.Info("Vespa config created")
93 }
94
95 func subscribeXAppNotifications(chSubscriptions chan subsChannel) {
96         xappNotifUrl := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
97         subsUrl := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
98         go subscribexAppNotifications(xappNotifUrl, chSubscriptions, timeoutPostXAppSubscriptions, subsUrl)
99         logger.Info("xApp notifications subscribed from %s", subsUrl)
100 }
101
102 func vesmgrInit() {
103         vesagent.name = "ves-agent"
104         logger.Info("vesmgrInit")
105
106         var err error
107         if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" {
108                 logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error())
109                 return
110         }
111
112         var ok bool
113         appmgrDomain, ok = os.LookupEnv("VESMGR_APPMGRDOMAIN")
114         if ok {
115                 logger.Info("Using appmgrdomain %s", appmgrDomain)
116         } else {
117                 appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local"
118                 logger.Info("Using default appmgrdomain %s", appmgrDomain)
119         }
120         chXAppSubscriptions := make(chan subsChannel)
121         chXAppNotifications := make(chan []byte)
122         chSupervision := make(chan chan string)
123         chVesagent := make(chan error)
124
125         listener, err := net.Listen("tcp", vesmgr.myIPAddress+":"+vesmgrXappNotifPort)
126         startHttpServer(listener, vesmgrXappNotifPath, chXAppNotifications, chSupervision)
127
128         subscribeXAppNotifications(chXAppSubscriptions)
129
130         runVesmgr(chVesagent, chSupervision, chXAppNotifications, chXAppSubscriptions)
131 }
132
133 func startVesagent(ch chan error) {
134         cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR"))
135         cmd.Stdout = os.Stdout
136         cmd.Stderr = os.Stderr
137         if err := cmd.Start(); err != nil {
138                 logger.Error("vesmgr exiting, ves-agent start failed: %s", err)
139                 go func() {
140                         ch <- err
141                 }()
142         } else {
143                 logger.Info("ves-agent started with pid %d", cmd.Process.Pid)
144                 vesagent.Pid = cmd.Process.Pid
145                 vesagent.process = cmd.Process
146                 go func() {
147                         // wait ves-agent exit and then post the error to the channel
148                         err := cmd.Wait()
149                         ch <- err
150                 }()
151         }
152 }
153
154 func killVespa(process *os.Process) {
155         logger.Info("Killing vespa")
156         err := process.Kill()
157         if err != nil {
158                 logger.Error("Cannot kill vespa: %s", err.Error())
159         }
160 }
161
162 func queryXAppsStatus(appmgrUrl string, timeout time.Duration) ([]byte, error) {
163
164         logger.Info("query xAppStatus started, url %s", appmgrUrl)
165         req, err := http.NewRequest("GET", appmgrUrl, nil)
166         if err != nil {
167                 logger.Error("Failed to create a HTTP request: %s", err)
168                 return nil, err
169         }
170         req.Header.Set("Content-Type", "application/json")
171         client := &http.Client{}
172         client.Timeout = time.Second * timeout
173         resp, err := client.Do(req)
174         if err != nil {
175                 logger.Error("Query xApp status failed: %s", err)
176                 return nil, err
177         }
178         defer resp.Body.Close()
179         if resp.StatusCode == http.StatusOK {
180                 body, err := ioutil.ReadAll(resp.Body)
181                 if err != nil {
182                         logger.Error("Failed to read xApp status body: %s", err)
183                         return nil, err
184                 }
185                 logger.Info("query xAppStatus completed")
186                 return body, nil
187         } else {
188                 logger.Error("Error from xApp status query: %s", resp.Status)
189                 return nil, errors.New(resp.Status)
190         }
191 }
192
193 type state int
194
195 const (
196         normalState           state = iota
197         vespaTerminatingState state = iota
198 )
199
200 func queryConf() ([]byte, error) {
201         return queryXAppsStatus("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath,
202                 10*time.Second)
203 }
204
205 func runVesmgr(chVesagent chan error, chSupervision chan chan string, chXAppNotifications chan []byte, chXAppSubscriptions chan subsChannel) {
206
207         logger.Info("vesmgr main loop ready")
208         mystate := normalState
209         var xappStatus []byte
210         var err error
211         for {
212                 select {
213                 case supervision := <-chSupervision:
214                         logger.Info("vesmgr: supervision")
215                         supervision <- "OK"
216                 case xAppNotif := <-chXAppNotifications:
217                         logger.Info("vesmgr: xApp notification")
218                         logger.Info(string(xAppNotif))
219                         /*
220                          * If xapp status query fails then we cannot create
221                          * a new configuration and kill vespa.
222                          * In that case we assume that
223                          * the situation is fixed when the next
224                          * xapp notif comes
225                          */
226                         xappStatus, err = queryConf()
227                         if err == nil {
228                                 killVespa(vesagent.process)
229                                 mystate = vespaTerminatingState
230                         }
231                 case err := <-chVesagent:
232                         switch mystate {
233                         case vespaTerminatingState:
234                                 logger.Info("Vesagent termination completed")
235                                 createConf(xappStatus)
236                                 startVesagent(chVesagent)
237                                 mystate = normalState
238                         default:
239                                 logger.Error("Vesagent exited: " + err.Error())
240                                 os.Exit(1)
241                         }
242                 case isSubscribed := <-chXAppSubscriptions:
243                         if isSubscribed.err != nil {
244                                 logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err)
245                                 os.Exit(1)
246                         }
247                         xappStatus, err = queryConf()
248                         if err == nil {
249                                 createConf(xappStatus)
250                                 startVesagent(chVesagent)
251                         }
252                 }
253         }
254 }