* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ * platform project (RICP).
+ *
*/
package main
import (
+ "errors"
+ "io/ioutil"
+ "net"
+ "net/http"
"os"
- "os/exec"
+ "time"
+
mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git"
)
-type VesAgent struct {
- Pid int
- name string
+var appmgrDomain string
+
+const appmgrXAppConfigPath = "/ric/v1/config"
+const appmgrPort = "8080"
+
+// VesMgr contains runtime information of the vesmgr process
+type VesMgr struct {
+ myIPAddress string
+ chXAppSubscriptions chan subscriptionNotification
+ chXAppNotifications chan []byte
+ chSupervision chan chan string
+ chVesagent chan error
+ vesagent cmdRunner
+ httpServer HTTPServer
+}
+
+type subscriptionNotification struct {
+ subscribed bool
+ err error
+ subsID string
}
-var vesagent VesAgent
var logger *mdcloggo.MdcLogger
-var osExit = os.Exit
+
+// Version information, which is filled during compilation
+// Version tag of vesmgr container
+var Version string
+
+// Hash of the git commit used in building
+var Hash string
+
+const vesmgrXappNotifPort = "8080"
+const vesmgrXappNotifPath = "/vesmgr_xappnotif/"
+const timeoutPostXAppSubscriptions = 5
+const vespaConfigFile = "/etc/ves-agent/ves-agent.yaml"
func init() {
logger, _ = mdcloggo.InitLogger("vesmgr")
}
-/* Function to initialize vesmgr */
-func vesmgrInit() {
- vesagent.name = "ves-agent"
- logger.MdcAdd("vesmgr", "0.0.1")
- logger.Info("vesmgrInit")
-
- /* Subscribe notifications from xAPP Mgr */
- //subscribexAppNotifications()
+func getMyIP() (myIP string, retErr error) {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ logger.Error("net.InterfaceAddrs failed: %s", err.Error())
+ return "", err
+ }
+ for _, addr := range addrs {
+ // check the address type and if it is not a loopback take it
+ if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+ if ipnet.IP.To4() != nil {
+ logger.Info("My IP Address: %s", ipnet.IP.String())
+ return ipnet.IP.String(), nil
+ }
+ }
+ }
+ return "", nil
+}
- // create configuration
- f, err := os.Create("/etc/ves-agent/ves-agent.yaml")
+func createConf(fname string, xappMetrics []byte) {
+ f, err := os.Create(fname)
if err != nil {
logger.Error("Cannot create vespa conf file: %s", err.Error())
- return
+ os.Exit(1)
}
defer f.Close()
- createVespaConfig(f)
-
- /* Start ves-agent */
- ch := startVesagent()
+ createVespaConfig(f, xappMetrics)
+ logger.Info("Vespa config created")
+}
- runVesmgr(ch)
+func (vesmgr *VesMgr) subscribeXAppNotifications() {
+ xappNotifURL := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
+ subsURL := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
+ go subscribexAppNotifications(xappNotifURL, vesmgr.chXAppSubscriptions, timeoutPostXAppSubscriptions, subsURL)
+ logger.Info("xApp notifications subscribed from %s", subsURL)
}
-func startVesagent() chan error {
- /* Start ves-agent */
- cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR"))
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- ch := make(chan error)
- if err := cmd.Start(); err != nil {
- logger.Error("vesmgr exiting, ves-agent start failed: %s", err)
- go func() {
- ch <- err
- }()
+// Init initializes the vesmgr
+func (vesmgr *VesMgr) Init(listenPort string) *VesMgr {
+ logger.Info("vesmgrInit")
+ logger.Info("version: %s (%s)", Version, Hash)
+
+ var err error
+ if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" {
+ logger.Error("Cannot get myIPAddress: IP %s", vesmgr.myIPAddress)
+ panic("Cannot get my IP address")
+ }
+
+ var ok bool
+ appmgrDomain, ok = os.LookupEnv("VESMGR_APPMGRDOMAIN")
+ if ok {
+ logger.Info("Using appmgrdomain %s", appmgrDomain)
} else {
- logger.Info("ves-agent started with pid %d", cmd.Process.Pid)
- vesagent.Pid = cmd.Process.Pid
- go func() {
- // wait ves-agent exit and then post the error to the channel
- err := cmd.Wait()
- ch <- err
- }()
+ appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local"
+ logger.Info("Using default appmgrdomain %s", appmgrDomain)
+ }
+ vesmgr.chXAppSubscriptions = make(chan subscriptionNotification)
+ // Create notifications as buffered channel so that
+ // xappmgr does not block if we are stuck somewhere
+ vesmgr.chXAppNotifications = make(chan []byte, 10)
+ vesmgr.chSupervision = make(chan chan string)
+ vesmgr.chVesagent = make(chan error)
+ vesmgr.httpServer = HTTPServer{}
+ vesmgr.httpServer.init(vesmgr.myIPAddress + ":" + listenPort)
+ vesmgr.vesagent = makeRunner("ves-agent", "-i", os.Getenv("VESMGR_HB_INTERVAL"),
+ "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address",
+ os.Getenv("VESMGR_PROMETHEUS_ADDR"), "--AlertManager.Bind", os.Getenv("VESMGR_ALERTMANAGER_BIND_ADDR"),
+ "--Debug")
+ return vesmgr
+}
+
+func (vesmgr *VesMgr) startVesagent() {
+ vesmgr.vesagent.run(vesmgr.chVesagent)
+}
+
+func (vesmgr *VesMgr) killVespa() error {
+ logger.Info("Killing vespa")
+ err := vesmgr.vesagent.kill()
+ if err != nil {
+ logger.Error("Cannot kill vespa: %s", err.Error())
+ return err
+ }
+ return <-vesmgr.chVesagent // wait vespa exit
+}
+
+func queryXAppsConfig(appmgrURL string, timeout time.Duration) ([]byte, error) {
+ emptyConfig := []byte("{}")
+ logger.Info("query xAppConfig started, url %s", appmgrURL)
+ req, err := http.NewRequest("GET", appmgrURL, nil)
+ if err != nil {
+ logger.Error("Failed to create a HTTP request: %s", err)
+ return emptyConfig, err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ client := &http.Client{}
+ client.Timeout = time.Second * timeout
+ resp, err := client.Do(req)
+ if err != nil {
+ logger.Error("Query xApp config failed: %s", err)
+ return emptyConfig, err
}
+ defer resp.Body.Close()
+ if resp.StatusCode == http.StatusOK {
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ logger.Error("Failed to read xApp config body: %s", err)
+ return emptyConfig, err
+ }
+ logger.Info("query xAppConfig completed")
+ return body, nil
+ }
+ logger.Error("Error from xApp config query: %s", resp.Status)
+ return emptyConfig, errors.New(resp.Status)
+}
- return ch
+func queryConf() (appConfig []byte, err error) {
+ for i := 0; i < 10; i++ {
+ appConfig, err = queryXAppsConfig("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath, 10*time.Second)
+ if len(appConfig) > 0 {
+ break
+ }
+ time.Sleep(5 * time.Second)
+ }
+ return appConfig, err
}
-func runVesmgr(ch chan error) {
+func (vesmgr *VesMgr) emptyNotificationsChannel() {
for {
- err := <-ch
+ select {
+ case <-vesmgr.chXAppNotifications:
+ // we don't care the content
+ default:
+ return
+ }
+ }
+}
+
+func (vesmgr *VesMgr) servRequest() {
+ select {
+ case supervision := <-vesmgr.chSupervision:
+ logger.Info("vesmgr: supervision")
+ supervision <- "OK"
+ case xAppNotif := <-vesmgr.chXAppNotifications:
+ logger.Info("vesmgr: xApp notification")
+ logger.Info(string(xAppNotif))
+ vesmgr.emptyNotificationsChannel()
+ /*
+ * If xapp config query fails then we cannot create
+ * a new configuration and kill vespa.
+ * In that case we assume that
+ * the situation is fixed when the next
+ * xapp notif comes
+ */
+ xappConfig, err := queryConf()
+ if err == nil {
+ vesmgr.killVespa()
+ createConf(vespaConfigFile, xappConfig)
+ vesmgr.startVesagent()
+ }
+ case err := <-vesmgr.chVesagent:
logger.Error("Vesagent exited: " + err.Error())
- osExit(1)
+ os.Exit(1)
+ }
+}
+
+func (vesmgr *VesMgr) waitSubscriptionLoop() {
+ for {
+ select {
+ case supervision := <-vesmgr.chSupervision:
+ logger.Info("vesmgr: supervision")
+ supervision <- "OK"
+ case isSubscribed := <-vesmgr.chXAppSubscriptions:
+ if isSubscribed.err != nil {
+ logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err)
+ os.Exit(1)
+ }
+ return
+ }
+ }
+}
+
+// Run the vesmgr process main loop
+func (vesmgr *VesMgr) Run() {
+ logger.Info("vesmgr main loop ready")
+
+ vesmgr.httpServer.start(vesmgrXappNotifPath, vesmgr.chXAppNotifications, vesmgr.chSupervision)
+
+ vesmgr.subscribeXAppNotifications()
+
+ vesmgr.waitSubscriptionLoop()
+
+ xappConfig, _ := queryConf()
+
+ createConf(vespaConfigFile, xappConfig)
+
+ vesmgr.startVesagent()
+ for {
+ vesmgr.servRequest()
}
}