Update RMR version v4.4.6
[ric-plt/vespamgr.git] / cmd / vespamgr / vespamgr.go
1 /*
2  *  Copyright (c) 2020 AT&T Intellectual Property.
3  *  Copyright (c) 2020 Nokiv.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
18  * platform project (RICP).
19  */
20
21 package main
22
23 import (
24         "bytes"
25         "encoding/json"
26         "fmt"
27         "io/ioutil"
28         "net/http"
29         "os"
30         "strings"
31         "time"
32
33         app "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 )
35
36 func NewVespaMgr() *VespaMgr {
37         return &VespaMgr{
38                 rmrReady:             false,
39                 chVesagent:           make(chan error),
40                 appmgrHost:           app.Config.GetString("controls.appManager.host"),
41                 appmgrUrl:            app.Config.GetString("controls.appManager.path"),
42                 appmgrNotifUrl:       app.Config.GetString("controls.appManager.notificationUrl"),
43                 appmgrSubsUrl:        app.Config.GetString("controls.appManager.subscriptionUrl"),
44                 appmgrRetry:          app.Config.GetInt("controls.appManager.appmgrRetry"),
45                 hbInterval:           app.Config.GetString("controls.vesagent.hbInterval"),
46                 measInterval:         app.Config.GetString("controls.vesagent.measInterval"),
47                 prometheusAddr:       app.Config.GetString("controls.vesagent.prometheusAddr"),
48                 alertManagerBindAddr: app.Config.GetString("controls.vesagent.alertManagerBindAddr"),
49         }
50 }
51
52 func (v *VespaMgr) Run(sdlcheck, runXapp bool) {
53         app.Logger.SetMdc("vespamgr", fmt.Sprintf("%s:%s", Version, Hash))
54         app.SetReadyCB(func(d interface{}) { v.rmrReady = true }, true)
55         app.Resource.InjectStatusCb(v.StatusCB)
56         app.AddConfigChangeListener(v.ConfigChangeCB)
57
58         measUrl := app.Config.GetString("controls.measurementUrl")
59         app.Resource.InjectRoute(v.appmgrNotifUrl, v.HandlexAppNotification, "POST")
60         app.Resource.InjectRoute(measUrl, v.HandleMeasurements, "POST")
61         app.Resource.InjectRoute("/supervision", v.HandleSupervision, "GET") // @todo: remove this
62
63         go v.SubscribeXappNotif(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrSubsUrl))
64
65         if runXapp {
66                 app.RunWithParams(v, sdlcheck)
67         }
68 }
69
70 func (v *VespaMgr) Consume(rp *app.RMRParams) (err error) {
71         app.Logger.Info("Message received!")
72
73         app.Rmr.Free(rp.Mbuf)
74         return nil
75 }
76
77 func (v *VespaMgr) StatusCB() bool {
78         if !v.rmrReady {
79                 app.Logger.Info("RMR not ready yet!")
80         }
81
82         return v.rmrReady
83 }
84
85 func (v *VespaMgr) ConfigChangeCB(configparam string) {
86         return
87 }
88
89 func (v *VespaMgr) CreateConf(fname string, xappMetrics []byte) {
90         f, err := os.Create(fname)
91         if err != nil {
92                 app.Logger.Error("os.Create failed: %s", err.Error())
93                 return
94         }
95         defer f.Close()
96
97         v.CreateConfig(f, xappMetrics)
98 }
99
100 func (v *VespaMgr) QueryXappConf(appmgrUrl string) (appConfig []byte, err error) {
101         client := http.Client{Timeout: 10 * time.Second}
102
103         for i := 0; i < v.appmgrRetry; i++ {
104                 app.Logger.Info("Getting xApp config from: %s [%d]", appmgrUrl, v.appmgrRetry)
105
106                 resp, err := client.Get(appmgrUrl)
107                 if err != nil || resp == nil {
108                         app.Logger.Error("client.Get failed: %v", err)
109                         time.Sleep(5 * time.Second)
110                         continue
111                 }
112
113                 defer resp.Body.Close()
114                 appConfig, err := ioutil.ReadAll(resp.Body)
115                 if err != nil {
116                         app.Logger.Error("ioutil.ReadAll failed: %v", err)
117                         time.Sleep(5 * time.Second)
118                         continue
119                 }
120
121                 app.Logger.Info("Received xApp config: %d", len(appConfig))
122                 if len(appConfig) > 0 {
123                         return appConfig, err
124                 }
125         }
126
127         return appConfig, err
128 }
129
130 func (v *VespaMgr) ReadPayload(w http.ResponseWriter, r *http.Request) ([]byte, error) {
131         payload, err := ioutil.ReadAll(r.Body)
132         defer r.Body.Close()
133         if err != nil {
134                 app.Logger.Error("ioutil.ReadAll failed: %v", err)
135                 return payload, err
136         }
137         v.respondWithJSON(w, http.StatusOK, err)
138
139         return payload, err
140 }
141
142 func (v *VespaMgr) HandleSupervision(w http.ResponseWriter, r *http.Request) {
143         v.respondWithJSON(w, http.StatusOK, nil)
144 }
145
146 func (v *VespaMgr) HandleMeasurements(w http.ResponseWriter, r *http.Request) {
147         if appConfig, err := v.ReadPayload(w, r); err == nil {
148                 filePath := app.Config.GetString("controls.pltFile")
149                 if err := ioutil.WriteFile(filePath, appConfig, 0666); err == nil {
150                         v.pltFileCreated = true
151                 }
152         }
153 }
154
155 func (v *VespaMgr) HandlexAppNotification(w http.ResponseWriter, r *http.Request) {
156         if _, err := v.ReadPayload(w, r); err != nil {
157                 return
158         }
159
160         app.Logger.Info("xApp event notification received!")
161         if appConfig, err := v.QueryXappConf(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrUrl)); err == nil {
162                 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), appConfig)
163                 v.RestartVesagent()
164         }
165 }
166
167 func (v *VespaMgr) DoSubscribe(appmgrUrl string, subscriptionData []byte) string {
168         resp, err := http.Post(appmgrUrl, "application/json", bytes.NewBuffer(subscriptionData))
169         if err != nil || resp == nil || resp.StatusCode != http.StatusCreated {
170                 app.Logger.Error("http.Post failed: %s", err)
171                 return ""
172         }
173
174         body, err := ioutil.ReadAll(resp.Body)
175         if err != nil {
176                 app.Logger.Error("ioutil.ReadAll for body failed: %s", err)
177                 return ""
178         }
179
180         var result map[string]interface{}
181         if err := json.Unmarshal([]byte(body), &result); err != nil {
182                 app.Logger.Error("json.Unmarshal failed: %s", err)
183                 return ""
184         }
185         v.subscriptionId = result["id"].(string)
186         app.Logger.Info("Subscription id from the response: %s", v.subscriptionId)
187
188         return v.subscriptionId
189 }
190
191 func (v *VespaMgr) SubscribeXappNotif(appmgrUrl string) {
192         targetUrl := fmt.Sprintf("%s%s", app.Config.GetString("controls.host"), v.appmgrNotifUrl)
193         subscriptionData := []byte(fmt.Sprintf(`{"Data": {"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}}`, targetUrl))
194
195         for i := 0; i < v.appmgrRetry; i++ {
196                 app.Logger.Info("Subscribing xApp notification from: %v", appmgrUrl)
197                 if id := v.DoSubscribe(appmgrUrl, subscriptionData); id != "" {
198                         app.Logger.Info("Subscription done, id=%s", id)
199                         break
200                 }
201
202                 app.Logger.Info("Subscription failed, retyring after short delay ...")
203                 time.Sleep(5 * time.Second)
204         }
205
206         if xappConfig, err := v.QueryXappConf(fmt.Sprintf("%s%s", v.appmgrHost, v.appmgrUrl)); err == nil {
207                 v.CreateConf(app.Config.GetString("controls.vesagent.configFile"), xappConfig)
208                 v.RestartVesagent()
209         }
210 }
211
212 func (v *VespaMgr) respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
213         w.Header().Set("Content-Type", "application/json")
214         w.WriteHeader(code)
215         if payload != nil {
216                 response, _ := json.Marshal(payload)
217                 w.Write(response)
218         }
219 }
220
221 func (v *VespaMgr) StartVesagent() {
222         v.vesAgent = NewCommandRunner("ves-agent", "-i", v.hbInterval, "-m", v.measInterval, "--Debug",
223                 "--Measurement.Prometheus.Address", v.prometheusAddr, "--AlertManager.Bind", v.alertManagerBindAddr)
224
225         v.vesAgent.Run(v.chVesagent)
226 }
227
228 func (v *VespaMgr) RestartVesagent() {
229         if strings.Contains(app.Config.GetString("controls.host"), "localhost") {
230                 return
231         }
232
233         if v.vesAgent != nil {
234                 err := v.vesAgent.Kill()
235                 if err != nil {
236                         app.Logger.Error("Couldn't kill vespa-agent: %s", err.Error())
237                         return
238                 }
239                 <-v.chVesagent
240         }
241
242         v.StartVesagent()
243 }
244
245 func main() {
246         NewVespaMgr().Run(false, true)
247 }