2 * Copyright (c) 2020 AT&T Intellectual Property.
3 * Copyright (c) 2020 Nokia.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 * platform project (RICP).
33 app "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
// NewVespaMgr returns a *VespaMgr whose settings are read from the xApp
// framework configuration (app.Config).
// NOTE(review): only part of the initializer is visible in this view; the
// comments below cover the fields that are shown.
36 func NewVespaMgr() *VespaMgr {
// Unbuffered error channel; StartVesagent hands it to the ves-agent runner.
39 chVesagent: make(chan error),
// Application manager endpoint: host, configuration path, notification URL
// and subscription URL, plus the number of attempts used by the retry loops.
40 appmgrHost: app.Config.GetString("controls.appManager.host"),
41 appmgrUrl: app.Config.GetString("controls.appManager.path"),
42 appmgrNotifUrl: app.Config.GetString("controls.appManager.notificationUrl"),
43 appmgrSubsUrl: app.Config.GetString("controls.appManager.subscriptionUrl"),
44 appmgrRetry: app.Config.GetInt("controls.appManager.appmgrRetry"),
// ves-agent settings; StartVesagent passes these on the ves-agent command line.
45 hbInterval: app.Config.GetString("controls.vesagent.hbInterval"),
46 measInterval: app.Config.GetString("controls.vesagent.measInterval"),
47 prometheusAddr: app.Config.GetString("controls.vesagent.prometheusAddr"),
48 alertManagerBindAddr: app.Config.GetString("controls.vesagent.alertManagerBindAddr"),
// Run wires the manager into the xApp framework and starts it.
//
// It tags the logger with "vespamgr" = "Version:Hash", registers the ready,
// status and configuration-change callbacks, injects the REST routes, starts
// the application-manager subscription in a goroutine and finally calls
// app.RunWithParams with sdlcheck.
// NOTE(review): runXapp is not referenced on any line visible here;
// presumably it guards the RunWithParams call — confirm.
52 func (v *VespaMgr) Run(sdlcheck, runXapp bool) {
53 app.Logger.SetMdc("vespamgr", fmt.Sprintf("%s:%s", Version, Hash))
// The ready callback flips rmrReady.
54 app.SetReadyCB(func(d interface{}) { v.rmrReady = true }, true)
55 app.Resource.InjectStatusCb(v.StatusCB)
56 app.AddConfigChangeListener(v.ConfigChangeCB)
// REST endpoints: xApp notifications and measurement uploads (both POST) and
// a supervision probe (GET).
58 measUrl := app.Config.GetString("controls.measurementUrl")
59 app.Resource.InjectRoute(v.appmgrNotifUrl, v.HandlexAppNotification, "POST")
60 app.Resource.InjectRoute(measUrl, v.HandleMeasurements, "POST")
61 app.Resource.InjectRoute("/supervision", v.HandleSupervision, "GET") // @todo: remove this
// Subscribe in the background: SubscribeXappNotif retries with 5 s sleeps.
63 go v.SubscribeXappNotif(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrSubsUrl))
66 app.RunWithParams(v, sdlcheck)
// Consume receives RMR message parameters (rp) and logs that a message
// arrived.
// NOTE(review): the remainder of the body (any use of rp and the returned
// error) is not visible in this view — confirm before relying on it.
70 func (v *VespaMgr) Consume(rp *app.RMRParams) (err error) {
71 app.Logger.Info("Message received!")
// StatusCB is the status callback that Run registers through
// app.Resource.InjectStatusCb.
// NOTE(review): the condition around the log line and the returned value are
// not visible here; presumably it logs when rmrReady is false and returns
// rmrReady — confirm.
77 func (v *VespaMgr) StatusCB() bool {
79 app.Logger.Info("RMR not ready yet!")
// ConfigChangeCB is the listener that Run registers through
// app.AddConfigChangeListener; configparam is supplied by that caller.
// NOTE(review): the body is not visible in this view.
85 func (v *VespaMgr) ConfigChangeCB(configparam string) {
// CreateConf creates (or truncates) the file fname and passes the open file
// together with xappMetrics to v.CreateConfig, which is defined elsewhere.
// A failure of os.Create is logged.
// NOTE(review): the error branch's control flow and the closing of f are not
// visible in this view — confirm the file handle is released.
89 func (v *VespaMgr) CreateConf(fname string, xappMetrics []byte) {
90 f, err := os.Create(fname)
92 app.Logger.Error("os.Create failed: %s", err.Error())
97 v.CreateConfig(f, xappMetrics)
100 func (v *VespaMgr) QueryXappConf(appmgrUrl string) (appConfig []byte, err error) {
101 client := http.Client{Timeout: 10 * time.Second}
103 for i := 0; i < v.appmgrRetry; i++ {
104 app.Logger.Info("Getting xApp config from: %s [%d]", appmgrUrl, v.appmgrRetry)
106 resp, err := client.Get(appmgrUrl)
107 if err != nil || resp == nil {
108 app.Logger.Error("client.Get failed: %v", err)
109 time.Sleep(5 * time.Second)
113 defer resp.Body.Close()
114 appConfig, err := ioutil.ReadAll(resp.Body)
116 app.Logger.Error("ioutil.ReadAll failed: %v", err)
117 time.Sleep(5 * time.Second)
121 app.Logger.Info("Received xApp config: %d", len(appConfig))
122 if len(appConfig) > 0 {
123 return appConfig, err
127 return appConfig, err
// ReadPayload reads the complete request body of r and returns it with the
// read error. A read failure is logged. The HTTP reply is produced here via
// respondWithJSON with status 200 and err as payload, so handlers that call
// ReadPayload do not write to w themselves.
// NOTE(review): whether the reply is also sent on the failure path, and
// whether r.Body is closed, is not visible in this view — confirm.
130 func (v *VespaMgr) ReadPayload(w http.ResponseWriter, r *http.Request) ([]byte, error) {
131 payload, err := ioutil.ReadAll(r.Body)
134 app.Logger.Error("ioutil.ReadAll failed: %v", err)
137 v.respondWithJSON(w, http.StatusOK, err)
// HandleSupervision serves the "/supervision" GET route registered in Run.
// It replies with status 200 and passes a nil payload to respondWithJSON.
142 func (v *VespaMgr) HandleSupervision(w http.ResponseWriter, r *http.Request) {
143 v.respondWithJSON(w, http.StatusOK, nil)
146 func (v *VespaMgr) HandleMeasurements(w http.ResponseWriter, r *http.Request) {
147 if appConfig, err := v.ReadPayload(w, r); err == nil {
148 filePath := app.Config.GetString("controls.pltFile")
149 if err := ioutil.WriteFile(filePath, appConfig, 0666); err == nil {
150 v.pltFileCreated = true
// HandlexAppNotification serves the application-manager notification route
// registered in Run. The notification body is read through ReadPayload and
// then discarded; only its arrival matters. The handler then queries the
// current xApp configuration from appmgrHost+appmgrUrl and, when no error is
// reported, regenerates the file named by "controls.vesagent.configFile".
// NOTE(review): the statements in the ReadPayload error branch and anything
// following CreateConf are not visible in this view.
155 func (v *VespaMgr) HandlexAppNotification(w http.ResponseWriter, r *http.Request) {
156 if _, err := v.ReadPayload(w, r); err != nil {
160 app.Logger.Info("xApp event notification received!")
161 if appConfig, err := v.QueryXappConf(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrUrl)); err == nil {
162 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), appConfig)
167 func (v *VespaMgr) DoSubscribe(appmgrUrl string, subscriptionData []byte) string {
168 resp, err := http.Post(appmgrUrl, "application/json", bytes.NewBuffer(subscriptionData))
169 if err != nil || resp == nil || resp.StatusCode != http.StatusCreated {
170 app.Logger.Error("http.Post failed: %s", err)
174 body, err := ioutil.ReadAll(resp.Body)
176 app.Logger.Error("ioutil.ReadAll for body failed: %s", err)
180 var result map[string]interface{}
181 if err := json.Unmarshal([]byte(body), &result); err != nil {
182 app.Logger.Error("json.Unmarshal failed: %s", err)
185 v.subscriptionId = result["id"].(string)
186 app.Logger.Info("Subscription id from the response: %s", v.subscriptionId)
188 return v.subscriptionId
// SubscribeXappNotif registers this manager for xApp notifications at the
// application manager (appmgrUrl) and then fetches the xApp configuration.
// Run starts it in its own goroutine.
//
// The callback address sent in the subscription is "controls.host" followed
// by appmgrNotifUrl. Up to v.appmgrRetry subscription attempts are made; a
// non-empty id from DoSubscribe counts as success, otherwise the code logs
// and sleeps 5 s before the next attempt.
// NOTE(review): how the loop is left after a successful subscription, and
// anything following CreateConf, is not visible in this view.
191 func (v *VespaMgr) SubscribeXappNotif(appmgrUrl string) {
192 targetUrl := fmt.Sprintf("%s%s", app.Config.GetString("controls.host"), v.appmgrNotifUrl)
// targetUrl is inserted into the JSON text without escaping.
193 subscriptionData := []byte(fmt.Sprintf(`{"Data": {"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}}`, targetUrl))
195 for i := 0; i < v.appmgrRetry; i++ {
196 app.Logger.Info("Subscribing xApp notification from: %v", appmgrUrl)
197 if id := v.DoSubscribe(appmgrUrl, subscriptionData); id != "" {
198 app.Logger.Info("Subscription done, id=%s", id)
202 app.Logger.Info("Subscription failed, retyring after short delay ...")
203 time.Sleep(5 * time.Second)
// Query the xApp configuration and write the ves-agent configuration file.
206 if xappConfig, err := v.QueryXappConf(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrUrl)); err == nil {
207 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), xappConfig)
// respondWithJSON sets the Content-Type header to "application/json" and
// JSON-encodes payload for the reply.
// NOTE(review): the lines that write the status code and the encoded body
// are not visible in this view. The json.Marshal error is discarded.
212 func (v *VespaMgr) respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
213 w.Header().Set("Content-Type", "application/json")
216 response, _ := json.Marshal(payload)
// StartVesagent creates a command runner for the "ves-agent" executable,
// stores it in v.vesAgent and runs it with v.chVesagent.
// The command line carries the heartbeat interval (-i), the measurement
// interval (-m), --Debug, the Prometheus address and the AlertManager bind
// address taken from the manager's fields.
221 func (v *VespaMgr) StartVesagent() {
222 v.vesAgent = NewCommandRunner("ves-agent", "-i", v.hbInterval, "-m", v.measInterval, "--Debug",
223 "--Measurement.Prometheus.Address", v.prometheusAddr, "--AlertManager.Bind", v.alertManagerBindAddr)
225 v.vesAgent.Run(v.chVesagent)
// RestartVesagent stops a running ves-agent, if there is one, logging a
// failure to kill it.
// NOTE(review): several statements are not visible in this view: what happens
// when "controls.host" contains "localhost" (presumably the restart is
// skipped), what follows a failed Kill, and the step that starts the agent
// again — confirm against the full file.
228 func (v *VespaMgr) RestartVesagent() {
229 if strings.Contains(app.Config.GetString("controls.host"), "localhost") {
// v.vesAgent is nil until StartVesagent has run.
233 if v.vesAgent != nil {
234 err := v.vesAgent.Kill()
236 app.Logger.Error("Couldn't kill vespa-agent: %s", err.Error())
// Build the manager from configuration and run it with sdlcheck=false and
// runXapp=true.
// NOTE(review): the enclosing function header is not visible in this view;
// presumably this is the body of main.
246 NewVespaMgr().Run(false, true)