package main
import (
+ "errors"
+ "io/ioutil"
+ "net"
+ "net/http"
"os"
"os/exec"
+ "time"
+
mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git"
)
+var appmgrDomain string
+
+const appmgrXAppConfigPath = "/ric/v1/config"
+const appmgrPort = "8080"
+
type VesAgent struct {
- Pid int
- name string
+ Pid int
+ name string
+ process *os.Process
+}
+
+type VesMgr struct {
+ myIPAddress string
+ appmgrSubsId string
+}
+
+type subsChannel struct {
+ subscribed bool
+ err error
}
var vesagent VesAgent
+var vesmgr VesMgr
var logger *mdcloggo.MdcLogger
-var osExit = os.Exit
+
+const vesmgrXappNotifPort = "8080"
+const vesmgrXappNotifPath = "/vesmgr_xappnotif/"
+const timeoutPostXAppSubscriptions = 5
func init() {
logger, _ = mdcloggo.InitLogger("vesmgr")
}
-/* Function to initialize vesmgr */
-func vesmgrInit() {
- vesagent.name = "ves-agent"
- logger.MdcAdd("vesmgr", "0.0.1")
- logger.Info("vesmgrInit")
-
- /* Subscribe notifications from xAPP Mgr */
- //subscribexAppNotifications()
+func getMyIP() (myIP string, retErr error) {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ logger.Error("net.InterfaceAddrs failed: %s", err.Error())
+ return "", err
+ }
+ for _, addr := range addrs {
+ // check the address type and if it is not a loopback take it
+ if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+ if ipnet.IP.To4() != nil {
+ logger.Info("My IP Address: %s", ipnet.IP.String())
+ return ipnet.IP.String(), nil
+ }
+ }
+ }
+ return "", nil
+}
- // create configuration
+func createConf(xappMetrics []byte) {
f, err := os.Create("/etc/ves-agent/ves-agent.yaml")
if err != nil {
logger.Error("Cannot create vespa conf file: %s", err.Error())
- return
+ os.Exit(1)
}
defer f.Close()
- createVespaConfig(f)
+ createVespaConfig(f, xappMetrics)
+ logger.Info("Vespa config created")
+}
+
+func subscribeXAppNotifications(chSubscriptions chan subsChannel) {
+ xappNotifUrl := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
+ subsUrl := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
+ go subscribexAppNotifications(xappNotifUrl, chSubscriptions, timeoutPostXAppSubscriptions, subsUrl)
+ logger.Info("xApp notifications subscribed from %s", subsUrl)
+}
+
+func vesmgrInit() {
+ vesagent.name = "ves-agent"
+ logger.Info("vesmgrInit")
+
+ var err error
+ if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" {
+ logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error())
+ return
+ }
+
+ var ok bool
+ appmgrDomain, ok = os.LookupEnv("VESMGR_APPMGRDOMAIN")
+ if ok {
+ logger.Info("Using appmgrdomain %s", appmgrDomain)
+ } else {
+ appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local"
+ logger.Info("Using default appmgrdomain %s", appmgrDomain)
+ }
+ chXAppSubscriptions := make(chan subsChannel)
+ chXAppNotifications := make(chan []byte)
+ chSupervision := make(chan chan string)
+ chVesagent := make(chan error)
+
+ listener, err := net.Listen("tcp", vesmgr.myIPAddress+":"+vesmgrXappNotifPort)
+ startHttpServer(listener, vesmgrXappNotifPath, chXAppNotifications, chSupervision)
+
+ subscribeXAppNotifications(chXAppSubscriptions)
- /* Start ves-agent */
- ch := startVesagent()
+ createConf([]byte{})
+ startVesagent(chVesagent)
- runVesmgr(ch)
+ runVesmgr(chVesagent, chSupervision, chXAppNotifications, chXAppSubscriptions)
}
-func startVesagent() chan error {
- /* Start ves-agent */
+func startVesagent(ch chan error) {
cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR"))
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
- ch := make(chan error)
if err := cmd.Start(); err != nil {
logger.Error("vesmgr exiting, ves-agent start failed: %s", err)
go func() {
} else {
logger.Info("ves-agent started with pid %d", cmd.Process.Pid)
vesagent.Pid = cmd.Process.Pid
+ vesagent.process = cmd.Process
go func() {
// wait ves-agent exit and then post the error to the channel
err := cmd.Wait()
ch <- err
}()
}
+}
- return ch
+func killVespa(process *os.Process) {
+ logger.Info("Killing vespa")
+ err := process.Kill()
+ if err != nil {
+ logger.Error("Cannot kill vespa: %s", err.Error())
+ }
}
-func runVesmgr(ch chan error) {
+func queryXAppsStatus(appmgrUrl string, timeout time.Duration) ([]byte, error) {
+
+ logger.Info("query xAppStatus started, url %s", appmgrUrl)
+ req, err := http.NewRequest("GET", appmgrUrl, nil)
+ if err != nil {
+ logger.Error("Failed to create a HTTP request: %s", err)
+ return nil, err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ client := &http.Client{}
+ client.Timeout = time.Second * timeout
+ resp, err := client.Do(req)
+ if err != nil {
+ logger.Error("Query xApp status failed: %s", err)
+ return nil, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode == http.StatusOK {
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ logger.Error("Failed to read xApp status body: %s", err)
+ return nil, err
+ }
+ logger.Info("query xAppStatus completed")
+ return body, nil
+ } else {
+ logger.Error("Error from xApp status query: %s", resp.Status)
+ return nil, errors.New(resp.Status)
+ }
+}
+
+type state int
+
+const (
+ normalState state = iota
+ vespaTerminatingState state = iota
+)
+
+func queryConf() ([]byte, error) {
+ return queryXAppsStatus("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath,
+ 10*time.Second)
+}
+
+func runVesmgr(chVesagent chan error, chSupervision chan chan string, chXAppNotifications chan []byte, chXAppSubscriptions chan subsChannel) {
+
+ logger.Info("vesmgr main loop ready")
+ mystate := normalState
+ var xappStatus []byte
for {
- err := <-ch
- logger.Error("Vesagent exited: " + err.Error())
- osExit(1)
+ select {
+ case supervision := <-chSupervision:
+ logger.Info("vesmgr: supervision")
+ supervision <- "OK"
+ case xAppNotif := <-chXAppNotifications:
+ logger.Info("vesmgr: xApp notification")
+ logger.Info(string(xAppNotif))
+ /*
+ * If xapp status query fails then we cannot create
+ * a new configuration and kill vespa.
+ * In that case we assume that
+ * the situation is fixed when the next
+ * xapp notif comes
+ */
+ var err error
+ xappStatus, err = queryConf()
+ if err == nil {
+ killVespa(vesagent.process)
+ mystate = vespaTerminatingState
+ }
+ case err := <-chVesagent:
+ switch mystate {
+ case vespaTerminatingState:
+ logger.Info("Vesagent termination completed")
+ createConf(xappStatus)
+ startVesagent(chVesagent)
+ mystate = normalState
+ default:
+ logger.Error("Vesagent exited: " + err.Error())
+ os.Exit(1)
+ }
+ case isSubscribed := <-chXAppSubscriptions:
+ if isSubscribed.err != nil {
+ logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err)
+ os.Exit(1)
+ }
+ }
}
}