/* * Copyright (c) 2019 AT&T Intellectual Property. * Copyright (c) 2018-2019 Nokia. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * This source code is part of the near-RT RIC (RAN Intelligent Controller) * platform project (RICP). * */ package main import ( "errors" "io/ioutil" "net" "net/http" "os" "time" "fmt" mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git" ) var appmgrDomain string const appmgrXAppConfigPath = "/ric/v1/config" const appmgrPort = "8080" // VesMgr contains runtime information of the vesmgr process type VesMgr struct { myIPAddress string chXAppSubscriptions chan subscriptionNotification chXAppNotifications chan []byte chSupervision chan chan string chVesagent chan error vesagent cmdRunner httpServer HTTPServer } type subscriptionNotification struct { subscribed bool err error subsID string } var logger *mdcloggo.MdcLogger // Version information, which is filled during compilation // Version tag of vesmgr container var Version string // Hash of the git commit used in building var Hash string const vesmgrXappNotifPort = "8080" const vesmgrXappNotifPath = "/vesmgr_xappnotif/" const timeoutPostXAppSubscriptions = 5 const vespaConfigFile = "/etc/ves-agent/ves-agent.yaml" func init() { logger, _ = mdcloggo.InitLogger("vesmgr") } func getMyIP() (myIP string, retErr error) { addrs, err := net.InterfaceAddrs() if err != nil { logger.Error("net.InterfaceAddrs failed: %s", err.Error()) return "", err } for _, addr := range addrs { // check the address type and if it is not a loopback take it if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { if ipnet.IP.To4() != nil { logger.Info("My IP Address: %s", ipnet.IP.String()) return ipnet.IP.String(), nil } } } return "", nil } func createConf(fname string, xappMetrics []byte) { f, err := os.Create(fname) if err != nil { logger.Error("Cannot create vespa conf file: %s", err.Error()) os.Exit(1) } defer f.Close() createVespaConfig(f, xappMetrics) logger.Info("Vespa config created") } func (vesmgr *VesMgr) subscribeXAppNotifications() { xappNotifURL := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath subsURL := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath go subscribexAppNotifications(xappNotifURL, vesmgr.chXAppSubscriptions, timeoutPostXAppSubscriptions, subsURL) logger.Info("xApp notifications subscribed from %s", subsURL) } // Init initializes the vesmgr func (vesmgr *VesMgr) Init(listenPort string) *VesMgr { logger.Info("vesmgrInit") logger.Info("version: %s (%s)", Version, Hash) var err error if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" { logger.Error("Cannot get myIPAddress: IP %s", vesmgr.myIPAddress) panic("Cannot get my IP address") } var ok bool appmgrDomain, ok = os.LookupEnv("VESMGR_APPMGRDOMAIN") if ok { logger.Info("Using appmgrdomain %s", appmgrDomain) } else { pltnamespace := os.Getenv("PLT_NAMESPACE") if pltnamespace == "" { pltnamespace = "ricplt" } appmgrDomain = fmt.Sprintf("service-%s-appmgr-http.%s.svc.cluster.local",pltnamespace, pltnamespace) logger.Info("Using default appmgrdomain %s", appmgrDomain) } vesmgr.chXAppSubscriptions = make(chan subscriptionNotification) // Create notifications as buffered channel so that // xappmgr does not block if we are stuck somewhere vesmgr.chXAppNotifications = make(chan []byte, 10) vesmgr.chSupervision = make(chan chan string) vesmgr.chVesagent = make(chan error) vesmgr.httpServer = HTTPServer{} vesmgr.httpServer.init(vesmgr.myIPAddress + ":" + listenPort) vesmgr.vesagent = makeRunner("ves-agent", "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR"), "--AlertManager.Bind", os.Getenv("VESMGR_ALERTMANAGER_BIND_ADDR"), "--Debug") return vesmgr } func (vesmgr *VesMgr) startVesagent() { vesmgr.vesagent.run(vesmgr.chVesagent) } func (vesmgr *VesMgr) killVespa() error { logger.Info("Killing vespa") err := vesmgr.vesagent.kill() if err != nil { logger.Error("Cannot kill vespa: %s", err.Error()) return err } return <-vesmgr.chVesagent // wait vespa exit } func queryXAppsConfig(appmgrURL string, timeout time.Duration) ([]byte, error) { emptyConfig := []byte("{}") logger.Info("query xAppConfig started, url %s", appmgrURL) req, err := http.NewRequest("GET", appmgrURL, nil) if err != nil { logger.Error("Failed to create a HTTP request: %s", err) return emptyConfig, err } req.Header.Set("Content-Type", "application/json") client := &http.Client{} client.Timeout = time.Second * timeout resp, err := client.Do(req) if err != nil { logger.Error("Query xApp config failed: %s", err) return emptyConfig, err } defer resp.Body.Close() if resp.StatusCode == http.StatusOK { body, err := ioutil.ReadAll(resp.Body) if err != nil { logger.Error("Failed to read xApp config body: %s", err) return emptyConfig, err } logger.Info("query xAppConfig completed") return body, nil } logger.Error("Error from xApp config query: %s", resp.Status) return emptyConfig, errors.New(resp.Status) } func queryConf() (appConfig []byte, err error) { for i := 0; i < 10; i++ { appConfig, err = queryXAppsConfig("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath, 10*time.Second) if len(appConfig) > 0 { break } time.Sleep(5 * time.Second) } return appConfig, err } func (vesmgr *VesMgr) emptyNotificationsChannel() { for { select { case <-vesmgr.chXAppNotifications: // we don't care the content default: return } } } func (vesmgr *VesMgr) servRequest() { select { case supervision := <-vesmgr.chSupervision: logger.Info("vesmgr: supervision") supervision <- "OK" case xAppNotif := <-vesmgr.chXAppNotifications: logger.Info("vesmgr: xApp notification") logger.Info(string(xAppNotif)) vesmgr.emptyNotificationsChannel() /* * If xapp config query fails then we cannot create * a new configuration and kill vespa. * In that case we assume that * the situation is fixed when the next * xapp notif comes */ xappConfig, err := queryConf() if err == nil { vesmgr.killVespa() createConf(vespaConfigFile, xappConfig) vesmgr.startVesagent() } case err := <-vesmgr.chVesagent: logger.Error("Vesagent exited: " + err.Error()) os.Exit(1) } } func (vesmgr *VesMgr) waitSubscriptionLoop() { for { select { case supervision := <-vesmgr.chSupervision: logger.Info("vesmgr: supervision") supervision <- "OK" case isSubscribed := <-vesmgr.chXAppSubscriptions: if isSubscribed.err != nil { logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err) os.Exit(1) } return } } } // Run the vesmgr process main loop func (vesmgr *VesMgr) Run() { logger.Info("vesmgr main loop ready") vesmgr.httpServer.start(vesmgrXappNotifPath, vesmgr.chXAppNotifications, vesmgr.chSupervision) vesmgr.subscribeXAppNotifications() vesmgr.waitSubscriptionLoop() xappConfig, _ := queryConf() createConf(vespaConfigFile, xappConfig) vesmgr.startVesagent() for { vesmgr.servRequest() } }