2 * Copyright (c) 2020 AT&T Intellectual Property.
3 * Copyright (c) 2020 Nokia.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 * platform project (RICP).
33 app "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
// NewVespaMgr builds a VespaMgr whose settings are read from the xApp
// configuration (app.Config): the application-manager host, paths and retry
// count under "controls.appManager.*", and the ves-agent heartbeat/measurement
// intervals and Prometheus/AlertManager addresses under "controls.vesagent.*".
// chVesagent is created as an unbuffered error channel.
// NOTE(review): the opening of the struct literal is not visible in this
// excerpt; other fields may be initialised there — confirm.
36 func NewVespaMgr() *VespaMgr {
39 chVesagent: make(chan error),
40 appmgrHost: app.Config.GetString("controls.appManager.host"),
41 appmgrUrl: app.Config.GetString("controls.appManager.path"),
42 appmgrNotifUrl: app.Config.GetString("controls.appManager.notificationUrl"),
43 appmgrSubsUrl: app.Config.GetString("controls.appManager.subscriptionUrl"),
44 appmgrRetry: app.Config.GetInt("controls.appManager.appmgrRetry"),
45 hbInterval: app.Config.GetString("controls.vesagent.hbInterval"),
46 measInterval: app.Config.GetString("controls.vesagent.measInterval"),
47 prometheusAddr: app.Config.GetString("controls.vesagent.prometheusAddr"),
48 alertManagerBindAddr: app.Config.GetString("controls.vesagent.alertManagerBindAddr"),
// Run wires the manager into the xApp framework and starts it.
//
// It tags the logger with "Version:Hash", registers the readiness callback
// (which sets v.rmrReady), the status callback and the config-change listener,
// injects the REST routes, starts SubscribeXappNotif in its own goroutine and
// finally hands control to app.RunWithParams(v, sdlcheck).
// NOTE(review): runXapp is not referenced on any line visible here; presumably
// it gates the RunWithParams call — confirm.
52 func (v *VespaMgr) Run(sdlcheck, runXapp bool) {
53 app.Logger.SetMdc("vespamgr", fmt.Sprintf("%s:%s", Version, Hash))
54 app.SetReadyCB(func(d interface{}) { v.rmrReady = true }, true)
55 app.Resource.InjectStatusCb(v.StatusCB)
56 app.AddConfigChangeListener(v.ConfigChangeCB)
// REST endpoints: xApp notifications and measurements are POSTed to the
// configured paths; supervision and symptom data are served on fixed paths.
58 measUrl := app.Config.GetString("controls.measurementUrl")
59 app.Resource.InjectRoute(v.appmgrNotifUrl, v.HandlexAppNotification, "POST")
60 app.Resource.InjectRoute(measUrl, v.HandleMeasurements, "POST")
61 app.Resource.InjectRoute("/supervision", v.HandleSupervision, "GET") // @todo: remove this
62 app.Resource.InjectRoute("/ric/v1/symptomdata", v.SymptomDataHandler, "GET")
// Subscribe in the background; SubscribeXappNotif sleeps between retries.
64 go v.SubscribeXappNotif(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrSubsUrl))
67 app.RunWithParams(v, sdlcheck)
// SymptomDataHandler serves the symptom-data request: it reads the file named
// by "controls.vesagent.configFile", hands its content to
// app.Resource.CollectDefaultSymptomData under the name "app-config.json", and
// sends the resulting directory to the client as "symptomdata.zip".
// A read failure is logged.
// NOTE(review): whether the handler returns early after a read failure, and
// whether baseDir is checked before sending, is not visible here — confirm.
71 func (v *VespaMgr) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
72 appConfig, err := ioutil.ReadFile(app.Config.GetString("controls.vesagent.configFile"))
74 app.Logger.Error("Unable to read config file: %v", err)
76 app.Logger.Info("SymptomDataHandler: appConfig=%+v", string(appConfig))
78 baseDir := app.Resource.CollectDefaultSymptomData("app-config.json", appConfig)
80 app.Resource.SendSymptomDataFile(w, r, baseDir, "symptomdata.zip")
// Consume receives an RMR message (rp) and logs its arrival.
// The VespaMgr itself is passed to app.RunWithParams in Run.
// NOTE(review): the rest of the body (message release, return value) is not
// visible in this excerpt — confirm before relying on it.
84 func (v *VespaMgr) Consume(rp *app.RMRParams) (err error) {
85 app.Logger.Info("Message received!")
// StatusCB is the status callback registered in Run via
// app.Resource.InjectStatusCb. It logs "RMR not ready yet!".
// NOTE(review): the condition guarding the log and the returned value are not
// visible here; presumably both are based on v.rmrReady — confirm.
91 func (v *VespaMgr) StatusCB() bool {
93 app.Logger.Info("RMR not ready yet!")
// ConfigChangeCB is the listener registered in Run via
// app.AddConfigChangeListener; configparam is supplied by the framework.
// NOTE(review): no body statements are visible in this excerpt.
99 func (v *VespaMgr) ConfigChangeCB(configparam string) {
// CreateConf creates (or truncates) the file fname and passes the open file
// together with xappMetrics to v.CreateConfig. A failure of os.Create is
// logged.
// NOTE(review): the early return on error and the closing of f are not visible
// in this excerpt — confirm both exist.
103 func (v *VespaMgr) CreateConf(fname string, xappMetrics []byte) {
104 f, err := os.Create(fname)
106 app.Logger.Error("os.Create failed: %s", err.Error())
111 v.CreateConfig(f, xappMetrics)
114 func (v *VespaMgr) QueryXappConf(appmgrUrl string) (appConfig []byte, err error) {
115 client := http.Client{Timeout: 10 * time.Second}
117 for i := 0; i < v.appmgrRetry; i++ {
118 app.Logger.Info("Getting xApp config from: %s [%d]", appmgrUrl, v.appmgrRetry)
120 resp, err := client.Get(appmgrUrl)
121 if err != nil || resp == nil {
122 app.Logger.Error("client.Get failed: %v", err)
123 time.Sleep(5 * time.Second)
127 defer resp.Body.Close()
128 appConfig, err := ioutil.ReadAll(resp.Body)
130 app.Logger.Error("ioutil.ReadAll failed: %v", err)
131 time.Sleep(5 * time.Second)
135 app.Logger.Info("Received xApp config: %d", len(appConfig))
136 if len(appConfig) > 0 {
137 return appConfig, err
141 return appConfig, err
// ReadPayload reads the whole request body. A read failure is logged. The
// response is produced through v.respondWithJSON with http.StatusOK and err as
// the payload.
// NOTE(review): the return statements and any closing of r.Body are not
// visible in this excerpt; presumably the error path returns before the
// response is written — confirm.
144 func (v *VespaMgr) ReadPayload(w http.ResponseWriter, r *http.Request) ([]byte, error) {
145 payload, err := ioutil.ReadAll(r.Body)
148 app.Logger.Error("ioutil.ReadAll failed: %v", err)
151 v.respondWithJSON(w, http.StatusOK, err)
156 func (v *VespaMgr) HandleSupervision(w http.ResponseWriter, r *http.Request) {
157 v.respondWithJSON(w, http.StatusOK, nil)
160 func (v *VespaMgr) HandleMeasurements(w http.ResponseWriter, r *http.Request) {
161 if appConfig, err := v.ReadPayload(w, r); err == nil {
162 filePath := app.Config.GetString("controls.pltFile")
163 if err := ioutil.WriteFile(filePath, appConfig, 0666); err == nil {
164 v.pltFileCreated = true
// HandlexAppNotification handles an xApp event notification. The request body
// is consumed through v.ReadPayload and its content is discarded; the handler
// then fetches the xApp configuration from appmgrHost+appmgrUrl and, on
// success, writes it to the file named by "controls.vesagent.configFile" via
// CreateConf.
// NOTE(review): the early return after a ReadPayload failure and any step
// following CreateConf are not visible in this excerpt — confirm.
169 func (v *VespaMgr) HandlexAppNotification(w http.ResponseWriter, r *http.Request) {
170 if _, err := v.ReadPayload(w, r); err != nil {
174 app.Logger.Info("xApp event notification received!")
175 if appConfig, err := v.QueryXappConf(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrUrl)); err == nil {
176 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), appConfig)
181 func (v *VespaMgr) DoSubscribe(appmgrUrl string, subscriptionData []byte) string {
182 resp, err := http.Post(appmgrUrl, "application/json", bytes.NewBuffer(subscriptionData))
183 if err != nil || resp == nil || resp.StatusCode != http.StatusCreated {
184 app.Logger.Error("http.Post failed: %s", err)
188 body, err := ioutil.ReadAll(resp.Body)
190 app.Logger.Error("ioutil.ReadAll for body failed: %s", err)
194 var result map[string]interface{}
195 if err := json.Unmarshal([]byte(body), &result); err != nil {
196 app.Logger.Error("json.Unmarshal failed: %s", err)
199 v.subscriptionId = result["id"].(string)
200 app.Logger.Info("Subscription id from the response: %s", v.subscriptionId)
202 return v.subscriptionId
// SubscribeXappNotif subscribes this manager to xApp notifications at
// appmgrUrl and then loads the current xApp configuration.
//
// The subscription request names "controls.host" + v.appmgrNotifUrl as the
// target URL and asks for all event types. DoSubscribe is attempted up to
// v.appmgrRetry times, with a 5 s sleep after a failed attempt. Afterwards the
// configuration is fetched from appmgrHost+appmgrUrl and, on success, written
// to the file named by "controls.vesagent.configFile".
// NOTE(review): how the loop exits after a successful subscription, and any
// step following CreateConf, are not visible in this excerpt — confirm.
205 func (v *VespaMgr) SubscribeXappNotif(appmgrUrl string) {
206 targetUrl := fmt.Sprintf("%s%s", app.Config.GetString("controls.host"), v.appmgrNotifUrl)
207 subscriptionData := []byte(fmt.Sprintf(`{"Data": {"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}}`, targetUrl))
209 for i := 0; i < v.appmgrRetry; i++ {
210 app.Logger.Info("Subscribing xApp notification from: %v", appmgrUrl)
// A non-empty id from DoSubscribe means the subscription succeeded.
211 if id := v.DoSubscribe(appmgrUrl, subscriptionData); id != "" {
212 app.Logger.Info("Subscription done, id=%s", id)
216 app.Logger.Info("Subscription failed, retyring after short delay ...")
217 time.Sleep(5 * time.Second)
220 if xappConfig, err := v.QueryXappConf(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrUrl)); err == nil {
221 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), xappConfig)
// respondWithJSON sets the response Content-Type to "application/json" and
// JSON-encodes payload; an encoding error is discarded.
// NOTE(review): the lines that apply the status code and write the encoded
// response are not visible in this excerpt — confirm how code and response are
// used, and whether a nil payload skips the body.
226 func (v *VespaMgr) respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
227 w.Header().Set("Content-Type", "application/json")
230 response, _ := json.Marshal(payload)
235 func (v *VespaMgr) StartVesagent() {
236 v.vesAgent = NewCommandRunner("ves-agent", "-i", v.hbInterval, "-m", v.measInterval, "--Debug",
237 "--Measurement.Prometheus.Address", v.prometheusAddr, "--AlertManager.Bind", v.alertManagerBindAddr)
239 v.vesAgent.Run(v.chVesagent)
// RestartVesagent restarts the ves-agent process. It first checks whether
// "controls.host" contains "localhost"; if an agent has been started
// (v.vesAgent is non-nil) it is killed, and a failure to kill it is logged.
// NOTE(review): what happens in the localhost case, and the steps after the
// kill (presumably waiting on v.chVesagent and calling StartVesagent), are not
// visible in this excerpt — confirm.
242 func (v *VespaMgr) RestartVesagent() {
243 if strings.Contains(app.Config.GetString("controls.host"), "localhost") {
247 if v.vesAgent != nil {
248 err := v.vesAgent.Kill()
250 app.Logger.Error("Couldn't kill vespa-agent: %s", err.Error())
// Build the manager and run it with sdlcheck=false and runXapp=true.
// NOTE(review): the enclosing function header is not visible in this excerpt;
// presumably this is the body of main — confirm.
260 NewVespaMgr().Run(false, true)