2 * Copyright (c) 2020 AT&T Intellectual Property.
3 * Copyright (c) 2020 Nokia.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 * platform project (RICP).
33 app "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
// NewVespaMgr returns a *VespaMgr populated from the xapp-frame configuration
// (app.Config).
//
// The appManager host, path, notification URL, subscription URL and retry
// count are read from the "controls.appManager.*" keys; the VES agent
// heartbeat interval, measurement interval, Prometheus address and alert
// manager bind address are read from the "controls.vesagent.*" keys.
//
// NOTE(review): the opening and closing of the struct literal are not visible
// here, so other fields may be initialised as well — confirm against the
// VespaMgr declaration.
36 func NewVespaMgr() *VespaMgr {
// Unbuffered error channel; it is handed to the ves-agent runner in StartVesagent.
39 chVesagent: make(chan error),
40 appmgrHost: app.Config.GetString("controls.appManager.host"),
41 appmgrUrl: app.Config.GetString("controls.appManager.path"),
42 appmgrNotifUrl: app.Config.GetString("controls.appManager.notificationUrl"),
43 appmgrSubsUrl: app.Config.GetString("controls.appManager.subscriptionUrl"),
44 appmgrRetry: app.Config.GetInt("controls.appManager.appmgrRetry"),
// The intervals are kept as strings; they are passed verbatim as command-line
// arguments in StartVesagent.
45 hbInterval: app.Config.GetString("controls.vesagent.hbInterval"),
46 measInterval: app.Config.GetString("controls.vesagent.measInterval"),
47 prometheusAddr: app.Config.GetString("controls.vesagent.prometheusAddr"),
48 alertManagerBindAddr: app.Config.GetString("controls.vesagent.alertManagerBindAddr"),
// Run wires the manager into the xapp framework and starts it.
//
// It tags the logger with the build Version and Hash, registers the readiness,
// status and config-change callbacks, injects the HTTP routes served by this
// manager, starts the appmgr notification subscription in a background
// goroutine, and finally calls app.RunWithParams with v and sdlcheck.
//
// sdlcheck is forwarded unchanged to app.RunWithParams.
// NOTE(review): the use of runXapp is not visible here — presumably it gates
// the app.RunWithParams call; confirm.
52 func (v *VespaMgr) Run(sdlcheck, runXapp bool) {
53 app.Logger.SetMdc("vespamgr", fmt.Sprintf("%s:%s", Version, Hash))
// The ready callback flips rmrReady to true when the framework invokes it.
54 app.SetReadyCB(func(d interface{}) { v.rmrReady = true }, true)
55 app.Resource.InjectStatusCb(v.StatusCB)
56 app.AddConfigChangeListener(v.ConfigChangeCB)
// Routes: POST on the appmgr notification URL and on the measurement URL,
// GET on /supervision.
58 measUrl := app.Config.GetString("controls.measurementUrl")
59 app.Resource.InjectRoute(v.appmgrNotifUrl, v.HandlexAppNotification, "POST")
60 app.Resource.InjectRoute(measUrl, v.HandleMeasurements, "POST")
61 app.Resource.InjectRoute("/supervision", v.HandleSupervision, "GET") // @todo: remove this
// The subscription runs concurrently so it does not delay the framework start.
63 go v.SubscribeXappNotif(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrSubsUrl))
66 app.RunWithParams(v, sdlcheck)
// Consume logs an informational line for each call.
//
// NOTE(review): v is passed to app.RunWithParams in Run, so this is presumably
// the xapp-frame message consumer callback; what is done with rp and which
// error value is returned is not visible here — confirm.
70 func (v *VespaMgr) Consume(rp *app.RMRParams) (err error) {
71 app.Logger.Info("Message received!")
// StatusCB is the status callback registered with app.Resource.InjectStatusCb
// in Run. It can log that RMR is not ready yet.
//
// NOTE(review): the condition and the returned value are not visible here —
// presumably it reports v.rmrReady; confirm.
77 func (v *VespaMgr) StatusCB() bool {
79 app.Logger.Info("RMR not ready yet!")
// ConfigChangeCB is the listener registered with app.AddConfigChangeListener
// in Run; it receives the name of a changed configuration parameter.
//
// NOTE(review): the body is not visible here — confirm whether it does anything.
85 func (v *VespaMgr) ConfigChangeCB(configparam string) {
// CreateConf creates (or truncates) the file fname and passes it, together
// with the raw xApp metrics payload xappMetrics, to v.CreateConfig.
//
// A failure of os.Create is logged.
// NOTE(review): the rest of the error branch and the closing of f are not
// visible here — confirm that the function returns on error and that the
// file handle is closed.
89 func (v *VespaMgr) CreateConf(fname string, xappMetrics []byte) {
90 f, err := os.Create(fname)
92 app.Logger.Error("os.Create failed: %s", err.Error())
97 v.CreateConfig(f, xappMetrics)
// QueryXappConf fetches the xApp configuration from appmgrUrl with HTTP GET.
//
// It makes up to v.appmgrRetry attempts using a client with a 10 second
// timeout. A failed request or a failed body read is logged and followed by
// a 5 second sleep. The body is returned as soon as a non-empty one has been
// read.
//
// NOTE(review): several things in the visible lines look unintended:
//   - the progress log prints v.appmgrRetry (the limit), not the attempt
//     counter i;
//   - `defer resp.Body.Close()` sits inside the retry loop, so bodies are only
//     closed when the function returns, not per attempt;
//   - `resp, err :=` and `appConfig, err :=` declare new variables that shadow
//     the named results; if the final return is outside the loop it returns
//     the never-assigned named results (nil, nil), so callers testing
//     `err == nil` would treat exhausted retries as success — confirm;
//   - the HTTP status code is not checked.
100 func (v *VespaMgr) QueryXappConf(appmgrUrl string) (appConfig []byte, err error) {
101 client := http.Client{Timeout: 10 * time.Second}
103 for i := 0; i < v.appmgrRetry; i++ {
104 app.Logger.Info("Getting xApp config from: %s [%d]", appmgrUrl, v.appmgrRetry)
106 resp, err := client.Get(appmgrUrl)
107 if err != nil || resp == nil {
108 app.Logger.Error("client.Get failed: %v", err)
// Back off before the next attempt.
109 time.Sleep(5 * time.Second)
113 defer resp.Body.Close()
114 appConfig, err := ioutil.ReadAll(resp.Body)
116 app.Logger.Error("ioutil.ReadAll failed: %v", err)
117 time.Sleep(5 * time.Second)
121 app.Logger.Info("Received xApp config: %d", len(appConfig))
// Only a non-empty body counts as a result.
122 if len(appConfig) > 0 {
123 return appConfig, err
127 return appConfig, err
// ReadPayload reads the whole request body of r and returns it.
//
// A read failure is logged. The function also writes the HTTP response itself
// through respondWithJSON with status 200, so handlers that call it must not
// write a second response.
//
// NOTE(review): err is passed as the JSON payload of the response; if that
// call is only reached on success the payload is always nil — confirm this is
// intended. Closing of r.Body is not visible here.
130 func (v *VespaMgr) ReadPayload(w http.ResponseWriter, r *http.Request) ([]byte, error) {
131 payload, err := ioutil.ReadAll(r.Body)
134 app.Logger.Error("ioutil.ReadAll failed: %v", err)
137 v.respondWithJSON(w, http.StatusOK, err)
// HandleSupervision serves the "/supervision" route registered in Run: it
// answers every request with HTTP 200 and a nil JSON payload.
142 func (v *VespaMgr) HandleSupervision(w http.ResponseWriter, r *http.Request) {
143 v.respondWithJSON(w, http.StatusOK, nil)
// HandleMeasurements serves the measurement route registered in Run.
//
// It reads the request body through ReadPayload (which also writes the HTTP
// response) and, when that succeeds, passes the body to CreateConf together
// with the file name configured under "controls.vesagent.configFile".
146 func (v *VespaMgr) HandleMeasurements(w http.ResponseWriter, r *http.Request) {
147 app.Logger.Info("HandleMeasurements called!")
148 if appConfig, err := v.ReadPayload(w, r); err == nil {
149 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), appConfig)
// HandlexAppNotification serves the appmgr notification route registered in Run.
//
// The notification body is read through ReadPayload (which also writes the
// HTTP response) but its content is discarded. The handler then fetches the
// current xApp configuration from appmgrHost+appmgrUrl and, when
// QueryXappConf reports no error, passes it to CreateConf together with the
// file name configured under "controls.vesagent.configFile".
//
// NOTE(review): what happens when ReadPayload fails is not visible here —
// presumably the handler returns early; confirm.
153 func (v *VespaMgr) HandlexAppNotification(w http.ResponseWriter, r *http.Request) {
154 if _, err := v.ReadPayload(w, r); err != nil {
158 app.Logger.Info("xApp event notification received!")
159 if appConfig, err := v.QueryXappConf(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrUrl)); err == nil {
160 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), appConfig)
// DoSubscribe POSTs subscriptionData as "application/json" to appmgrUrl and
// returns the subscription id found in the response.
//
// The request is treated as failed when http.Post returns an error, the
// response is nil, or the status is not 201 Created. On success the response
// body is read and decoded as a JSON object, its "id" member is stored in
// v.subscriptionId, logged and returned.
//
// NOTE(review):
//   - `result["id"].(string)` is an unchecked type assertion; it panics when
//     "id" is missing or is not a JSON string;
//   - when the only problem is the status code, err is nil, so the
//     "http.Post failed" log carries no useful detail;
//   - no resp.Body.Close() is visible here — confirm the body is closed on
//     every path;
//   - the values returned on the failure paths are not visible here —
//     presumably an empty string, which is what SubscribeXappNotif tests for.
165 func (v *VespaMgr) DoSubscribe(appmgrUrl string, subscriptionData []byte) string {
166 resp, err := http.Post(appmgrUrl, "application/json", bytes.NewBuffer(subscriptionData))
167 if err != nil || resp == nil || resp.StatusCode != http.StatusCreated {
168 app.Logger.Error("http.Post failed: %s", err)
172 body, err := ioutil.ReadAll(resp.Body)
174 app.Logger.Error("ioutil.ReadAll for body failed: %s", err)
178 var result map[string]interface{}
179 if err := json.Unmarshal([]byte(body), &result); err != nil {
180 app.Logger.Error("json.Unmarshal failed: %s", err)
183 v.subscriptionId = result["id"].(string)
184 app.Logger.Info("Subscription id from the response: %s", v.subscriptionId)
186 return v.subscriptionId
// SubscribeXappNotif subscribes this manager to xApp event notifications at
// appmgrUrl and then generates the initial VES agent configuration.
//
// The subscription request asks for all event types and names
// "controls.host" + appmgrNotifUrl as the callback target. A non-empty id
// from DoSubscribe counts as success; a failure is logged and followed by a
// 5 second sleep. Afterwards the xApp configuration is fetched from
// appmgrHost+appmgrUrl and, when QueryXappConf reports no error, passed to
// CreateConf.
//
// NOTE(review):
//   - the retry loop around DoSubscribe is not visible here — presumably it
//     repeats until a subscription succeeds; confirm there is a way out;
//   - targetUrl is interpolated into the JSON text without escaping;
//   - the failure log message contains the typo "retyring".
189 func (v *VespaMgr) SubscribeXappNotif(appmgrUrl string) {
190 targetUrl := fmt.Sprintf("%s%s", app.Config.GetString("controls.host"), v.appmgrNotifUrl)
191 subscriptionData := []byte(fmt.Sprintf(`{"Data": {"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}}`, targetUrl))
194 app.Logger.Info("Subscribing xApp notification from: %v", appmgrUrl)
196 if id := v.DoSubscribe(appmgrUrl, subscriptionData); id != "" {
197 app.Logger.Info("Subscription done, id=%s", id)
201 app.Logger.Info("Subscription failed, retyring after short delay ...")
202 time.Sleep(5 * time.Second)
// Build the initial configuration file from the current xApp configuration.
205 if xappConfig, err := v.QueryXappConf(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrUrl)); err == nil {
206 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), xappConfig)
// respondWithJSON sets the "Content-Type: application/json" header on w and
// marshals payload to JSON.
//
// NOTE(review): the json.Marshal error is discarded, and the lines that write
// the status code and the body are not visible here — presumably code and the
// marshalled bytes are written to w; confirm.
211 func (v *VespaMgr) respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
212 w.Header().Set("Content-Type", "application/json")
215 response, _ := json.Marshal(payload)
// StartVesagent creates a command runner for the "ves-agent" executable and
// runs it, passing v.chVesagent to the runner.
//
// The agent is started with the configured heartbeat interval (-i),
// measurement interval (-m), debug flag, Prometheus address and alert manager
// bind address. The runner is stored in v.vesAgent, which RestartVesagent
// uses to kill it.
220 func (v *VespaMgr) StartVesagent() {
221 v.vesAgent = NewCommandRunner("ves-agent", "-i", v.hbInterval, "-m", v.measInterval, "--Debug",
222 "--Measurement.Prometheus.Address", v.prometheusAddr, "--AlertManager.Bind", v.alertManagerBindAddr)
224 v.vesAgent.Run(v.chVesagent)
// RestartVesagent kills the running ves-agent, if there is one, and logs a
// failure to do so.
//
// NOTE(review): the branch taken when "controls.host" contains "localhost"
// and everything after the kill are not visible here — presumably the
// localhost case returns early and the agent is started again afterwards;
// confirm.
227 func (v *VespaMgr) RestartVesagent() {
228 if strings.Contains(app.Config.GetString("controls.host"), "localhost") {
232 if v.vesAgent != nil {
233 err := v.vesAgent.Kill()
235 app.Logger.Error("Couldn't kill vespa-agent: %s", err.Error())
// Builds a manager from configuration and runs it with sdlcheck=false and
// runXapp=true.
// NOTE(review): the enclosing function header is not visible here —
// presumably this is the body of main; confirm.
245 NewVespaMgr().Run(false, true)