Refactor the code
[ric-plt/vespamgr.git] / cmd / vesmgr / vesmgr.go
index 0e27838..aeebb15 100755 (executable)
@@ -23,7 +23,6 @@ import (
        "net"
        "net/http"
        "os"
-       "os/exec"
        "time"
 
        mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git"
@@ -34,29 +33,29 @@ var appmgrDomain string
 const appmgrXAppConfigPath = "/ric/v1/config"
 const appmgrPort = "8080"
 
-type VesAgent struct {
-       Pid     int
-       name    string
-       process *os.Process
-}
-
+// VesMgr contains runtime information of the vesmgr process
 type VesMgr struct {
-       myIPAddress  string
-       appmgrSubsId string
+       myIPAddress         string
+       chXAppSubscriptions chan subscriptionNotification
+       chXAppNotifications chan []byte
+       chSupervision       chan chan string
+       chVesagent          chan error
+       vesagent            cmdRunner
+       httpServer          HTTPServer
 }
 
-type subsChannel struct {
+type subscriptionNotification struct {
        subscribed bool
        err        error
+       subsID     string
 }
 
-var vesagent VesAgent
-var vesmgr VesMgr
 var logger *mdcloggo.MdcLogger
 
 const vesmgrXappNotifPort = "8080"
 const vesmgrXappNotifPath = "/vesmgr_xappnotif/"
 const timeoutPostXAppSubscriptions = 5
+const vespaConfigFile = "/etc/ves-agent/ves-agent.yaml"
 
 func init() {
        logger, _ = mdcloggo.InitLogger("vesmgr")
@@ -80,8 +79,8 @@ func getMyIP() (myIP string, retErr error) {
        return "", nil
 }
 
-func createConf(xappMetrics []byte) {
-       f, err := os.Create("/etc/ves-agent/ves-agent.yaml")
+func createConf(fname string, xappMetrics []byte) {
+       f, err := os.Create(fname)
        if err != nil {
                logger.Error("Cannot create vespa conf file: %s", err.Error())
                os.Exit(1)
@@ -92,21 +91,20 @@ func createConf(xappMetrics []byte) {
        logger.Info("Vespa config created")
 }
 
-func subscribeXAppNotifications(chSubscriptions chan subsChannel) {
-       xappNotifUrl := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
-       subsUrl := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
-       go subscribexAppNotifications(xappNotifUrl, chSubscriptions, timeoutPostXAppSubscriptions, subsUrl)
-       logger.Info("xApp notifications subscribed from %s", subsUrl)
+func (vesmgr *VesMgr) subscribeXAppNotifications() {
+       xappNotifURL := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
+       subsURL := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
+       go subscribexAppNotifications(xappNotifURL, vesmgr.chXAppSubscriptions, timeoutPostXAppSubscriptions, subsURL)
+       logger.Info("xApp notifications subscribed from %s", subsURL)
 }
 
-func vesmgrInit() {
-       vesagent.name = "ves-agent"
+// Init initializes the vesmgr
+func (vesmgr *VesMgr) Init(listenPort string) *VesMgr {
        logger.Info("vesmgrInit")
-
        var err error
        if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" {
                logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error())
-               return
+               panic("Cannot get my IP address")
        }
 
        var ok bool
@@ -117,138 +115,134 @@ func vesmgrInit() {
                appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local"
                logger.Info("Using default appmgrdomain %s", appmgrDomain)
        }
-       chXAppSubscriptions := make(chan subsChannel)
-       chXAppNotifications := make(chan []byte)
-       chSupervision := make(chan chan string)
-       chVesagent := make(chan error)
-
-       listener, err := net.Listen("tcp", vesmgr.myIPAddress+":"+vesmgrXappNotifPort)
-       startHttpServer(listener, vesmgrXappNotifPath, chXAppNotifications, chSupervision)
-
-       subscribeXAppNotifications(chXAppSubscriptions)
-
-       runVesmgr(chVesagent, chSupervision, chXAppNotifications, chXAppSubscriptions)
+       vesmgr.chXAppSubscriptions = make(chan subscriptionNotification)
+       // Create notifications as buffered channel so that
+       // xappmgr does not block if we are stuck somewhere
+       vesmgr.chXAppNotifications = make(chan []byte, 10)
+       vesmgr.chSupervision = make(chan chan string)
+       vesmgr.chVesagent = make(chan error)
+       vesmgr.httpServer = HTTPServer{}
+       vesmgr.httpServer.init(vesmgr.myIPAddress + ":" + listenPort)
+       vesmgr.vesagent = makeRunner("ves-agent", "-i", os.Getenv("VESMGR_HB_INTERVAL"),
+               "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address",
+               os.Getenv("VESMGR_PROMETHEUS_ADDR"))
+       return vesmgr
 }
 
-func startVesagent(ch chan error) {
-       cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR"))
-       cmd.Stdout = os.Stdout
-       cmd.Stderr = os.Stderr
-       if err := cmd.Start(); err != nil {
-               logger.Error("vesmgr exiting, ves-agent start failed: %s", err)
-               go func() {
-                       ch <- err
-               }()
-       } else {
-               logger.Info("ves-agent started with pid %d", cmd.Process.Pid)
-               vesagent.Pid = cmd.Process.Pid
-               vesagent.process = cmd.Process
-               go func() {
-                       // wait ves-agent exit and then post the error to the channel
-                       err := cmd.Wait()
-                       ch <- err
-               }()
-       }
+func (vesmgr *VesMgr) startVesagent() {
+       vesmgr.vesagent.run(vesmgr.chVesagent)
 }
 
-func killVespa(process *os.Process) {
+func (vesmgr *VesMgr) killVespa() error {
        logger.Info("Killing vespa")
-       err := process.Kill()
+       err := vesmgr.vesagent.kill()
        if err != nil {
                logger.Error("Cannot kill vespa: %s", err.Error())
+               return err
        }
+       return <-vesmgr.chVesagent // wait vespa exit
 }
 
-func queryXAppsStatus(appmgrUrl string, timeout time.Duration) ([]byte, error) {
-
-       logger.Info("query xAppStatus started, url %s", appmgrUrl)
-       req, err := http.NewRequest("GET", appmgrUrl, nil)
+func queryXAppsConfig(appmgrURL string, timeout time.Duration) ([]byte, error) {
+       emptyConfig := []byte("{}")
+       logger.Info("query xAppConfig started, url %s", appmgrURL)
+       req, err := http.NewRequest("GET", appmgrURL, nil)
        if err != nil {
                logger.Error("Failed to create a HTTP request: %s", err)
-               return nil, err
+               return emptyConfig, err
        }
        req.Header.Set("Content-Type", "application/json")
        client := &http.Client{}
        client.Timeout = time.Second * timeout
        resp, err := client.Do(req)
        if err != nil {
-               logger.Error("Query xApp status failed: %s", err)
-               return nil, err
+               logger.Error("Query xApp config failed: %s", err)
+               return emptyConfig, err
        }
        defer resp.Body.Close()
        if resp.StatusCode == http.StatusOK {
                body, err := ioutil.ReadAll(resp.Body)
                if err != nil {
-                       logger.Error("Failed to read xApp status body: %s", err)
-                       return nil, err
+                       logger.Error("Failed to read xApp config body: %s", err)
+                       return emptyConfig, err
                }
-               logger.Info("query xAppStatus completed")
+               logger.Info("query xAppConfig completed")
                return body, nil
-       } else {
-               logger.Error("Error from xApp status query: %s", resp.Status)
-               return nil, errors.New(resp.Status)
        }
+       logger.Error("Error from xApp config query: %s", resp.Status)
+       return emptyConfig, errors.New(resp.Status)
 }
 
-type state int
-
-const (
-       normalState           state = iota
-       vespaTerminatingState state = iota
-)
-
 func queryConf() ([]byte, error) {
-       return queryXAppsStatus("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath,
+       return queryXAppsConfig("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath,
                10*time.Second)
 }
 
-func runVesmgr(chVesagent chan error, chSupervision chan chan string, chXAppNotifications chan []byte, chXAppSubscriptions chan subsChannel) {
+func (vesmgr *VesMgr) emptyNotificationsChannel() {
+       for {
+               select {
+               case <-vesmgr.chXAppNotifications:
+                       // we don't care the content
+               default:
+                       return
+               }
+       }
+}
+
+func (vesmgr *VesMgr) servRequest() {
+       select {
+       case supervision := <-vesmgr.chSupervision:
+               logger.Info("vesmgr: supervision")
+               supervision <- "OK"
+       case xAppNotif := <-vesmgr.chXAppNotifications:
+               logger.Info("vesmgr: xApp notification")
+               logger.Info(string(xAppNotif))
+               vesmgr.emptyNotificationsChannel()
+               /*
+               * If xapp config query fails then we cannot create
+               * a new configuration and kill vespa.
+               * In that case we assume that
+               * the situation is fixed when the next
+               * xapp notif comes
+                */
+               xappConfig, err := queryConf()
+               if err == nil {
+                       vesmgr.killVespa()
+                       createConf(vespaConfigFile, xappConfig)
+                       vesmgr.startVesagent()
+               }
+       case err := <-vesmgr.chVesagent:
+               logger.Error("Vesagent exited: " + err.Error())
+               os.Exit(1)
+       }
+}
 
-       logger.Info("vesmgr main loop ready")
-       mystate := normalState
-       var xappStatus []byte
-       var err error
+func (vesmgr *VesMgr) waitSubscriptionLoop() {
        for {
                select {
-               case supervision := <-chSupervision:
+               case supervision := <-vesmgr.chSupervision:
                        logger.Info("vesmgr: supervision")
                        supervision <- "OK"
-               case xAppNotif := <-chXAppNotifications:
-                       logger.Info("vesmgr: xApp notification")
-                       logger.Info(string(xAppNotif))
-                       /*
-                        * If xapp status query fails then we cannot create
-                        * a new configuration and kill vespa.
-                        * In that case we assume that
-                        * the situation is fixed when the next
-                        * xapp notif comes
-                        */
-                       xappStatus, err = queryConf()
-                       if err == nil {
-                               killVespa(vesagent.process)
-                               mystate = vespaTerminatingState
-                       }
-               case err := <-chVesagent:
-                       switch mystate {
-                       case vespaTerminatingState:
-                               logger.Info("Vesagent termination completed")
-                               createConf(xappStatus)
-                               startVesagent(chVesagent)
-                               mystate = normalState
-                       default:
-                               logger.Error("Vesagent exited: " + err.Error())
-                               os.Exit(1)
-                       }
-               case isSubscribed := <-chXAppSubscriptions:
+               case isSubscribed := <-vesmgr.chXAppSubscriptions:
                        if isSubscribed.err != nil {
                                logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err)
                                os.Exit(1)
                        }
-                       xappStatus, err = queryConf()
-                       if err == nil {
-                               createConf(xappStatus)
-                               startVesagent(chVesagent)
-                       }
+                       return
                }
        }
 }
+
+// Run the vesmgr process main loop
+func (vesmgr *VesMgr) Run() {
+       logger.Info("vesmgr main loop ready")
+       vesmgr.httpServer.start(vesmgrXappNotifPath, vesmgr.chXAppNotifications, vesmgr.chSupervision)
+       vesmgr.subscribeXAppNotifications()
+       vesmgr.waitSubscriptionLoop()
+       xappConfig, _ := queryConf()
+       createConf(vespaConfigFile, xappConfig)
+       vesmgr.startVesagent()
+       for {
+               vesmgr.servRequest()
+       }
+}