1a5246f2d6f427d0612313334ecfd655dab1622c
[ric-plt/vespamgr.git] / cmd / vespamgr / vespamgr.go
1 /*
2  *  Copyright (c) 2020 AT&T Intellectual Property.
3  *  Copyright (c) 2020 Nokiv.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
18  * platform project (RICP).
19  */
20
21 package main
22
23 import (
24         "bytes"
25         "encoding/json"
26         "fmt"
27         "io/ioutil"
28         "net/http"
29         "os"
30         "strings"
31         "time"
32
33         app "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 )
35
36 func NewVespaMgr() *VespaMgr {
37         return &VespaMgr{
38                 rmrReady:             false,
39                 chVesagent:           make(chan error),
40                 appmgrHost:           app.Config.GetString("controls.appManager.host"),
41                 appmgrUrl:            app.Config.GetString("controls.appManager.path"),
42                 appmgrNotifUrl:       app.Config.GetString("controls.appManager.notificationUrl"),
43                 appmgrSubsUrl:        app.Config.GetString("controls.appManager.subscriptionUrl"),
44                 appmgrRetry:          app.Config.GetInt("controls.appManager.appmgrRetry"),
45                 hbInterval:           app.Config.GetString("controls.vesagent.hbInterval"),
46                 measInterval:         app.Config.GetString("controls.vesagent.measInterval"),
47                 prometheusAddr:       app.Config.GetString("controls.vesagent.prometheusAddr"),
48                 alertManagerBindAddr: app.Config.GetString("controls.vesagent.alertManagerBindAddr"),
49         }
50 }
51
52 func (v *VespaMgr) Run(sdlcheck, runXapp bool) {
53         app.Logger.SetMdc("vespamgr", fmt.Sprintf("%s:%s", Version, Hash))
54         app.SetReadyCB(func(d interface{}) { v.rmrReady = true }, true)
55         app.Resource.InjectStatusCb(v.StatusCB)
56         app.AddConfigChangeListener(v.ConfigChangeCB)
57
58         measUrl := app.Config.GetString("controls.measurementUrl")
59         app.Resource.InjectRoute(v.appmgrNotifUrl, v.HandlexAppNotification, "POST")
60         app.Resource.InjectRoute(measUrl, v.HandleMeasurements, "POST")
61         app.Resource.InjectRoute("/supervision", v.HandleSupervision, "GET") // @todo: remove this
62
63         go v.SubscribeXappNotif(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrSubsUrl))
64
65         if runXapp {
66                 app.RunWithParams(v, sdlcheck)
67         }
68 }
69
70 func (v *VespaMgr) Consume(rp *app.RMRParams) (err error) {
71         app.Logger.Info("Message received!")
72
73         app.Rmr.Free(rp.Mbuf)
74         return nil
75 }
76
77 func (v *VespaMgr) StatusCB() bool {
78         if !v.rmrReady {
79                 app.Logger.Info("RMR not ready yet!")
80         }
81
82         return v.rmrReady
83 }
84
85 func (v *VespaMgr) ConfigChangeCB(configparam string) {
86         return
87 }
88
89 func (v *VespaMgr) CreateConf(fname string, xappMetrics []byte) {
90         f, err := os.Create(fname)
91         if err != nil {
92                 app.Logger.Error("os.Create failed: %s", err.Error())
93                 return
94         }
95         defer f.Close()
96
97         v.CreateConfig(f, xappMetrics)
98 }
99
100 func (v *VespaMgr) QueryXappConf(appmgrUrl string) (appConfig []byte, err error) {
101         client := http.Client{Timeout: 10 * time.Second}
102
103         for i := 0; i < v.appmgrRetry; i++ {
104                 app.Logger.Info("Getting xApp config from: %s [%d]", appmgrUrl, v.appmgrRetry)
105
106                 resp, err := client.Get(appmgrUrl)
107                 if err != nil || resp == nil {
108                         app.Logger.Error("client.Get failed: %v", err)
109                         time.Sleep(5 * time.Second)
110                         continue
111                 }
112
113                 defer resp.Body.Close()
114                 appConfig, err := ioutil.ReadAll(resp.Body)
115                 if err != nil {
116                         app.Logger.Error("ioutil.ReadAll failed: %v", err)
117                         time.Sleep(5 * time.Second)
118                         continue
119                 }
120
121                 app.Logger.Info("Received xApp config: %d", len(appConfig))
122                 if len(appConfig) > 0 {
123                         return appConfig, err
124                 }
125         }
126
127         return appConfig, err
128 }
129
130 func (v *VespaMgr) ReadPayload(w http.ResponseWriter, r *http.Request) ([]byte, error) {
131         payload, err := ioutil.ReadAll(r.Body)
132         defer r.Body.Close()
133         if err != nil {
134                 app.Logger.Error("ioutil.ReadAll failed: %v", err)
135                 return payload, err
136         }
137         v.respondWithJSON(w, http.StatusOK, err)
138
139         return payload, err
140 }
141
142 func (v *VespaMgr) HandleSupervision(w http.ResponseWriter, r *http.Request) {
143         v.respondWithJSON(w, http.StatusOK, nil)
144 }
145
146 func (v *VespaMgr) HandleMeasurements(w http.ResponseWriter, r *http.Request) {
147         app.Logger.Info("HandleMeasurements called!")
148         if appConfig, err := v.ReadPayload(w, r); err == nil {
149                 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), appConfig)
150         }
151 }
152
153 func (v *VespaMgr) HandlexAppNotification(w http.ResponseWriter, r *http.Request) {
154         if _, err := v.ReadPayload(w, r); err != nil {
155                 return
156         }
157
158         app.Logger.Info("xApp event notification received!")
159         if appConfig, err := v.QueryXappConf(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrUrl)); err == nil {
160                 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), appConfig)
161                 v.RestartVesagent()
162         }
163 }
164
165 func (v *VespaMgr) DoSubscribe(appmgrUrl string, subscriptionData []byte) string {
166         resp, err := http.Post(appmgrUrl, "application/json", bytes.NewBuffer(subscriptionData))
167         if err != nil || resp == nil || resp.StatusCode != http.StatusCreated {
168                 app.Logger.Error("http.Post failed: %s", err)
169                 return ""
170         }
171
172         body, err := ioutil.ReadAll(resp.Body)
173         if err != nil {
174                 app.Logger.Error("ioutil.ReadAll for body failed: %s", err)
175                 return ""
176         }
177
178         var result map[string]interface{}
179         if err := json.Unmarshal([]byte(body), &result); err != nil {
180                 app.Logger.Error("json.Unmarshal failed: %s", err)
181                 return ""
182         }
183         v.subscriptionId = result["id"].(string)
184         app.Logger.Info("Subscription id from the response: %s", v.subscriptionId)
185
186         return v.subscriptionId
187 }
188
189 func (v *VespaMgr) SubscribeXappNotif(appmgrUrl string) {
190         targetUrl := fmt.Sprintf("%s%s", app.Config.GetString("controls.host"), v.appmgrNotifUrl)
191         subscriptionData := []byte(fmt.Sprintf(`{"Data": {"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}}`, targetUrl))
192
193         for {
194                 app.Logger.Info("Subscribing xApp notification from: %v", appmgrUrl)
195
196                 if id := v.DoSubscribe(appmgrUrl, subscriptionData); id != "" {
197                         app.Logger.Info("Subscription done, id=%s", id)
198                         break
199                 }
200
201                 app.Logger.Info("Subscription failed, retyring after short delay ...")
202                 time.Sleep(5 * time.Second)
203         }
204
205         if xappConfig, err := v.QueryXappConf(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrUrl)); err == nil {
206                 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), xappConfig)
207                 v.RestartVesagent()
208         }
209 }
210
211 func (v *VespaMgr) respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
212         w.Header().Set("Content-Type", "application/json")
213         w.WriteHeader(code)
214         if payload != nil {
215                 response, _ := json.Marshal(payload)
216                 w.Write(response)
217         }
218 }
219
220 func (v *VespaMgr) StartVesagent() {
221         v.vesAgent = NewCommandRunner("ves-agent", "-i", v.hbInterval, "-m", v.measInterval, "--Debug",
222                 "--Measurement.Prometheus.Address", v.prometheusAddr, "--AlertManager.Bind", v.alertManagerBindAddr)
223
224         v.vesAgent.Run(v.chVesagent)
225 }
226
227 func (v *VespaMgr) RestartVesagent() {
228         if strings.Contains(app.Config.GetString("controls.host"), "localhost") {
229                 return
230         }
231
232         if v.vesAgent != nil {
233                 err := v.vesAgent.Kill()
234                 if err != nil {
235                         app.Logger.Error("Couldn't kill vespa-agent: %s", err.Error())
236                         return
237                 }
238                 <-v.chVesagent
239         }
240
241         v.StartVesagent()
242 }
243
244 func main() {
245         NewVespaMgr().Run(false, true)
246 }