* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ * platform project (RICP).
+ *
*/
package main
"net"
"net/http"
"os"
- "os/exec"
"time"
mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git"
const appmgrXAppConfigPath = "/ric/v1/config"
const appmgrPort = "8080"
-type VesAgent struct {
- Pid int
- name string
- process *os.Process
-}
-
+// VesMgr contains runtime information of the vesmgr process
type VesMgr struct {
- myIPAddress string
- appmgrSubsId string
+ myIPAddress string
+ chXAppSubscriptions chan subscriptionNotification
+ chXAppNotifications chan []byte
+ chSupervision chan chan string
+ chVesagent chan error
+ vesagent cmdRunner
+ httpServer HTTPServer
}
-type subsChannel struct {
+type subscriptionNotification struct {
subscribed bool
err error
+ subsID string
}
-var vesagent VesAgent
-var vesmgr VesMgr
var logger *mdcloggo.MdcLogger
+// Version information, which is filled during compilation
+// Version tag of vesmgr container
+var Version string
+
+// Hash of the git commit used in building
+var Hash string
+
const vesmgrXappNotifPort = "8080"
const vesmgrXappNotifPath = "/vesmgr_xappnotif/"
const timeoutPostXAppSubscriptions = 5
+const vespaConfigFile = "/etc/ves-agent/ves-agent.yaml"
func init() {
logger, _ = mdcloggo.InitLogger("vesmgr")
return "", nil
}
-func createConf(xappMetrics []byte) {
- f, err := os.Create("/etc/ves-agent/ves-agent.yaml")
+func createConf(fname string, xappMetrics []byte) {
+ f, err := os.Create(fname)
if err != nil {
logger.Error("Cannot create vespa conf file: %s", err.Error())
os.Exit(1)
logger.Info("Vespa config created")
}
-func subscribeXAppNotifications(chSubscriptions chan subsChannel) {
- xappNotifUrl := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
- subsUrl := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
- go subscribexAppNotifications(xappNotifUrl, chSubscriptions, timeoutPostXAppSubscriptions, subsUrl)
- logger.Info("xApp notifications subscribed from %s", subsUrl)
+func (vesmgr *VesMgr) subscribeXAppNotifications() {
+ xappNotifURL := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
+ subsURL := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
+ go subscribexAppNotifications(xappNotifURL, vesmgr.chXAppSubscriptions, timeoutPostXAppSubscriptions, subsURL)
+ logger.Info("xApp notifications subscribed from %s", subsURL)
}
-func vesmgrInit() {
- vesagent.name = "ves-agent"
+// Init initializes the vesmgr
+func (vesmgr *VesMgr) Init(listenPort string) *VesMgr {
logger.Info("vesmgrInit")
+ logger.Info("version: %s (%s)", Version, Hash)
var err error
if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" {
- logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error())
- return
+ logger.Error("Cannot get myIPAddress: IP %s", vesmgr.myIPAddress)
+ panic("Cannot get my IP address")
}
var ok bool
appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local"
logger.Info("Using default appmgrdomain %s", appmgrDomain)
}
- chXAppSubscriptions := make(chan subsChannel)
- chXAppNotifications := make(chan []byte)
- chSupervision := make(chan chan string)
- chVesagent := make(chan error)
-
- listener, err := net.Listen("tcp", vesmgr.myIPAddress+":"+vesmgrXappNotifPort)
- startHttpServer(listener, vesmgrXappNotifPath, chXAppNotifications, chSupervision)
-
- subscribeXAppNotifications(chXAppSubscriptions)
-
- runVesmgr(chVesagent, chSupervision, chXAppNotifications, chXAppSubscriptions)
+ vesmgr.chXAppSubscriptions = make(chan subscriptionNotification)
+ // Create notifications as buffered channel so that
+ // xappmgr does not block if we are stuck somewhere
+ vesmgr.chXAppNotifications = make(chan []byte, 10)
+ vesmgr.chSupervision = make(chan chan string)
+ vesmgr.chVesagent = make(chan error)
+ vesmgr.httpServer = HTTPServer{}
+ vesmgr.httpServer.init(vesmgr.myIPAddress + ":" + listenPort)
+ vesmgr.vesagent = makeRunner("ves-agent", "-i", os.Getenv("VESMGR_HB_INTERVAL"),
+ "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address",
+ os.Getenv("VESMGR_PROMETHEUS_ADDR"), "--AlertManager.Bind", os.Getenv("VESMGR_ALERTMANAGER_BIND_ADDR"),
+ "--Debug")
+ return vesmgr
}
-func startVesagent(ch chan error) {
- cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR"))
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- if err := cmd.Start(); err != nil {
- logger.Error("vesmgr exiting, ves-agent start failed: %s", err)
- go func() {
- ch <- err
- }()
- } else {
- logger.Info("ves-agent started with pid %d", cmd.Process.Pid)
- vesagent.Pid = cmd.Process.Pid
- vesagent.process = cmd.Process
- go func() {
- // wait ves-agent exit and then post the error to the channel
- err := cmd.Wait()
- ch <- err
- }()
- }
+func (vesmgr *VesMgr) startVesagent() {
+ vesmgr.vesagent.run(vesmgr.chVesagent)
}
-func killVespa(process *os.Process) {
+func (vesmgr *VesMgr) killVespa() error {
logger.Info("Killing vespa")
- err := process.Kill()
+ err := vesmgr.vesagent.kill()
if err != nil {
logger.Error("Cannot kill vespa: %s", err.Error())
+ return err
}
+ return <-vesmgr.chVesagent // wait vespa exit
}
-func queryXAppsStatus(appmgrUrl string, timeout time.Duration) ([]byte, error) {
-
- logger.Info("query xAppStatus started, url %s", appmgrUrl)
- req, err := http.NewRequest("GET", appmgrUrl, nil)
+func queryXAppsConfig(appmgrURL string, timeout time.Duration) ([]byte, error) {
+ emptyConfig := []byte("{}")
+ logger.Info("query xAppConfig started, url %s", appmgrURL)
+ req, err := http.NewRequest("GET", appmgrURL, nil)
if err != nil {
logger.Error("Failed to create a HTTP request: %s", err)
- return nil, err
+ return emptyConfig, err
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
client.Timeout = time.Second * timeout
resp, err := client.Do(req)
if err != nil {
- logger.Error("Query xApp status failed: %s", err)
- return nil, err
+ logger.Error("Query xApp config failed: %s", err)
+ return emptyConfig, err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
- logger.Error("Failed to read xApp status body: %s", err)
- return nil, err
+ logger.Error("Failed to read xApp config body: %s", err)
+ return emptyConfig, err
}
- logger.Info("query xAppStatus completed")
+ logger.Info("query xAppConfig completed")
return body, nil
- } else {
- logger.Error("Error from xApp status query: %s", resp.Status)
- return nil, errors.New(resp.Status)
}
+ logger.Error("Error from xApp config query: %s", resp.Status)
+ return emptyConfig, errors.New(resp.Status)
}
-type state int
-
-const (
- normalState state = iota
- vespaTerminatingState state = iota
-)
+func queryConf() (appConfig []byte, err error) {
+ for i := 0; i < 10; i++ {
+ appConfig, err = queryXAppsConfig("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath, 10*time.Second)
+ if len(appConfig) > 0 {
+ break
+ }
+ time.Sleep(5 * time.Second)
+ }
+ return appConfig, err
+}
-func queryConf() ([]byte, error) {
- return queryXAppsStatus("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath,
- 10*time.Second)
+func (vesmgr *VesMgr) emptyNotificationsChannel() {
+ for {
+ select {
+ case <-vesmgr.chXAppNotifications:
+ // we don't care the content
+ default:
+ return
+ }
+ }
}
-func runVesmgr(chVesagent chan error, chSupervision chan chan string, chXAppNotifications chan []byte, chXAppSubscriptions chan subsChannel) {
+func (vesmgr *VesMgr) servRequest() {
+ select {
+ case supervision := <-vesmgr.chSupervision:
+ logger.Info("vesmgr: supervision")
+ supervision <- "OK"
+ case xAppNotif := <-vesmgr.chXAppNotifications:
+ logger.Info("vesmgr: xApp notification")
+ logger.Info(string(xAppNotif))
+ vesmgr.emptyNotificationsChannel()
+ /*
+ * If xapp config query fails then we cannot create
+ * a new configuration and kill vespa.
+ * In that case we assume that
+ * the situation is fixed when the next
+ * xapp notif comes
+ */
+ xappConfig, err := queryConf()
+ if err == nil {
+ vesmgr.killVespa()
+ createConf(vespaConfigFile, xappConfig)
+ vesmgr.startVesagent()
+ }
+ case err := <-vesmgr.chVesagent:
+ logger.Error("Vesagent exited: " + err.Error())
+ os.Exit(1)
+ }
+}
- logger.Info("vesmgr main loop ready")
- mystate := normalState
- var xappStatus []byte
- var err error
+func (vesmgr *VesMgr) waitSubscriptionLoop() {
for {
select {
- case supervision := <-chSupervision:
+ case supervision := <-vesmgr.chSupervision:
logger.Info("vesmgr: supervision")
supervision <- "OK"
- case xAppNotif := <-chXAppNotifications:
- logger.Info("vesmgr: xApp notification")
- logger.Info(string(xAppNotif))
- /*
- * If xapp status query fails then we cannot create
- * a new configuration and kill vespa.
- * In that case we assume that
- * the situation is fixed when the next
- * xapp notif comes
- */
- xappStatus, err = queryConf()
- if err == nil {
- killVespa(vesagent.process)
- mystate = vespaTerminatingState
- }
- case err := <-chVesagent:
- switch mystate {
- case vespaTerminatingState:
- logger.Info("Vesagent termination completed")
- createConf(xappStatus)
- startVesagent(chVesagent)
- mystate = normalState
- default:
- logger.Error("Vesagent exited: " + err.Error())
- os.Exit(1)
- }
- case isSubscribed := <-chXAppSubscriptions:
+ case isSubscribed := <-vesmgr.chXAppSubscriptions:
if isSubscribed.err != nil {
logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err)
os.Exit(1)
}
- xappStatus, err = queryConf()
- if err == nil {
- createConf(xappStatus)
- startVesagent(chVesagent)
- }
+ return
}
}
}
+
+// Run the vesmgr process main loop
+func (vesmgr *VesMgr) Run() {
+ logger.Info("vesmgr main loop ready")
+
+ vesmgr.httpServer.start(vesmgrXappNotifPath, vesmgr.chXAppNotifications, vesmgr.chSupervision)
+
+ vesmgr.subscribeXAppNotifications()
+
+ vesmgr.waitSubscriptionLoop()
+
+ xappConfig, _ := queryConf()
+
+ createConf(vespaConfigFile, xappConfig)
+
+ vesmgr.startVesagent()
+ for {
+ vesmgr.servRequest()
+ }
+}