X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=cmd%2Fvesmgr%2Fvesmgr.go;h=0e775f86e7e52d37115ae70d2dc63f1e4402ed37;hb=8be02225a1f9cb781e809857be374df67f13226f;hp=436842bceb08ea025cbf1123e69a3664b7765a95;hpb=4b74f01111b3b14fbb3832d8aaf4946cded374a0;p=ric-plt%2Fvespamgr.git diff --git a/cmd/vesmgr/vesmgr.go b/cmd/vesmgr/vesmgr.go index 436842b..0e775f8 100755 --- a/cmd/vesmgr/vesmgr.go +++ b/cmd/vesmgr/vesmgr.go @@ -13,82 +13,250 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * This source code is part of the near-RT RIC (RAN Intelligent Controller) + * platform project (RICP). + * */ package main import ( + "errors" + "io/ioutil" + "net" + "net/http" "os" - "os/exec" + "time" + mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git" ) -type VesAgent struct { - Pid int - name string +var appmgrDomain string + +const appmgrXAppConfigPath = "/ric/v1/config" +const appmgrPort = "8080" + +// VesMgr contains runtime information of the vesmgr process +type VesMgr struct { + myIPAddress string + chXAppSubscriptions chan subscriptionNotification + chXAppNotifications chan []byte + chSupervision chan chan string + chVesagent chan error + vesagent cmdRunner + httpServer HTTPServer +} + +type subscriptionNotification struct { + subscribed bool + err error + subsID string } -var vesagent VesAgent var logger *mdcloggo.MdcLogger -var osExit = os.Exit + +// Version information, which is filled during compilation +// Version tag of vesmgr container +var Version string + +// Hash of the git commit used in building +var Hash string + +const vesmgrXappNotifPort = "8080" +const vesmgrXappNotifPath = "/vesmgr_xappnotif/" +const timeoutPostXAppSubscriptions = 5 +const vespaConfigFile = "/etc/ves-agent/ves-agent.yaml" func init() { logger, _ = mdcloggo.InitLogger("vesmgr") } -/* Function to initialize vesmgr */ -func vesmgrInit() { - vesagent.name = "ves-agent" - logger.MdcAdd("vesmgr", "0.0.1") - logger.Info("vesmgrInit") - - /* Subscribe notifications from xAPP Mgr */ - //subscribexAppNotifications() +func getMyIP() (myIP string, retErr error) { + addrs, err := net.InterfaceAddrs() + if err != nil { + logger.Error("net.InterfaceAddrs failed: %s", err.Error()) + return "", err + } + for _, addr := range addrs { + // check the address type and if it is not a loopback take it + if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + logger.Info("My IP Address: %s", ipnet.IP.String()) + return ipnet.IP.String(), nil + } + } + } + return "", nil +} - // create configuration - f, err := os.Create("/etc/ves-agent/ves-agent.yaml") +func createConf(fname string, xappMetrics []byte) { + f, err := os.Create(fname) if err != nil { logger.Error("Cannot create vespa conf file: %s", err.Error()) - return + os.Exit(1) } defer f.Close() - createVespaConfig(f) - - /* Start ves-agent */ - ch := startVesagent() + createVespaConfig(f, xappMetrics) + logger.Info("Vespa config created") +} - runVesmgr(ch) +func (vesmgr *VesMgr) subscribeXAppNotifications() { + xappNotifURL := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath + subsURL := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath + go subscribexAppNotifications(xappNotifURL, vesmgr.chXAppSubscriptions, timeoutPostXAppSubscriptions, subsURL) + logger.Info("xApp notifications subscribed from %s", subsURL) } -func startVesagent() chan error { - /* Start ves-agent */ - cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "-f", os.Getenv("VESMGR_PRICOLLECTOR_ADDR"), "-p", os.Getenv("VESMGR_PRICOLLECTOR_PORT"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR")) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - ch := make(chan error) - if err := cmd.Start(); err != nil { - logger.Error("vesmgr exiting, ves-agent start failed: %s", err) - go func() { - ch <- err - }() +// Init initializes the vesmgr +func (vesmgr *VesMgr) Init(listenPort string) *VesMgr { + logger.Info("vesmgrInit") + logger.Info("version: %s (%s)", Version, Hash) + + var err error + if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" { + logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error()) + panic("Cannot get my IP address") + } + + var ok bool + appmgrDomain, ok = os.LookupEnv("VESMGR_APPMGRDOMAIN") + if ok { + logger.Info("Using appmgrdomain %s", appmgrDomain) } else { - logger.Info("ves-agent started with pid %d", cmd.Process.Pid) - vesagent.Pid = cmd.Process.Pid - go func() { - // wait ves-agent exit and then post the error to the channel - err := cmd.Wait() - ch <- err - }() + appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local" + logger.Info("Using default appmgrdomain %s", appmgrDomain) } + vesmgr.chXAppSubscriptions = make(chan subscriptionNotification) + // Create notifications as buffered channel so that + // xappmgr does not block if we are stuck somewhere + vesmgr.chXAppNotifications = make(chan []byte, 10) + vesmgr.chSupervision = make(chan chan string) + vesmgr.chVesagent = make(chan error) + vesmgr.httpServer = HTTPServer{} + vesmgr.httpServer.init(vesmgr.myIPAddress + ":" + listenPort) + vesmgr.vesagent = makeRunner("ves-agent", "-i", os.Getenv("VESMGR_HB_INTERVAL"), + "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", + os.Getenv("VESMGR_PROMETHEUS_ADDR"), "--AlertManager.Bind", os.Getenv("VESMGR_ALERTMANAGER_BIND_ADDR"), + "--Debug") + return vesmgr +} - return ch +func (vesmgr *VesMgr) startVesagent() { + vesmgr.vesagent.run(vesmgr.chVesagent) } -func runVesmgr(ch chan error) { +func (vesmgr *VesMgr) killVespa() error { + logger.Info("Killing vespa") + err := vesmgr.vesagent.kill() + if err != nil { + logger.Error("Cannot kill vespa: %s", err.Error()) + return err + } + return <-vesmgr.chVesagent // wait vespa exit +} + +func queryXAppsConfig(appmgrURL string, timeout time.Duration) ([]byte, error) { + emptyConfig := []byte("{}") + logger.Info("query xAppConfig started, url %s", appmgrURL) + req, err := http.NewRequest("GET", appmgrURL, nil) + if err != nil { + logger.Error("Failed to create a HTTP request: %s", err) + return emptyConfig, err + } + req.Header.Set("Content-Type", "application/json") + client := &http.Client{} + client.Timeout = time.Second * timeout + resp, err := client.Do(req) + if err != nil { + logger.Error("Query xApp config failed: %s", err) + return emptyConfig, err + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + logger.Error("Failed to read xApp config body: %s", err) + return emptyConfig, err + } + logger.Info("query xAppConfig completed") + return body, nil + } + logger.Error("Error from xApp config query: %s", resp.Status) + return emptyConfig, errors.New(resp.Status) +} + +func queryConf() ([]byte, error) { + return queryXAppsConfig("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath, + 10*time.Second) +} + +func (vesmgr *VesMgr) emptyNotificationsChannel() { for { - err := <-ch + select { + case <-vesmgr.chXAppNotifications: + // we don't care the content + default: + return + } + } +} + +func (vesmgr *VesMgr) servRequest() { + select { + case supervision := <-vesmgr.chSupervision: + logger.Info("vesmgr: supervision") + supervision <- "OK" + case xAppNotif := <-vesmgr.chXAppNotifications: + logger.Info("vesmgr: xApp notification") + logger.Info(string(xAppNotif)) + vesmgr.emptyNotificationsChannel() + /* + * If xapp config query fails then we cannot create + * a new configuration and kill vespa. + * In that case we assume that + * the situation is fixed when the next + * xapp notif comes + */ + xappConfig, err := queryConf() + if err == nil { + vesmgr.killVespa() + createConf(vespaConfigFile, xappConfig) + vesmgr.startVesagent() + } + case err := <-vesmgr.chVesagent: logger.Error("Vesagent exited: " + err.Error()) - osExit(1) + os.Exit(1) + } +} + +func (vesmgr *VesMgr) waitSubscriptionLoop() { + for { + select { + case supervision := <-vesmgr.chSupervision: + logger.Info("vesmgr: supervision") + supervision <- "OK" + case isSubscribed := <-vesmgr.chXAppSubscriptions: + if isSubscribed.err != nil { + logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err) + os.Exit(1) + } + return + } + } +} + +// Run the vesmgr process main loop +func (vesmgr *VesMgr) Run() { + logger.Info("vesmgr main loop ready") + vesmgr.httpServer.start(vesmgrXappNotifPath, vesmgr.chXAppNotifications, vesmgr.chSupervision) + vesmgr.subscribeXAppNotifications() + vesmgr.waitSubscriptionLoop() + xappConfig, _ := queryConf() + createConf(vespaConfigFile, xappConfig) + vesmgr.startVesagent() + for { + vesmgr.servRequest() } }