X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=cmd%2Fvesmgr%2Fvesmgr.go;h=dfe9864cd1c3d8588c7dc6245487f0b7d35fcab7;hb=refs%2Ftags%2F0.7.0;hp=18ed7935fd048a14c5d4449dcf98a9bad0e24f5f;hpb=412df96a23a30a82d2a031556888aeaf9604ada8;p=ric-plt%2Fvespamgr.git diff --git a/cmd/vesmgr/vesmgr.go b/cmd/vesmgr/vesmgr.go index 18ed793..dfe9864 100755 --- a/cmd/vesmgr/vesmgr.go +++ b/cmd/vesmgr/vesmgr.go @@ -13,6 +13,10 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * This source code is part of the near-RT RIC (RAN Intelligent Controller) + * platform project (RICP). + * */ package main @@ -23,7 +27,6 @@ import ( "net" "net/http" "os" - "os/exec" "time" mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git" @@ -34,29 +37,36 @@ var appmgrDomain string const appmgrXAppConfigPath = "/ric/v1/config" const appmgrPort = "8080" -type VesAgent struct { - Pid int - name string - process *os.Process -} - +// VesMgr contains runtime information of the vesmgr process type VesMgr struct { - myIPAddress string - appmgrSubsId string + myIPAddress string + chXAppSubscriptions chan subscriptionNotification + chXAppNotifications chan []byte + chSupervision chan chan string + chVesagent chan error + vesagent cmdRunner + httpServer HTTPServer } -type subsChannel struct { +type subscriptionNotification struct { subscribed bool err error + subsID string } -var vesagent VesAgent -var vesmgr VesMgr var logger *mdcloggo.MdcLogger +// Version information, which is filled during compilation +// Version tag of vesmgr container +var Version string + +// Hash of the git commit used in building +var Hash string + const vesmgrXappNotifPort = "8080" const vesmgrXappNotifPath = "/vesmgr_xappnotif/" const timeoutPostXAppSubscriptions = 5 +const vespaConfigFile = "/etc/ves-agent/ves-agent.yaml" func init() { logger, _ = mdcloggo.InitLogger("vesmgr") @@ -80,8 +90,8 @@ func getMyIP() (myIP string, retErr error) { return "", nil } -func createConf(xappMetrics []byte) { - f, err := os.Create("/etc/ves-agent/ves-agent.yaml") +func createConf(fname string, xappMetrics []byte) { + f, err := os.Create(fname) if err != nil { logger.Error("Cannot create vespa conf file: %s", err.Error()) os.Exit(1) @@ -92,21 +102,22 @@ func createConf(xappMetrics []byte) { logger.Info("Vespa config created") } -func subscribeXAppNotifications(chSubscriptions chan subsChannel) { - xappNotifUrl := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath - subsUrl := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath - go subscribexAppNotifications(xappNotifUrl, chSubscriptions, timeoutPostXAppSubscriptions, subsUrl) - logger.Info("xApp notifications subscribed from %s", subsUrl) +func (vesmgr *VesMgr) subscribeXAppNotifications() { + xappNotifURL := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath + subsURL := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath + go subscribexAppNotifications(xappNotifURL, vesmgr.chXAppSubscriptions, timeoutPostXAppSubscriptions, subsURL) + logger.Info("xApp notifications subscribed from %s", subsURL) } -func vesmgrInit() { - vesagent.name = "ves-agent" +// Init initializes the vesmgr +func (vesmgr *VesMgr) Init(listenPort string) *VesMgr { logger.Info("vesmgrInit") + logger.Info("version %s (%s)", Version, Hash) var err error if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" { logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error()) - return + panic("Cannot get my IP address") } var ok bool @@ -117,136 +128,134 @@ func vesmgrInit() { appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local" logger.Info("Using default appmgrdomain %s", appmgrDomain) } - chXAppSubscriptions := make(chan subsChannel) - chXAppNotifications := make(chan []byte) - chSupervision := make(chan chan string) - chVesagent := make(chan error) - - listener, err := net.Listen("tcp", vesmgr.myIPAddress+":"+vesmgrXappNotifPort) - startHttpServer(listener, vesmgrXappNotifPath, chXAppNotifications, chSupervision) - - subscribeXAppNotifications(chXAppSubscriptions) - - createConf([]byte{}) - startVesagent(chVesagent) - - runVesmgr(chVesagent, chSupervision, chXAppNotifications, chXAppSubscriptions) + vesmgr.chXAppSubscriptions = make(chan subscriptionNotification) + // Create notifications as buffered channel so that + // xappmgr does not block if we are stuck somewhere + vesmgr.chXAppNotifications = make(chan []byte, 10) + vesmgr.chSupervision = make(chan chan string) + vesmgr.chVesagent = make(chan error) + vesmgr.httpServer = HTTPServer{} + vesmgr.httpServer.init(vesmgr.myIPAddress + ":" + listenPort) + vesmgr.vesagent = makeRunner("ves-agent", "-i", os.Getenv("VESMGR_HB_INTERVAL"), + "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", + os.Getenv("VESMGR_PROMETHEUS_ADDR")) + return vesmgr } -func startVesagent(ch chan error) { - cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR")) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - if err := cmd.Start(); err != nil { - logger.Error("vesmgr exiting, ves-agent start failed: %s", err) - go func() { - ch <- err - }() - } else { - logger.Info("ves-agent started with pid %d", cmd.Process.Pid) - vesagent.Pid = cmd.Process.Pid - vesagent.process = cmd.Process - go func() { - // wait ves-agent exit and then post the error to the channel - err := cmd.Wait() - ch <- err - }() - } +func (vesmgr *VesMgr) startVesagent() { + vesmgr.vesagent.run(vesmgr.chVesagent) } -func killVespa(process *os.Process) { +func (vesmgr *VesMgr) killVespa() error { logger.Info("Killing vespa") - err := process.Kill() + err := vesmgr.vesagent.kill() if err != nil { logger.Error("Cannot kill vespa: %s", err.Error()) + return err } + return <-vesmgr.chVesagent // wait vespa exit } -func queryXAppsStatus(appmgrUrl string, timeout time.Duration) ([]byte, error) { - - logger.Info("query xAppStatus started, url %s", appmgrUrl) - req, err := http.NewRequest("GET", appmgrUrl, nil) +func queryXAppsConfig(appmgrURL string, timeout time.Duration) ([]byte, error) { + emptyConfig := []byte("{}") + logger.Info("query xAppConfig started, url %s", appmgrURL) + req, err := http.NewRequest("GET", appmgrURL, nil) if err != nil { logger.Error("Failed to create a HTTP request: %s", err) - return nil, err + return emptyConfig, err } req.Header.Set("Content-Type", "application/json") client := &http.Client{} client.Timeout = time.Second * timeout resp, err := client.Do(req) if err != nil { - logger.Error("Query xApp status failed: %s", err) - return nil, err + logger.Error("Query xApp config failed: %s", err) + return emptyConfig, err } defer resp.Body.Close() if resp.StatusCode == http.StatusOK { body, err := ioutil.ReadAll(resp.Body) if err != nil { - logger.Error("Failed to read xApp status body: %s", err) - return nil, err + logger.Error("Failed to read xApp config body: %s", err) + return emptyConfig, err } - logger.Info("query xAppStatus completed") + logger.Info("query xAppConfig completed") return body, nil - } else { - logger.Error("Error from xApp status query: %s", resp.Status) - return nil, errors.New(resp.Status) } + logger.Error("Error from xApp config query: %s", resp.Status) + return emptyConfig, errors.New(resp.Status) } -type state int - -const ( - normalState state = iota - vespaTerminatingState state = iota -) - func queryConf() ([]byte, error) { - return queryXAppsStatus("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath, + return queryXAppsConfig("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath, 10*time.Second) } -func runVesmgr(chVesagent chan error, chSupervision chan chan string, chXAppNotifications chan []byte, chXAppSubscriptions chan subsChannel) { +func (vesmgr *VesMgr) emptyNotificationsChannel() { + for { + select { + case <-vesmgr.chXAppNotifications: + // we don't care the content + default: + return + } + } +} + +func (vesmgr *VesMgr) servRequest() { + select { + case supervision := <-vesmgr.chSupervision: + logger.Info("vesmgr: supervision") + supervision <- "OK" + case xAppNotif := <-vesmgr.chXAppNotifications: + logger.Info("vesmgr: xApp notification") + logger.Info(string(xAppNotif)) + vesmgr.emptyNotificationsChannel() + /* + * If xapp config query fails then we cannot create + * a new configuration and kill vespa. + * In that case we assume that + * the situation is fixed when the next + * xapp notif comes + */ + xappConfig, err := queryConf() + if err == nil { + vesmgr.killVespa() + createConf(vespaConfigFile, xappConfig) + vesmgr.startVesagent() + } + case err := <-vesmgr.chVesagent: + logger.Error("Vesagent exited: " + err.Error()) + os.Exit(1) + } +} - logger.Info("vesmgr main loop ready") - mystate := normalState - var xappStatus []byte +func (vesmgr *VesMgr) waitSubscriptionLoop() { for { select { - case supervision := <-chSupervision: + case supervision := <-vesmgr.chSupervision: logger.Info("vesmgr: supervision") supervision <- "OK" - case xAppNotif := <-chXAppNotifications: - logger.Info("vesmgr: xApp notification") - logger.Info(string(xAppNotif)) - /* - * If xapp status query fails then we cannot create - * a new configuration and kill vespa. - * In that case we assume that - * the situation is fixed when the next - * xapp notif comes - */ - var err error - xappStatus, err = queryConf() - if err == nil { - killVespa(vesagent.process) - mystate = vespaTerminatingState - } - case err := <-chVesagent: - switch mystate { - case vespaTerminatingState: - logger.Info("Vesagent termination completed") - createConf(xappStatus) - startVesagent(chVesagent) - mystate = normalState - default: - logger.Error("Vesagent exited: " + err.Error()) - os.Exit(1) - } - case isSubscribed := <-chXAppSubscriptions: + case isSubscribed := <-vesmgr.chXAppSubscriptions: if isSubscribed.err != nil { logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err) os.Exit(1) } + return } } } + +// Run the vesmgr process main loop +func (vesmgr *VesMgr) Run() { + logger.Info("vesmgr main loop ready") + vesmgr.httpServer.start(vesmgrXappNotifPath, vesmgr.chXAppNotifications, vesmgr.chSupervision) + vesmgr.subscribeXAppNotifications() + vesmgr.waitSubscriptionLoop() + xappConfig, _ := queryConf() + createConf(vespaConfigFile, xappConfig) + vesmgr.startVesagent() + for { + vesmgr.servRequest() + } +}