Refactor the code 14/1014/3
authorRoni Riska <roni.riska@nokia.com>
Thu, 26 Sep 2019 05:20:44 +0000 (08:20 +0300)
committerRoni Riska <roni.riska@nokia.com>
Thu, 26 Sep 2019 07:28:50 +0000 (10:28 +0300)
Multiple code refactorings to make the code
simpler and increase the unit testing coverage.

* Golint findings are fixed.
* Xapp query function name changed from status to
  config.

The functionality is slightly changed so that
  - Vespa is started only after the xapp notification
    subscription is successful and the vesmgr has received
    the current xapp configuration from the xapp manager.
  - The vesmgr goes to the main loop after that.

Change-Id: Ie2675c0543d4e4ce0a60b92a6c06a79b9e2cb2cd
Signed-off-by: Roni Riska <roni.riska@nokia.com>
13 files changed:
cmd/vesmgr/config.go
cmd/vesmgr/config_test.go
cmd/vesmgr/httpserver.go
cmd/vesmgr/httpserver_test.go
cmd/vesmgr/main.go
cmd/vesmgr/subprocess.go [new file with mode: 0644]
cmd/vesmgr/subprocess_test.go [new file with mode: 0644]
cmd/vesmgr/subscribexAPPNotifications.go
cmd/vesmgr/subscribexAPPNotifications_test.go
cmd/vesmgr/vesmgr.go
cmd/vesmgr/vesmgr_queryxappconfig_test.go [moved from cmd/vesmgr/vesmgr_queryxappssttus_test.go with 67% similarity]
cmd/vesmgr/vesmgr_test.go
container-tag.yaml

index e2c9f25..8301b0c 100644 (file)
@@ -19,11 +19,12 @@ package main
 
 import (
        "encoding/json"
-       "gopkg.in/yaml.v2"
        "io"
        "os"
        "strconv"
        "time"
+
+       "gopkg.in/yaml.v2"
 )
 
 func basicVespaConf() VESAgentConfiguration {
@@ -65,12 +66,13 @@ func basicVespaConf() VESAgentConfiguration {
        return vespaconf
 }
 
+// AppMetricsStruct contains xapplication metrics definition
 type AppMetricsStruct struct {
        ObjectName     string
        ObjectInstance string
-       // xxx add labels here
 }
 
+// AppMetrics contains metrics definitions for all Xapps
 type AppMetrics map[string]AppMetricsStruct
 
 // Parses the metrics data from an array of bytes, which is expected to contain a JSON
@@ -89,10 +91,10 @@ func parseMetricsFromXAppDescriptor(descriptor []byte, appMetrics AppMetrics) Ap
        json.Unmarshal(descriptor, &desc)
 
        for _, app := range desc {
-               config, config_ok := app["config"]
-               if config_ok {
-                       metrics, metrics_ok := config.(map[string]interface{})["metrics"]
-                       if metrics_ok {
+               config, configOk := app["config"]
+               if configOk {
+                       metrics, metricsOk := config.(map[string]interface{})["metrics"]
+                       if metricsOk {
                                parseMetricsRules(metrics.([]interface{}), appMetrics)
                        }
                }
@@ -106,16 +108,16 @@ func parseMetricsFromXAppDescriptor(descriptor []byte, appMetrics AppMetrics) Ap
 // Entries, which do not have all the necessary fields, are ignored.
 func parseMetricsRules(metricsMap []interface{}, appMetrics AppMetrics) AppMetrics {
        for _, element := range metricsMap {
-               name, name_ok := element.(map[string]interface{})["name"].(string)
-               if name_ok {
-                       _, already_found := appMetrics[name]
-                       objectName, objectName_ok := element.(map[string]interface{})["objectName"].(string)
-                       objectInstance, objectInstance_ok := element.(map[string]interface{})["objectInstance"].(string)
-                       if !already_found && objectName_ok && objectInstance_ok {
+               name, nameOk := element.(map[string]interface{})["name"].(string)
+               if nameOk {
+                       _, alreadyFound := appMetrics[name]
+                       objectName, objectNameOk := element.(map[string]interface{})["objectName"].(string)
+                       objectInstance, objectInstanceOk := element.(map[string]interface{})["objectInstance"].(string)
+                       if !alreadyFound && objectNameOk && objectInstanceOk {
                                appMetrics[name] = AppMetricsStruct{objectName, objectInstance}
                                logger.Info("parsed counter %s %s %s", name, objectName, objectInstance)
                        }
-                       if already_found {
+                       if alreadyFound {
                                logger.Info("skipped duplicate counter %s", name)
                        }
                }
@@ -127,12 +129,12 @@ func getRules(vespaconf *VESAgentConfiguration, xAppConfig []byte) {
        appMetrics := make(AppMetrics)
        parseMetricsFromXAppDescriptor(xAppConfig, appMetrics)
 
-       makeRule := func(expr string, obj_name string, obj_instance string) MetricRule {
+       makeRule := func(expr string, objName string, objInstance string) MetricRule {
                return MetricRule{
                        Target:         "AdditionalObjects",
                        Expr:           expr,
-                       ObjectInstance: obj_instance,
-                       ObjectName:     obj_name,
+                       ObjectInstance: objInstance,
+                       ObjectName:     objName,
                        ObjectKeys: []Label{
                                Label{
                                        Name: "ricComponentName",
@@ -161,15 +163,15 @@ func getCollectorConfiguration(vespaconf *VESAgentConfiguration) {
        vespaconf.PrimaryCollector.FQDN = os.Getenv("VESMGR_PRICOLLECTOR_ADDR")
        vespaconf.PrimaryCollector.ServerRoot = os.Getenv("VESMGR_PRICOLLECTOR_SERVERROOT")
        vespaconf.PrimaryCollector.Topic = os.Getenv("VESMGR_PRICOLLECTOR_TOPIC")
-       port_str := os.Getenv("VESMGR_PRICOLLECTOR_PORT")
-       if port_str == "" {
+       portStr := os.Getenv("VESMGR_PRICOLLECTOR_PORT")
+       if portStr == "" {
                vespaconf.PrimaryCollector.Port = 8443
        } else {
-               port, _ := strconv.Atoi(port_str)
+               port, _ := strconv.Atoi(portStr)
                vespaconf.PrimaryCollector.Port = port
        }
-       secure_str := os.Getenv("VESMGR_PRICOLLECTOR_SECURE")
-       if secure_str == "true" {
+       secureStr := os.Getenv("VESMGR_PRICOLLECTOR_SECURE")
+       if secureStr == "true" {
                vespaconf.PrimaryCollector.Secure = true
        } else {
                vespaconf.PrimaryCollector.Secure = false
index 08bde19..76793a1 100644 (file)
@@ -19,12 +19,13 @@ package main
 import (
        "bytes"
        "encoding/json"
-       "github.com/stretchr/testify/assert"
-       "gopkg.in/yaml.v2"
        "io/ioutil"
        "os"
        "testing"
        "time"
+
+       "github.com/stretchr/testify/assert"
+       "gopkg.in/yaml.v2"
 )
 
 func testBaseConf(t *testing.T, vesconf VESAgentConfiguration) {
@@ -130,7 +131,7 @@ func metricsStringToInterfaceArray(metrics string) []interface{} {
 }
 
 func TestParseMetricsRules(t *testing.T) {
-       metricsJson := `{"metrics": [
+       metricsJSON := `{"metrics": [
                        { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" },
                        { "name": "ricxapp_RMR_ReceiveError", "objectName": "ricxappRMRReceiveErrorCounter", "objectInstance": "ricxappRMRReceiveError" },
                        { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" },
@@ -138,7 +139,7 @@ func TestParseMetricsRules(t *testing.T) {
                        { "name": "ricxapp_SDL_Stored", "objectName": "ricxappSDLStoredCounter", "objectInstance": "ricxappSDLStored" },
                        { "name": "ricxapp_SDL_StoreError", "objectName": "ricxappSDLStoreErrorCounter", "objectInstance": "ricxappSDLStoreError" } ]}`
        appMetrics := make(AppMetrics)
-       var m []interface{} = metricsStringToInterfaceArray(metricsJson)
+       m := metricsStringToInterfaceArray(metricsJSON)
        appMetrics = parseMetricsRules(m, appMetrics)
        assert.Len(t, appMetrics, 6)
        assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName)
@@ -148,17 +149,17 @@ func TestParseMetricsRules(t *testing.T) {
 
 func TestParseMetricsRulesNoMetrics(t *testing.T) {
        appMetrics := make(AppMetrics)
-       metricsJson := `{"metrics": []`
-       var m []interface{} = metricsStringToInterfaceArray(metricsJson)
+       metricsJSON := `{"metrics": []`
+       m := metricsStringToInterfaceArray(metricsJSON)
        appMetrics = parseMetricsRules(m, appMetrics)
        assert.Empty(t, appMetrics)
 }
 
 func TestParseMetricsRulesAdditionalFields(t *testing.T) {
        appMetrics := make(AppMetrics)
-       metricsJson := `{"metrics": [
+       metricsJSON := `{"metrics": [
                        { "additionalField": "valueIgnored", "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" }]}`
-       var m []interface{} = metricsStringToInterfaceArray(metricsJson)
+       m := metricsStringToInterfaceArray(metricsJSON)
        appMetrics = parseMetricsRules(m, appMetrics)
        assert.Len(t, appMetrics, 1)
        assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName)
@@ -167,11 +168,11 @@ func TestParseMetricsRulesAdditionalFields(t *testing.T) {
 
 func TestParseMetricsRulesMissingFields(t *testing.T) {
        appMetrics := make(AppMetrics)
-       metricsJson := `{"metrics": [
+       metricsJSON := `{"metrics": [
                        { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" },
                        { "name": "ricxapp_RMR_ReceiveError", "objectInstance": "ricxappRMRReceiveError" },
                        { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" }]}`
-       var m []interface{} = metricsStringToInterfaceArray(metricsJson)
+       m := metricsStringToInterfaceArray(metricsJSON)
        appMetrics = parseMetricsRules(m, appMetrics)
        assert.Len(t, appMetrics, 2)
        assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName)
@@ -182,11 +183,11 @@ func TestParseMetricsRulesMissingFields(t *testing.T) {
 
 func TestParseMetricsRulesDuplicateDefinitionIsIgnored(t *testing.T) {
        appMetrics := make(AppMetrics)
-       metricsJson := `{"metrics": [
+       metricsJSON := `{"metrics": [
                        { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" },
                        { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounterXXX", "objectInstance": "ricxappRMRReceivedXXX" },
                        { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" }]}`
-       var m []interface{} = metricsStringToInterfaceArray(metricsJson)
+       m := metricsStringToInterfaceArray(metricsJSON)
        appMetrics = parseMetricsRules(m, appMetrics)
        assert.Len(t, appMetrics, 2)
        assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName)
@@ -195,12 +196,12 @@ func TestParseMetricsRulesDuplicateDefinitionIsIgnored(t *testing.T) {
 
 func TestParseMetricsRulesIncrementalFillOfAppMetrics(t *testing.T) {
        appMetrics := make(AppMetrics)
-       metricsJson1 := `{"metrics": [
+       metricsJSON1 := `{"metrics": [
                        { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" }]}`
-       metricsJson2 := `{"metrics": [
+       metricsJSON2 := `{"metrics": [
                        { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" }]}`
-       var m1 []interface{} = metricsStringToInterfaceArray(metricsJson1)
-       var m2 []interface{} = metricsStringToInterfaceArray(metricsJson2)
+       m1 := metricsStringToInterfaceArray(metricsJSON1)
+       m2 := metricsStringToInterfaceArray(metricsJSON2)
        appMetrics = parseMetricsRules(m1, appMetrics)
        appMetrics = parseMetricsRules(m2, appMetrics)
        assert.Len(t, appMetrics, 2)
@@ -226,18 +227,18 @@ func TestParseXAppDescriptor(t *testing.T) {
 }
 
 func TestParseXAppDescriptorWithNoConfig(t *testing.T) {
-       metricsJson := `[{{"metadata": "something", "descriptor": "somethingelse"}},
+       metricsJSON := `[{{"metadata": "something", "descriptor": "somethingelse"}},
                         {{"metadata": "something", "descriptor": "somethingelse"}}]`
-       metricsBytes := []byte(metricsJson)
+       metricsBytes := []byte(metricsJSON)
        appMetrics := make(AppMetrics)
        appMetrics = parseMetricsFromXAppDescriptor(metricsBytes, appMetrics)
        assert.Empty(t, appMetrics)
 }
 
 func TestParseXAppDescriptorWithNoMetrics(t *testing.T) {
-       metricsJson := `[{{"metadata": "something", "descriptor": "somethingelse", "config":{}},
+       metricsJSON := `[{{"metadata": "something", "descriptor": "somethingelse", "config":{}},
                         {{"metadata": "something", "descriptor": "somethingelse", "config":{}}}]`
-       metricsBytes := []byte(metricsJson)
+       metricsBytes := []byte(metricsJSON)
        appMetrics := make(AppMetrics)
        appMetrics = parseMetricsFromXAppDescriptor(metricsBytes, appMetrics)
        assert.Empty(t, appMetrics)
index 585319e..2564ef3 100644 (file)
@@ -24,28 +24,47 @@ import (
        "net/http"
 )
 
-const SupervisionUrl = "/supervision/"
+// SupervisionURL is the url where kubernetes posts alive queries
+const SupervisionURL = "/supervision/"
 
-func startHttpServer(listener net.Listener, xappnotifUrl string, notif_ch chan []byte, supervision_ch chan chan string) {
-       go runHttpServer(listener, xappnotifUrl, notif_ch, supervision_ch)
+// HTTPServer is the VesMgr HTTP server struct
+type HTTPServer struct {
+       listener net.Listener
 }
 
-func runHttpServer(listener net.Listener, xappNotifUrl string, notif_ch chan []byte, supervision_ch chan chan string) {
+func (s *HTTPServer) init(address string) *HTTPServer {
+       var err error
+       s.listener, err = net.Listen("tcp", address)
+       if err != nil {
+               panic("Cannot listen:" + err.Error())
+       }
+       return s
+}
+
+func (s *HTTPServer) start(notifPath string, notifCh chan []byte, supCh chan chan string) {
+       go runHTTPServer(s.listener, notifPath, notifCh, supCh)
+}
+
+func (s *HTTPServer) addr() net.Addr {
+       return s.listener.Addr()
+}
+
+func runHTTPServer(listener net.Listener, xappNotifURL string, notifCh chan []byte, supervisionCh chan chan string) {
 
        logger.Info("vesmgr http server serving at %s", listener.Addr())
 
-       http.HandleFunc(xappNotifUrl, func(w http.ResponseWriter, r *http.Request) {
+       http.HandleFunc(xappNotifURL, func(w http.ResponseWriter, r *http.Request) {
 
                switch r.Method {
                case "POST":
-                       logger.Info("httpServer: POST in %s", xappNotifUrl)
+                       logger.Info("httpServer: POST in %s", xappNotifURL)
                        body, err := ioutil.ReadAll(r.Body)
                        defer r.Body.Close()
                        if err != nil {
                                logger.Error("httpServer: Invalid body in POST request")
                                return
                        }
-                       notif_ch <- body
+                       notifCh <- body
                        return
                default:
                        logger.Error("httpServer: Invalid method %s to %s", r.Method, r.URL.Path)
@@ -54,15 +73,15 @@ func runHttpServer(listener net.Listener, xappNotifUrl string, notif_ch chan []b
                }
        })
 
-       http.HandleFunc(SupervisionUrl, func(w http.ResponseWriter, r *http.Request) {
+       http.HandleFunc(SupervisionURL, func(w http.ResponseWriter, r *http.Request) {
 
                switch r.Method {
                case "GET":
                        logger.Info("httpServer: GET supervision")
-                       supervision_ack_ch := make(chan string)
+                       supervisionAckCh := make(chan string)
                        // send supervision to the main loop
-                       supervision_ch <- supervision_ack_ch
-                       reply := <-supervision_ack_ch
+                       supervisionCh <- supervisionAckCh
+                       reply := <-supervisionAckCh
                        logger.Info("httpServer: supervision ack from the main loop: %s", reply)
                        fmt.Fprintf(w, reply)
                        return
index d52f4b2..c2946aa 100644 (file)
 package main
 
 import (
-       "github.com/stretchr/testify/suite"
        "io/ioutil"
-       "net"
        "net/http"
        "os"
        "strings"
        "testing"
+
+       "github.com/stretchr/testify/suite"
 )
 
-type HttpServerTestSuite struct {
+type HTTPServerTestSuite struct {
        suite.Suite
-       listener       net.Listener
-       ch_notif       chan []byte
-       ch_supervision chan chan string
+       chNotif       chan []byte
+       chSupervision chan chan string
+       server        HTTPServer
 }
 
 // suite setup creates the HTTP server
-func (suite *HttpServerTestSuite) SetupSuite() {
+func (suite *HTTPServerTestSuite) SetupSuite() {
        os.Unsetenv("http_proxy")
        os.Unsetenv("HTTP_PROXY")
-       var err error
-       suite.listener, err = net.Listen("tcp", ":0")
-       suite.Nil(err)
-       suite.ch_notif = make(chan []byte)
-       suite.ch_supervision = make(chan chan string)
-       startHttpServer(suite.listener, "/vesmgr_notif/", suite.ch_notif, suite.ch_supervision)
+       suite.chNotif = make(chan []byte)
+       suite.chSupervision = make(chan chan string)
+       suite.server = HTTPServer{}
+       suite.server.init(":0")
+       suite.server.start("/vesmgr_notif/", suite.chNotif, suite.chSupervision)
 }
 
-func (suite *HttpServerTestSuite) TestHtppServerSupervisionInvalidOperation() {
-       resp, reply := suite.doPost("http://"+suite.listener.Addr().String()+SupervisionUrl, "supervision")
+func (suite *HTTPServerTestSuite) TestHtppServerSupervisionInvalidOperation() {
+       resp, reply := suite.doPost("http://"+suite.server.addr().String()+SupervisionURL, "supervision")
        suite.Equal("405 method not allowed\n", reply)
        suite.Equal(405, resp.StatusCode)
        suite.Equal("405 Method Not Allowed", resp.Status)
 }
 
-func (suite *HttpServerTestSuite) doGet(url string) (*http.Response, string) {
+func (suite *HTTPServerTestSuite) doGet(url string) (*http.Response, string) {
        resp, err := http.Get(url)
        suite.Nil(err)
 
@@ -62,8 +61,8 @@ func (suite *HttpServerTestSuite) doGet(url string) (*http.Response, string) {
        return resp, string(contents)
 }
 
-func (suite *HttpServerTestSuite) doPost(serverUrl string, msg string) (*http.Response, string) {
-       resp, err := http.Post(serverUrl, "data", strings.NewReader(msg))
+func (suite *HTTPServerTestSuite) doPost(serverURL string, msg string) (*http.Response, string) {
+       resp, err := http.Post(serverURL, "data", strings.NewReader(msg))
        suite.Nil(err)
 
        defer resp.Body.Close()
@@ -72,41 +71,41 @@ func (suite *HttpServerTestSuite) doPost(serverUrl string, msg string) (*http.Re
        return resp, string(contents)
 }
 
-func replySupervision(ch_supervision chan chan string, reply string) {
-       ch_supervision_ack := <-ch_supervision
-       ch_supervision_ack <- reply
+func replySupervision(chSupervision chan chan string, reply string) {
+       chSupervisionAck := <-chSupervision
+       chSupervisionAck <- reply
 }
 
-func (suite *HttpServerTestSuite) TestHttpServerSupervision() {
+func (suite *HTTPServerTestSuite) TestHttpServerSupervision() {
 
        // start the "main loop" to reply to the supervision to the HTTPServer
-       go replySupervision(suite.ch_supervision, "I'm just fine")
+       go replySupervision(suite.chSupervision, "I'm just fine")
 
-       resp, reply := suite.doGet("http://" + suite.listener.Addr().String() + SupervisionUrl)
+       resp, reply := suite.doGet("http://" + suite.server.addr().String() + SupervisionURL)
 
        suite.Equal("I'm just fine", reply)
        suite.Equal(200, resp.StatusCode)
        suite.Equal("200 OK", resp.Status)
 }
 
-func (suite *HttpServerTestSuite) TestHttpServerInvalidUrl() {
-       resp, reply := suite.doPost("http://"+suite.listener.Addr().String()+"/invalid_url", "foo")
+func (suite *HTTPServerTestSuite) TestHttpServerInvalidUrl() {
+       resp, reply := suite.doPost("http://"+suite.server.addr().String()+"/invalid_url", "foo")
        suite.Equal("404 page not found\n", reply)
        suite.Equal(404, resp.StatusCode)
        suite.Equal("404 Not Found", resp.Status)
 }
 
-func readXAppNotification(ch_notif chan []byte, ch chan []byte) {
-       notification := <-ch_notif
+func readXAppNotification(chNotif chan []byte, ch chan []byte) {
+       notification := <-chNotif
        ch <- notification
 }
 
-func (suite *HttpServerTestSuite) TestHttpServerXappNotif() {
+func (suite *HTTPServerTestSuite) TestHttpServerXappNotif() {
        // start the "main loop" to receive the xAppNotification message from the HTTPServer
        ch := make(chan []byte)
-       go readXAppNotification(suite.ch_notif, ch)
+       go readXAppNotification(suite.chNotif, ch)
 
-       resp, reply := suite.doPost("http://"+suite.listener.Addr().String()+"/vesmgr_notif/", "test data")
+       resp, reply := suite.doPost("http://"+suite.server.addr().String()+"/vesmgr_notif/", "test data")
        suite.Equal("", reply)
        suite.Equal(200, resp.StatusCode)
        suite.Equal("200 OK", resp.Status)
@@ -114,13 +113,13 @@ func (suite *HttpServerTestSuite) TestHttpServerXappNotif() {
        suite.Equal([]byte("test data"), notification)
 }
 
-func (suite *HttpServerTestSuite) TestHttpServerXappNotifInvalidOperation() {
-       resp, reply := suite.doGet("http://" + suite.listener.Addr().String() + "/vesmgr_notif/")
+func (suite *HTTPServerTestSuite) TestHttpServerXappNotifInvalidOperation() {
+       resp, reply := suite.doGet("http://" + suite.server.addr().String() + "/vesmgr_notif/")
        suite.Equal("405 method not allowed\n", reply)
        suite.Equal(405, resp.StatusCode)
        suite.Equal("405 Method Not Allowed", resp.Status)
 }
 
 func TestHttpServerSuite(t *testing.T) {
-       suite.Run(t, new(HttpServerTestSuite))
+       suite.Run(t, new(HTTPServerTestSuite))
 }
index 68e5c0c..cd3d14e 100644 (file)
@@ -18,5 +18,6 @@
 package main
 
 func main() {
-       vesmgrInit()
+       vesmgr := VesMgr{}
+       vesmgr.Init(vesmgrXappNotifPort).Run()
 }
diff --git a/cmd/vesmgr/subprocess.go b/cmd/vesmgr/subprocess.go
new file mode 100644 (file)
index 0000000..0071c20
--- /dev/null
@@ -0,0 +1,35 @@
+package main
+
+import (
+       "os"
+       "os/exec"
+)
+
+type cmdRunner struct {
+       exe  string
+       args []string
+       cmd  *exec.Cmd
+}
+
+func (r *cmdRunner) run(result chan error) {
+       r.cmd = exec.Command(r.exe, r.args...)
+       r.cmd.Stdout = os.Stdout
+       r.cmd.Stderr = os.Stderr
+       err := r.cmd.Start()
+       go func() {
+               if err != nil {
+                       result <- err
+               } else {
+                       result <- r.cmd.Wait()
+               }
+       }()
+}
+
+func (r *cmdRunner) kill() error {
+       return r.cmd.Process.Kill()
+}
+
+func makeRunner(exe string, arg ...string) cmdRunner {
+       r := cmdRunner{exe: exe, args: arg}
+       return r
+}
diff --git a/cmd/vesmgr/subprocess_test.go b/cmd/vesmgr/subprocess_test.go
new file mode 100644 (file)
index 0000000..bf460d9
--- /dev/null
@@ -0,0 +1,31 @@
+package main
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestProcessRunning(t *testing.T) {
+       r := makeRunner("echo", "a")
+       ch := make(chan error)
+       r.run(ch)
+       err := <-ch
+       assert.Nil(t, err)
+}
+
+func TestProcessKill(t *testing.T) {
+       r := makeRunner("sleep", "20")
+       ch := make(chan error)
+       r.run(ch)
+       assert.Nil(t, r.kill())
+       <-ch // wait and seee that kills is actually done
+}
+
+func TestProcessRunningFails(t *testing.T) {
+       r := makeRunner("foobarbaz")
+       ch := make(chan error)
+       r.run(ch)
+       err := <-ch
+       assert.NotNil(t, err)
+}
index 6f8ed77..1d3cc22 100644 (file)
@@ -30,60 +30,58 @@ import (
 // appmgr API
 const appmgrSubsPath = "/ric/v1/subscriptions"
 
-var errPostingFailed error = errors.New("Posting subscriptions failed")
-var errWrongStatusCode error = errors.New("Wrong subscriptions response StatusCode")
+var errPostingFailed = errors.New("Posting subscriptions failed")
+var errWrongStatusCode = errors.New("Wrong subscriptions response StatusCode")
 
-func subscribexAppNotifications(targetUrl string, subscriptions chan subsChannel, timeout time.Duration, subsUrl string) {
-       requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, targetUrl))
-       req, err := http.NewRequest("POST", subsUrl, bytes.NewBuffer(requestBody))
+func subscribexAppNotifications(targetURL string, subscriptions chan subscriptionNotification, timeout time.Duration, subsURL string) {
+       requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, targetURL))
+       req, err := http.NewRequest("POST", subsURL, bytes.NewBuffer(requestBody))
        if err != nil {
                logger.Error("Setting NewRequest failed: %s", err)
-               subscriptions <- subsChannel{false, err}
+               subscriptions <- subscriptionNotification{false, err, ""}
                return
        }
        req.Header.Set("Content-Type", "application/json")
        client := &http.Client{}
        client.Timeout = time.Second * timeout
+       var subsID string
        for {
-               err := subscribexAppNotificationsClientDo(req, client)
+               subsID, err = subscribexAppNotificationsClientDo(req, client)
                if err == nil {
                        break
                } else if err != errPostingFailed && err != errWrongStatusCode {
-                       subscriptions <- subsChannel{false, err}
+                       subscriptions <- subscriptionNotification{false, err, ""}
                        return
                }
                time.Sleep(5 * time.Second)
        }
-       subscriptions <- subsChannel{true, nil}
+       subscriptions <- subscriptionNotification{true, nil, subsID}
 }
 
-func subscribexAppNotificationsClientDo(req *http.Request, client *http.Client) error {
+func subscribexAppNotificationsClientDo(req *http.Request, client *http.Client) (string, error) {
        resp, err := client.Do(req)
        if err != nil {
                logger.Error("Posting subscriptions failed: %s", err)
-               return errPostingFailed
-       } else {
-               defer resp.Body.Close()
-               if resp.StatusCode == http.StatusCreated {
-                       logger.Info("Subscriptions response StatusCode: %d", resp.StatusCode)
-                       logger.Info("Subscriptions response headers: %s", resp.Header)
-                       body, err := ioutil.ReadAll(resp.Body)
-                       if err != nil {
-                               logger.Error("Subscriptions response Body read failed: %s", err)
-                               return err
-                       }
-                       logger.Info("Response Body: %s", body)
-                       var result map[string]interface{}
-                       if err := json.Unmarshal([]byte(body), &result); err != nil {
-                               logger.Error("json.Unmarshal failed: %s", err)
-                               return err
-                       }
-                       logger.Info("Subscription id from the response: %s", result["id"].(string))
-                       vesmgr.appmgrSubsId = result["id"].(string)
-                       return nil
-               } else {
-                       logger.Error("Wrong subscriptions response StatusCode: %d", resp.StatusCode)
-                       return errWrongStatusCode
+               return "", errPostingFailed
+       }
+       defer resp.Body.Close()
+       if resp.StatusCode == http.StatusCreated {
+               logger.Info("Subscriptions response StatusCode: %d", resp.StatusCode)
+               logger.Info("Subscriptions response headers: %s", resp.Header)
+               body, err := ioutil.ReadAll(resp.Body)
+               if err != nil {
+                       logger.Error("Subscriptions response Body read failed: %s", err)
+                       return "", err
+               }
+               logger.Info("Response Body: %s", body)
+               var result map[string]interface{}
+               if err := json.Unmarshal([]byte(body), &result); err != nil {
+                       logger.Error("json.Unmarshal failed: %s", err)
+                       return "", err
                }
+               logger.Info("Subscription id from the response: %s", result["id"].(string))
+               return result["id"].(string), nil
        }
+       logger.Error("Wrong subscriptions response StatusCode: %d", resp.StatusCode)
+       return "", errWrongStatusCode
 }
index 31621b3..6305f40 100644 (file)
@@ -21,38 +21,33 @@ import (
        "bytes"
        "encoding/json"
        "fmt"
-       "github.com/stretchr/testify/suite"
        "io/ioutil"
        "net/http"
        "net/http/httptest"
        "testing"
+
+       "github.com/stretchr/testify/suite"
 )
 
-type AppmgrHttpServerTestSuite struct {
+type AppmgrHTTPServerTestSuite struct {
        suite.Suite
-       subscriptions chan subsChannel
-       xappNotifUrl  string
+       subscriptions chan subscriptionNotification
+       xappNotifURL  string
 }
 
 // suite setup
-func (suite *AppmgrHttpServerTestSuite) SetupSuite() {
-       vesmgr.appmgrSubsId = string("")
-       vesmgr.myIPAddress, _ = getMyIP()
-       suite.xappNotifUrl = "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
-       suite.subscriptions = make(chan subsChannel)
+func (suite *AppmgrHTTPServerTestSuite) SetupSuite() {
+       // the url here is not actually used anywhere
+       suite.xappNotifURL = "http://127.0.0.1:8080" + vesmgrXappNotifPath
+       suite.subscriptions = make(chan subscriptionNotification)
 }
 
 // test setup
-func (suite *AppmgrHttpServerTestSuite) SetupTest() {
-       suite.subscriptions = make(chan subsChannel)
-}
-
-// test teardown
-func (suite *AppmgrHttpServerTestSuite) TearDownTest() {
-       vesmgr.appmgrSubsId = string("")
+func (suite *AppmgrHTTPServerTestSuite) SetupTest() {
+       suite.subscriptions = make(chan subscriptionNotification)
 }
 
-func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotifications() {
+func (suite *AppmgrHTTPServerTestSuite) TestSubscribexAppNotifications() {
        testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
                body, _ := ioutil.ReadAll(req.Body)
                var result map[string]interface{}
@@ -68,13 +63,13 @@ func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotifications() {
        }))
        defer testServer.Close()
 
-       go subscribexAppNotifications(suite.xappNotifUrl, suite.subscriptions, 1, testServer.URL)
+       go subscribexAppNotifications(suite.xappNotifURL, suite.subscriptions, 1, testServer.URL)
        isSubscribed := <-suite.subscriptions
        suite.Nil(isSubscribed.err)
-       suite.Equal("deadbeef1234567890", vesmgr.appmgrSubsId)
+       suite.Equal("deadbeef1234567890", isSubscribed.subsID)
 }
 
-func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsWrongStatus() {
+func (suite *AppmgrHTTPServerTestSuite) TestSubscribexAppNotificationsWrongStatus() {
        testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
                res.Header().Add("Content-Type", "application/json")
                res.WriteHeader(http.StatusUnauthorized)
@@ -82,32 +77,32 @@ func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsWrongStatu
        }))
        defer testServer.Close()
 
-       requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, suite.xappNotifUrl))
+       requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, suite.xappNotifURL))
        req, _ := http.NewRequest("POST", testServer.URL, bytes.NewBuffer(requestBody))
        req.Header.Set("Content-Type", "application/json")
        client := &http.Client{}
 
-       err := subscribexAppNotificationsClientDo(req, client)
+       subsID, err := subscribexAppNotificationsClientDo(req, client)
        suite.Equal(errWrongStatusCode, err)
        // after failed POST vesmgr.appmgrSubsId holds an initial values
-       suite.Equal("", vesmgr.appmgrSubsId)
+       suite.Equal("", subsID)
 }
 
-func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsWrongUrl() {
+func (suite *AppmgrHTTPServerTestSuite) TestSubscribexAppNotificationsWrongUrl() {
        // use fake appmgrUrl that is not served in unit test
-       appmgrUrl := "/I_do_not_exist/"
-       requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, suite.xappNotifUrl))
-       req, _ := http.NewRequest("POST", appmgrUrl, bytes.NewBuffer(requestBody))
+       appmgrURL := "/I_do_not_exist/"
+       requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, suite.xappNotifURL))
+       req, _ := http.NewRequest("POST", appmgrURL, bytes.NewBuffer(requestBody))
        req.Header.Set("Content-Type", "application/json")
        client := &http.Client{}
 
-       err := subscribexAppNotificationsClientDo(req, client)
+       subsID, err := subscribexAppNotificationsClientDo(req, client)
        suite.Equal(errPostingFailed, err)
        // after failed POST vesmgr.appmgrSubsId holds an initial values
-       suite.Equal("", vesmgr.appmgrSubsId)
+       suite.Equal("", subsID)
 }
 
-func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsReadBodyFails() {
+func (suite *AppmgrHTTPServerTestSuite) TestSubscribexAppNotificationsReadBodyFails() {
        testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
                res.Header().Set("Content-Length", "1")
                res.Header().Add("Content-Type", "application/json")
@@ -115,13 +110,13 @@ func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsReadBodyFa
        }))
        defer testServer.Close()
 
-       go subscribexAppNotifications(suite.xappNotifUrl, suite.subscriptions, 1, testServer.URL)
+       go subscribexAppNotifications(suite.xappNotifURL, suite.subscriptions, 1, testServer.URL)
        isSubscribed := <-suite.subscriptions
        suite.Equal("unexpected EOF", isSubscribed.err.Error())
-       suite.Equal("", vesmgr.appmgrSubsId)
+       suite.Equal("", isSubscribed.subsID)
 }
 
-func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsUnMarshalFails() {
+func (suite *AppmgrHTTPServerTestSuite) TestSubscribexAppNotificationsUnMarshalFails() {
        testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
                res.Header().Add("Content-Type", "application/json")
                res.WriteHeader(http.StatusCreated)
@@ -129,12 +124,12 @@ func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsUnMarshalF
        }))
        defer testServer.Close()
 
-       go subscribexAppNotifications(suite.xappNotifUrl, suite.subscriptions, 1, testServer.URL)
+       go subscribexAppNotifications(suite.xappNotifURL, suite.subscriptions, 1, testServer.URL)
        isSubscribed := <-suite.subscriptions
        suite.Equal("invalid character 'd' after object key", isSubscribed.err.Error())
-       suite.Equal("", vesmgr.appmgrSubsId)
+       suite.Equal("", isSubscribed.subsID)
 }
 
 func TestAppmgrHttpServerTestSuite(t *testing.T) {
-       suite.Run(t, new(AppmgrHttpServerTestSuite))
+       suite.Run(t, new(AppmgrHTTPServerTestSuite))
 }
index 0e27838..aeebb15 100755 (executable)
@@ -23,7 +23,6 @@ import (
        "net"
        "net/http"
        "os"
-       "os/exec"
        "time"
 
        mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git"
@@ -34,29 +33,29 @@ var appmgrDomain string
 const appmgrXAppConfigPath = "/ric/v1/config"
 const appmgrPort = "8080"
 
-type VesAgent struct {
-       Pid     int
-       name    string
-       process *os.Process
-}
-
+// VesMgr contains runtime information of the vesmgr process
 type VesMgr struct {
-       myIPAddress  string
-       appmgrSubsId string
+       myIPAddress         string
+       chXAppSubscriptions chan subscriptionNotification
+       chXAppNotifications chan []byte
+       chSupervision       chan chan string
+       chVesagent          chan error
+       vesagent            cmdRunner
+       httpServer          HTTPServer
 }
 
-type subsChannel struct {
+type subscriptionNotification struct {
        subscribed bool
        err        error
+       subsID     string
 }
 
-var vesagent VesAgent
-var vesmgr VesMgr
 var logger *mdcloggo.MdcLogger
 
 const vesmgrXappNotifPort = "8080"
 const vesmgrXappNotifPath = "/vesmgr_xappnotif/"
 const timeoutPostXAppSubscriptions = 5
+const vespaConfigFile = "/etc/ves-agent/ves-agent.yaml"
 
 func init() {
        logger, _ = mdcloggo.InitLogger("vesmgr")
@@ -80,8 +79,8 @@ func getMyIP() (myIP string, retErr error) {
        return "", nil
 }
 
-func createConf(xappMetrics []byte) {
-       f, err := os.Create("/etc/ves-agent/ves-agent.yaml")
+func createConf(fname string, xappMetrics []byte) {
+       f, err := os.Create(fname)
        if err != nil {
                logger.Error("Cannot create vespa conf file: %s", err.Error())
                os.Exit(1)
@@ -92,21 +91,20 @@ func createConf(xappMetrics []byte) {
        logger.Info("Vespa config created")
 }
 
-func subscribeXAppNotifications(chSubscriptions chan subsChannel) {
-       xappNotifUrl := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
-       subsUrl := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
-       go subscribexAppNotifications(xappNotifUrl, chSubscriptions, timeoutPostXAppSubscriptions, subsUrl)
-       logger.Info("xApp notifications subscribed from %s", subsUrl)
+func (vesmgr *VesMgr) subscribeXAppNotifications() {
+       xappNotifURL := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
+       subsURL := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
+       go subscribexAppNotifications(xappNotifURL, vesmgr.chXAppSubscriptions, timeoutPostXAppSubscriptions, subsURL)
+       logger.Info("xApp notifications subscribed from %s", subsURL)
 }
 
-func vesmgrInit() {
-       vesagent.name = "ves-agent"
+// Init initializes the vesmgr
+func (vesmgr *VesMgr) Init(listenPort string) *VesMgr {
        logger.Info("vesmgrInit")
-
        var err error
        if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" {
                logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error())
-               return
+               panic("Cannot get my IP address")
        }
 
        var ok bool
@@ -117,138 +115,134 @@ func vesmgrInit() {
                appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local"
                logger.Info("Using default appmgrdomain %s", appmgrDomain)
        }
-       chXAppSubscriptions := make(chan subsChannel)
-       chXAppNotifications := make(chan []byte)
-       chSupervision := make(chan chan string)
-       chVesagent := make(chan error)
-
-       listener, err := net.Listen("tcp", vesmgr.myIPAddress+":"+vesmgrXappNotifPort)
-       startHttpServer(listener, vesmgrXappNotifPath, chXAppNotifications, chSupervision)
-
-       subscribeXAppNotifications(chXAppSubscriptions)
-
-       runVesmgr(chVesagent, chSupervision, chXAppNotifications, chXAppSubscriptions)
+       vesmgr.chXAppSubscriptions = make(chan subscriptionNotification)
+       // Create notifications as buffered channel so that
+       // xappmgr does not block if we are stuck somewhere
+       vesmgr.chXAppNotifications = make(chan []byte, 10)
+       vesmgr.chSupervision = make(chan chan string)
+       vesmgr.chVesagent = make(chan error)
+       vesmgr.httpServer = HTTPServer{}
+       vesmgr.httpServer.init(vesmgr.myIPAddress + ":" + listenPort)
+       vesmgr.vesagent = makeRunner("ves-agent", "-i", os.Getenv("VESMGR_HB_INTERVAL"),
+               "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address",
+               os.Getenv("VESMGR_PROMETHEUS_ADDR"))
+       return vesmgr
 }
 
-func startVesagent(ch chan error) {
-       cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR"))
-       cmd.Stdout = os.Stdout
-       cmd.Stderr = os.Stderr
-       if err := cmd.Start(); err != nil {
-               logger.Error("vesmgr exiting, ves-agent start failed: %s", err)
-               go func() {
-                       ch <- err
-               }()
-       } else {
-               logger.Info("ves-agent started with pid %d", cmd.Process.Pid)
-               vesagent.Pid = cmd.Process.Pid
-               vesagent.process = cmd.Process
-               go func() {
-                       // wait ves-agent exit and then post the error to the channel
-                       err := cmd.Wait()
-                       ch <- err
-               }()
-       }
+func (vesmgr *VesMgr) startVesagent() {
+       vesmgr.vesagent.run(vesmgr.chVesagent)
 }
 
-func killVespa(process *os.Process) {
+func (vesmgr *VesMgr) killVespa() error {
        logger.Info("Killing vespa")
-       err := process.Kill()
+       err := vesmgr.vesagent.kill()
        if err != nil {
                logger.Error("Cannot kill vespa: %s", err.Error())
+               return err
        }
+       return <-vesmgr.chVesagent // wait vespa exit
 }
 
-func queryXAppsStatus(appmgrUrl string, timeout time.Duration) ([]byte, error) {
-
-       logger.Info("query xAppStatus started, url %s", appmgrUrl)
-       req, err := http.NewRequest("GET", appmgrUrl, nil)
+func queryXAppsConfig(appmgrURL string, timeout time.Duration) ([]byte, error) {
+       emptyConfig := []byte("{}")
+       logger.Info("query xAppConfig started, url %s", appmgrURL)
+       req, err := http.NewRequest("GET", appmgrURL, nil)
        if err != nil {
                logger.Error("Failed to create a HTTP request: %s", err)
-               return nil, err
+               return emptyConfig, err
        }
        req.Header.Set("Content-Type", "application/json")
        client := &http.Client{}
        client.Timeout = time.Second * timeout
        resp, err := client.Do(req)
        if err != nil {
-               logger.Error("Query xApp status failed: %s", err)
-               return nil, err
+               logger.Error("Query xApp config failed: %s", err)
+               return emptyConfig, err
        }
        defer resp.Body.Close()
        if resp.StatusCode == http.StatusOK {
                body, err := ioutil.ReadAll(resp.Body)
                if err != nil {
-                       logger.Error("Failed to read xApp status body: %s", err)
-                       return nil, err
+                       logger.Error("Failed to read xApp config body: %s", err)
+                       return emptyConfig, err
                }
-               logger.Info("query xAppStatus completed")
+               logger.Info("query xAppConfig completed")
                return body, nil
-       } else {
-               logger.Error("Error from xApp status query: %s", resp.Status)
-               return nil, errors.New(resp.Status)
        }
+       logger.Error("Error from xApp config query: %s", resp.Status)
+       return emptyConfig, errors.New(resp.Status)
 }
 
-type state int
-
-const (
-       normalState           state = iota
-       vespaTerminatingState state = iota
-)
-
 func queryConf() ([]byte, error) {
-       return queryXAppsStatus("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath,
+       return queryXAppsConfig("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath,
                10*time.Second)
 }
 
-func runVesmgr(chVesagent chan error, chSupervision chan chan string, chXAppNotifications chan []byte, chXAppSubscriptions chan subsChannel) {
+func (vesmgr *VesMgr) emptyNotificationsChannel() {
+       for {
+               select {
+               case <-vesmgr.chXAppNotifications:
+                       // we don't care the content
+               default:
+                       return
+               }
+       }
+}
+
+func (vesmgr *VesMgr) servRequest() {
+       select {
+       case supervision := <-vesmgr.chSupervision:
+               logger.Info("vesmgr: supervision")
+               supervision <- "OK"
+       case xAppNotif := <-vesmgr.chXAppNotifications:
+               logger.Info("vesmgr: xApp notification")
+               logger.Info(string(xAppNotif))
+               vesmgr.emptyNotificationsChannel()
+               /*
+               * If xapp config query fails then we cannot create
+               * a new configuration and kill vespa.
+               * In that case we assume that
+               * the situation is fixed when the next
+               * xapp notif comes
+                */
+               xappConfig, err := queryConf()
+               if err == nil {
+                       vesmgr.killVespa()
+                       createConf(vespaConfigFile, xappConfig)
+                       vesmgr.startVesagent()
+               }
+       case err := <-vesmgr.chVesagent:
+               logger.Error("Vesagent exited: " + err.Error())
+               os.Exit(1)
+       }
+}
 
-       logger.Info("vesmgr main loop ready")
-       mystate := normalState
-       var xappStatus []byte
-       var err error
+func (vesmgr *VesMgr) waitSubscriptionLoop() {
        for {
                select {
-               case supervision := <-chSupervision:
+               case supervision := <-vesmgr.chSupervision:
                        logger.Info("vesmgr: supervision")
                        supervision <- "OK"
-               case xAppNotif := <-chXAppNotifications:
-                       logger.Info("vesmgr: xApp notification")
-                       logger.Info(string(xAppNotif))
-                       /*
-                        * If xapp status query fails then we cannot create
-                        * a new configuration and kill vespa.
-                        * In that case we assume that
-                        * the situation is fixed when the next
-                        * xapp notif comes
-                        */
-                       xappStatus, err = queryConf()
-                       if err == nil {
-                               killVespa(vesagent.process)
-                               mystate = vespaTerminatingState
-                       }
-               case err := <-chVesagent:
-                       switch mystate {
-                       case vespaTerminatingState:
-                               logger.Info("Vesagent termination completed")
-                               createConf(xappStatus)
-                               startVesagent(chVesagent)
-                               mystate = normalState
-                       default:
-                               logger.Error("Vesagent exited: " + err.Error())
-                               os.Exit(1)
-                       }
-               case isSubscribed := <-chXAppSubscriptions:
+               case isSubscribed := <-vesmgr.chXAppSubscriptions:
                        if isSubscribed.err != nil {
                                logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err)
                                os.Exit(1)
                        }
-                       xappStatus, err = queryConf()
-                       if err == nil {
-                               createConf(xappStatus)
-                               startVesagent(chVesagent)
-                       }
+                       return
                }
        }
 }
+
+// Run the vesmgr process main loop
+func (vesmgr *VesMgr) Run() {
+       logger.Info("vesmgr main loop ready")
+       vesmgr.httpServer.start(vesmgrXappNotifPath, vesmgr.chXAppNotifications, vesmgr.chSupervision)
+       vesmgr.subscribeXAppNotifications()
+       vesmgr.waitSubscriptionLoop()
+       xappConfig, _ := queryConf()
+       createConf(vespaConfigFile, xappConfig)
+       vesmgr.startVesagent()
+       for {
+               vesmgr.servRequest()
+       }
+}
similarity index 67%
rename from cmd/vesmgr/vesmgr_queryxappssttus_test.go
rename to cmd/vesmgr/vesmgr_queryxappconfig_test.go
index 5f0f36c..e9bcbfe 100644 (file)
@@ -19,25 +19,26 @@ package main
 
 import (
        "fmt"
-       "github.com/stretchr/testify/suite"
        "net"
        "net/http"
        "net/url"
        "os"
        "testing"
        "time"
+
+       "github.com/stretchr/testify/suite"
 )
 
 type do func(w http.ResponseWriter)
 
-type QueryXAppsStatusTestSuite struct {
+type QueryXAppsConfigTestSuite struct {
        suite.Suite
        listener    net.Listener
        xAppMgrFunc do
 }
 
 // suite setup creates the HTTP server
-func (suite *QueryXAppsStatusTestSuite) SetupSuite() {
+func (suite *QueryXAppsConfigTestSuite) SetupSuite() {
        os.Unsetenv("http_proxy")
        os.Unsetenv("HTTP_PROXY")
        var err error
@@ -46,7 +47,7 @@ func (suite *QueryXAppsStatusTestSuite) SetupSuite() {
        go runXAppMgr(suite.listener, "/test_url/", suite)
 }
 
-func runXAppMgr(listener net.Listener, url string, suite *QueryXAppsStatusTestSuite) {
+func runXAppMgr(listener net.Listener, url string, suite *QueryXAppsConfigTestSuite) {
 
        http.HandleFunc(url, func(w http.ResponseWriter, r *http.Request) {
                switch r.Method {
@@ -57,44 +58,44 @@ func runXAppMgr(listener net.Listener, url string, suite *QueryXAppsStatusTestSu
        http.Serve(listener, nil)
 }
 
-func (suite *QueryXAppsStatusTestSuite) TestQueryXAppsStatusFailsWithTimeout() {
-       do_sleep := func(w http.ResponseWriter) {
+func (suite *QueryXAppsConfigTestSuite) TestQueryXAppsConfigFailsWithTimeout() {
+       doSleep := func(w http.ResponseWriter) {
                time.Sleep(time.Second * 2)
        }
-       suite.xAppMgrFunc = do_sleep
+       suite.xAppMgrFunc = doSleep
 
-       data, err := queryXAppsStatus("http://"+suite.listener.Addr().String()+"/test_url/", 1)
-       suite.Nil(data)
+       data, err := queryXAppsConfig("http://"+suite.listener.Addr().String()+"/test_url/", 1)
+       suite.Equal([]byte("{}"), data)
        suite.NotNil(err)
        e, ok := err.(*url.Error)
        suite.Equal(ok, true)
        suite.Equal(e.Timeout(), true)
 }
 
-func (suite *QueryXAppsStatusTestSuite) TestQueryXAppsStatusFailsWithAnErrorReply() {
-       do_reply_with_err := func(w http.ResponseWriter) {
+func (suite *QueryXAppsConfigTestSuite) TestQueryXAppsConfigFailsWithAnErrorReply() {
+       doReplyWithErr := func(w http.ResponseWriter) {
                http.Error(w, "405 method not allowed", http.StatusMethodNotAllowed)
        }
-       suite.xAppMgrFunc = do_reply_with_err
+       suite.xAppMgrFunc = doReplyWithErr
 
-       data, err := queryXAppsStatus("http://"+suite.listener.Addr().String()+"/test_url/", 1)
-       suite.Nil(data)
+       data, err := queryXAppsConfig("http://"+suite.listener.Addr().String()+"/test_url/", 1)
+       suite.Equal([]byte("{}"), data)
        suite.NotNil(err)
        suite.Equal("405 Method Not Allowed", err.Error())
 }
 
-func (suite *QueryXAppsStatusTestSuite) TestQueryXAppsStatusOk() {
-       do_reply := func(w http.ResponseWriter) {
+func (suite *QueryXAppsConfigTestSuite) TestQueryXAppsConfigOk() {
+       doReply := func(w http.ResponseWriter) {
                fmt.Fprintf(w, "reply message")
        }
-       suite.xAppMgrFunc = do_reply
+       suite.xAppMgrFunc = doReply
 
-       data, err := queryXAppsStatus("http://"+suite.listener.Addr().String()+"/test_url/", 1)
+       data, err := queryXAppsConfig("http://"+suite.listener.Addr().String()+"/test_url/", 1)
        suite.NotNil(data)
        suite.Nil(err)
        suite.Equal(data, []byte("reply message"))
 }
 
-func TestQueryXAppsStatusTestSuite(t *testing.T) {
-       suite.Run(t, new(QueryXAppsStatusTestSuite))
+func TestQueryXAppsConfigTestSuite(t *testing.T) {
+       suite.Run(t, new(QueryXAppsConfigTestSuite))
 }
index 19dd60d..a9b9275 100644 (file)
@@ -19,90 +19,123 @@ package main
 
 import (
        "errors"
-       "github.com/stretchr/testify/assert"
        "os"
        "os/exec"
+       "path/filepath"
+       "strconv"
        "testing"
        "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/suite"
 )
 
-func init() {
-       vesagent.name = "echo" // no need to run real ves-agent
-       logger.MdcAdd("Testvesmgr", "0.0.1")
-       os.Setenv("VESMGR_HB_INTERVAL", "30s")
-       os.Setenv("VESMGR_MEAS_INTERVAL", "30s")
-       os.Setenv("VESMGR_PRICOLLECTOR_ADDR", "127.1.1.1")
-       os.Setenv("VESMGR_PRICOLLECTOR_PORT", "8443")
-       os.Setenv("VESMGR_PROMETHEUS_ADDR", "http://localhost:9090")
+func TestGetMyIP(t *testing.T) {
+       myIPAddress, err := getMyIP()
+       assert.NotEqual(t, string(""), myIPAddress)
+       assert.Nil(t, err)
 }
 
-func TestStartVesagent(t *testing.T) {
-       assert.Equal(t, 0, vesagent.Pid)
-       ch := make(chan error)
-       startVesagent(ch)
-       assert.NotEqual(t, 0, vesagent.Pid)
-       t.Logf("VES agent pid = %d", vesagent.Pid)
-       vesagent.Pid = 0
-       err := <-ch
+func TestConfCreate(t *testing.T) {
+       tmpfile := filepath.Join(os.TempDir(), "vestest."+strconv.Itoa(os.Getpid()))
+       defer os.Remove(tmpfile) // clean up
+       createConf(tmpfile, []byte("{}"))
+       _, err := os.Stat(tmpfile)
        assert.Nil(t, err)
 }
 
-func TestStartVesagentFails(t *testing.T) {
-       vesagent.name = "Not-ves-agent"
-       assert.Equal(t, 0, vesagent.Pid)
-       ch := make(chan error)
-       startVesagent(ch)
-       err := <-ch
-       assert.NotNil(t, err)
-       assert.Equal(t, 0, vesagent.Pid)
-       vesagent.name = "ves-agent"
+type VesmgrTestSuite struct {
+       suite.Suite
+       vesmgr VesMgr
 }
 
-func TestGetMyIP(t *testing.T) {
-       vesmgr.myIPAddress = string("")
-       var err error
-       vesmgr.myIPAddress, err = getMyIP()
-       assert.NotEqual(t, string(""), vesmgr.myIPAddress)
-       assert.Equal(t, nil, err)
+func (suite *VesmgrTestSuite) SetupSuite() {
+       suite.vesmgr = VesMgr{}
+       suite.vesmgr.Init("0")
+       logger.MdcAdd("Testvesmgr", "0.0.1")
+       os.Setenv("VESMGR_HB_INTERVAL", "30s")
+       os.Setenv("VESMGR_MEAS_INTERVAL", "30s")
+       os.Setenv("VESMGR_PRICOLLECTOR_ADDR", "127.1.1.1")
+       os.Setenv("VESMGR_PRICOLLECTOR_PORT", "8443")
+       os.Setenv("VESMGR_PROMETHEUS_ADDR", "http://localhost:9090")
 }
 
-func TestMainLoopSupervision(t *testing.T) {
-       chXAppNotifications := make(chan []byte)
-       chSupervision := make(chan chan string)
-       chVesagent := make(chan error)
-       chSubscriptions := make(chan subsChannel)
-       go runVesmgr(chVesagent, chSupervision, chXAppNotifications, chSubscriptions)
-
+func (suite *VesmgrTestSuite) TestMainLoopSupervision() {
+       go suite.vesmgr.servRequest()
        ch := make(chan string)
-       chSupervision <- ch
+       suite.vesmgr.chSupervision <- ch
        reply := <-ch
-       assert.Equal(t, "OK", reply)
+       suite.Equal("OK", reply)
 }
 
-func TestMainLoopVesagentError(t *testing.T) {
+func (suite *VesmgrTestSuite) TestMainLoopVesagentError() {
        if os.Getenv("TEST_VESPA_EXIT") == "1" {
                // we're run in a new process, now make vesmgr main loop exit
-               chXAppNotifications := make(chan []byte)
-               chSupervision := make(chan chan string)
-               chVesagent := make(chan error)
-               chSubscriptions := make(chan subsChannel)
-               go runVesmgr(chVesagent, chSupervision, chXAppNotifications, chSubscriptions)
-
-               chVesagent <- errors.New("vesagent killed")
+               go suite.vesmgr.servRequest()
+               suite.vesmgr.chVesagent <- errors.New("vesagent killed")
                // we should never actually end up to this sleep, since the runVesmgr should exit
                time.Sleep(3 * time.Second)
                return
        }
 
        // Run the vesmgr exit test as a separate process
-       cmd := exec.Command(os.Args[0], "-test.run=TestMainLoopVesagentError")
+       cmd := exec.Command(os.Args[0], "-test.run", "TestVesMgrSuite", "-testify.m", "TestMainLoopVesagentError")
        cmd.Env = append(os.Environ(), "TEST_VESPA_EXIT=1")
        cmd.Stdout = os.Stdout
        cmd.Stderr = os.Stderr
        err := cmd.Run()
-
        // check that vesmgr existed with status 1
+
        e, ok := err.(*exec.ExitError)
-       assert.Equal(t, true, ok)
-       assert.Equal(t, "exit status 1", e.Error())
+       suite.True(ok)
+       suite.Equal("exit status 1", e.Error())
+}
+
+func (suite *VesmgrTestSuite) TestWaitSubscriptionLoopRespondsSupervisionAndBreaksWhenReceivedSubsNotif() {
+       go func() {
+               time.Sleep(time.Second)
+               ch := make(chan string)
+               suite.vesmgr.chSupervision <- ch
+               suite.Equal("OK", <-ch)
+               suite.vesmgr.chSupervision <- ch
+               suite.Equal("OK", <-ch)
+               suite.vesmgr.chXAppSubscriptions <- subscriptionNotification{true, nil, ""}
+       }()
+
+       suite.vesmgr.waitSubscriptionLoop()
+}
+
+func (suite *VesmgrTestSuite) TestEmptyNotificationChannelReadsAllMsgsFromCh() {
+       go func() {
+               for i := 0; i < 11; i++ {
+                       suite.vesmgr.chXAppNotifications <- []byte("hello")
+               }
+       }()
+       time.Sleep(500 * time.Millisecond)
+       <-suite.vesmgr.chXAppNotifications
+       suite.vesmgr.emptyNotificationsChannel()
+       select {
+       case <-suite.vesmgr.chXAppNotifications:
+               suite.Fail("Got unexpected notification")
+       default:
+               // ok
+       }
+}
+
+func (suite *VesmgrTestSuite) TestVespaKilling() {
+       suite.vesmgr.vesagent = makeRunner("sleep", "20")
+       suite.vesmgr.startVesagent()
+       suite.NotNil(suite.vesmgr.killVespa())
+}
+
+func (suite *VesmgrTestSuite) TestVespaKillingAlreadyKilled() {
+       suite.vesmgr.vesagent = makeRunner("sleep", "20")
+       suite.vesmgr.startVesagent()
+       suite.NotNil(suite.vesmgr.killVespa())
+       // Just check that second kill does not block execution
+       suite.NotNil(suite.vesmgr.killVespa())
+}
+
+func TestVesMgrSuite(t *testing.T) {
+       suite.Run(t, new(VesmgrTestSuite))
 }
index 553a6e5..fa6e83d 100644 (file)
@@ -1,4 +1,4 @@
 # The Jenkins job uses this string for the tag in the image name
 # for example nexus3.o-ran-sc.org:10004/my-image-name:0.0.1
 ---
-tag: 0.0.4
+tag: 0.0.5