X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=cmd%2Fvesmgr%2Fvesmgr.go;h=18ed7935fd048a14c5d4449dcf98a9bad0e24f5f;hb=412df96a23a30a82d2a031556888aeaf9604ada8;hp=0867517d9a638e354c262431d2624522069a67a8;hpb=66b7813221b6da5c15da40953d09d3846a4bc029;p=ric-plt%2Fvespamgr.git diff --git a/cmd/vesmgr/vesmgr.go b/cmd/vesmgr/vesmgr.go index 0867517..18ed793 100755 --- a/cmd/vesmgr/vesmgr.go +++ b/cmd/vesmgr/vesmgr.go @@ -18,55 +18,125 @@ package main import ( + "errors" + "io/ioutil" + "net" + "net/http" "os" "os/exec" + "time" + mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git" ) +var appmgrDomain string + +const appmgrXAppConfigPath = "/ric/v1/config" +const appmgrPort = "8080" + type VesAgent struct { - Pid int - name string + Pid int + name string + process *os.Process +} + +type VesMgr struct { + myIPAddress string + appmgrSubsId string +} + +type subsChannel struct { + subscribed bool + err error } var vesagent VesAgent +var vesmgr VesMgr var logger *mdcloggo.MdcLogger -var osExit = os.Exit + +const vesmgrXappNotifPort = "8080" +const vesmgrXappNotifPath = "/vesmgr_xappnotif/" +const timeoutPostXAppSubscriptions = 5 func init() { logger, _ = mdcloggo.InitLogger("vesmgr") } -/* Function to initialize vesmgr */ -func vesmgrInit() { - vesagent.name = "ves-agent" - logger.MdcAdd("vesmgr", "0.0.1") - logger.Info("vesmgrInit") - - /* Subscribe notifications from xAPP Mgr */ - //subscribexAppNotifications() +func getMyIP() (myIP string, retErr error) { + addrs, err := net.InterfaceAddrs() + if err != nil { + logger.Error("net.InterfaceAddrs failed: %s", err.Error()) + return "", err + } + for _, addr := range addrs { + // check the address type and if it is not a loopback take it + if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + logger.Info("My IP Address: %s", ipnet.IP.String()) + return ipnet.IP.String(), nil + } + } + } + return "", nil +} - // create configuration +func createConf(xappMetrics []byte) { f, err := os.Create("/etc/ves-agent/ves-agent.yaml") if err != nil { logger.Error("Cannot create vespa conf file: %s", err.Error()) - return + os.Exit(1) } defer f.Close() - createVespaConfig(f) + createVespaConfig(f, xappMetrics) + logger.Info("Vespa config created") +} + +func subscribeXAppNotifications(chSubscriptions chan subsChannel) { + xappNotifUrl := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath + subsUrl := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath + go subscribexAppNotifications(xappNotifUrl, chSubscriptions, timeoutPostXAppSubscriptions, subsUrl) + logger.Info("xApp notifications subscribed from %s", subsUrl) +} + +func vesmgrInit() { + vesagent.name = "ves-agent" + logger.Info("vesmgrInit") + + var err error + if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" { + logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error()) + return + } + + var ok bool + appmgrDomain, ok = os.LookupEnv("VESMGR_APPMGRDOMAIN") + if ok { + logger.Info("Using appmgrdomain %s", appmgrDomain) + } else { + appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local" + logger.Info("Using default appmgrdomain %s", appmgrDomain) + } + chXAppSubscriptions := make(chan subsChannel) + chXAppNotifications := make(chan []byte) + chSupervision := make(chan chan string) + chVesagent := make(chan error) + + listener, err := net.Listen("tcp", vesmgr.myIPAddress+":"+vesmgrXappNotifPort) + startHttpServer(listener, vesmgrXappNotifPath, chXAppNotifications, chSupervision) + + subscribeXAppNotifications(chXAppSubscriptions) - /* Start ves-agent */ - ch := startVesagent() + createConf([]byte{}) + startVesagent(chVesagent) - runVesmgr(ch) + runVesmgr(chVesagent, chSupervision, chXAppNotifications, chXAppSubscriptions) } -func startVesagent() chan error { - /* Start ves-agent */ +func startVesagent(ch chan error) { cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR")) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - ch := make(chan error) if err := cmd.Start(); err != nil { logger.Error("vesmgr exiting, ves-agent start failed: %s", err) go func() { @@ -75,20 +145,108 @@ func startVesagent() chan error { } else { logger.Info("ves-agent started with pid %d", cmd.Process.Pid) vesagent.Pid = cmd.Process.Pid + vesagent.process = cmd.Process go func() { // wait ves-agent exit and then post the error to the channel err := cmd.Wait() ch <- err }() } +} - return ch +func killVespa(process *os.Process) { + logger.Info("Killing vespa") + err := process.Kill() + if err != nil { + logger.Error("Cannot kill vespa: %s", err.Error()) + } } -func runVesmgr(ch chan error) { +func queryXAppsStatus(appmgrUrl string, timeout time.Duration) ([]byte, error) { + + logger.Info("query xAppStatus started, url %s", appmgrUrl) + req, err := http.NewRequest("GET", appmgrUrl, nil) + if err != nil { + logger.Error("Failed to create a HTTP request: %s", err) + return nil, err + } + req.Header.Set("Content-Type", "application/json") + client := &http.Client{} + client.Timeout = time.Second * timeout + resp, err := client.Do(req) + if err != nil { + logger.Error("Query xApp status failed: %s", err) + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + logger.Error("Failed to read xApp status body: %s", err) + return nil, err + } + logger.Info("query xAppStatus completed") + return body, nil + } else { + logger.Error("Error from xApp status query: %s", resp.Status) + return nil, errors.New(resp.Status) + } +} + +type state int + +const ( + normalState state = iota + vespaTerminatingState state = iota +) + +func queryConf() ([]byte, error) { + return queryXAppsStatus("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath, + 10*time.Second) +} + +func runVesmgr(chVesagent chan error, chSupervision chan chan string, chXAppNotifications chan []byte, chXAppSubscriptions chan subsChannel) { + + logger.Info("vesmgr main loop ready") + mystate := normalState + var xappStatus []byte for { - err := <-ch - logger.Error("Vesagent exited: " + err.Error()) - osExit(1) + select { + case supervision := <-chSupervision: + logger.Info("vesmgr: supervision") + supervision <- "OK" + case xAppNotif := <-chXAppNotifications: + logger.Info("vesmgr: xApp notification") + logger.Info(string(xAppNotif)) + /* + * If xapp status query fails then we cannot create + * a new configuration and kill vespa. + * In that case we assume that + * the situation is fixed when the next + * xapp notif comes + */ + var err error + xappStatus, err = queryConf() + if err == nil { + killVespa(vesagent.process) + mystate = vespaTerminatingState + } + case err := <-chVesagent: + switch mystate { + case vespaTerminatingState: + logger.Info("Vesagent termination completed") + createConf(xappStatus) + startVesagent(chVesagent) + mystate = normalState + default: + logger.Error("Vesagent exited: " + err.Error()) + os.Exit(1) + } + case isSubscribed := <-chXAppSubscriptions: + if isSubscribed.err != nil { + logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err) + os.Exit(1) + } + } } }