From: Roni Riska Date: Thu, 26 Sep 2019 05:20:44 +0000 (+0300) Subject: Refactor the code X-Git-Tag: 0.0.8~6 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=fc77ebb24a8627ccfb18edd8b5dbc038da475eab;p=ric-plt%2Fvespamgr.git Refactor the code Multiple code refactorings to make the code simpler and increase the unit testing coverage. * Golint findings are fixed. * Xapp query function name changed from status to config. The functionality is slightly changed so that - Vespa is started only after the xapp notification subscription is successful and the vesmgr has received the current xapp configuration from the xapp manager. - The vesmgr goes to the main loop after that. Change-Id: Ie2675c0543d4e4ce0a60b92a6c06a79b9e2cb2cd Signed-off-by: Roni Riska --- diff --git a/cmd/vesmgr/config.go b/cmd/vesmgr/config.go index e2c9f25..8301b0c 100644 --- a/cmd/vesmgr/config.go +++ b/cmd/vesmgr/config.go @@ -19,11 +19,12 @@ package main import ( "encoding/json" - "gopkg.in/yaml.v2" "io" "os" "strconv" "time" + + "gopkg.in/yaml.v2" ) func basicVespaConf() VESAgentConfiguration { @@ -65,12 +66,13 @@ func basicVespaConf() VESAgentConfiguration { return vespaconf } +// AppMetricsStruct contains xapplication metrics definition type AppMetricsStruct struct { ObjectName string ObjectInstance string - // xxx add labels here } +// AppMetrics contains metrics definitions for all Xapps type AppMetrics map[string]AppMetricsStruct // Parses the metrics data from an array of bytes, which is expected to contain a JSON @@ -89,10 +91,10 @@ func parseMetricsFromXAppDescriptor(descriptor []byte, appMetrics AppMetrics) Ap json.Unmarshal(descriptor, &desc) for _, app := range desc { - config, config_ok := app["config"] - if config_ok { - metrics, metrics_ok := config.(map[string]interface{})["metrics"] - if metrics_ok { + config, configOk := app["config"] + if configOk { + metrics, metricsOk := config.(map[string]interface{})["metrics"] + if metricsOk { parseMetricsRules(metrics.([]interface{}), appMetrics) } } @@ -106,16 +108,16 @@ func parseMetricsFromXAppDescriptor(descriptor []byte, appMetrics AppMetrics) Ap // Entries, which do not have all the necessary fields, are ignored. func parseMetricsRules(metricsMap []interface{}, appMetrics AppMetrics) AppMetrics { for _, element := range metricsMap { - name, name_ok := element.(map[string]interface{})["name"].(string) - if name_ok { - _, already_found := appMetrics[name] - objectName, objectName_ok := element.(map[string]interface{})["objectName"].(string) - objectInstance, objectInstance_ok := element.(map[string]interface{})["objectInstance"].(string) - if !already_found && objectName_ok && objectInstance_ok { + name, nameOk := element.(map[string]interface{})["name"].(string) + if nameOk { + _, alreadyFound := appMetrics[name] + objectName, objectNameOk := element.(map[string]interface{})["objectName"].(string) + objectInstance, objectInstanceOk := element.(map[string]interface{})["objectInstance"].(string) + if !alreadyFound && objectNameOk && objectInstanceOk { appMetrics[name] = AppMetricsStruct{objectName, objectInstance} logger.Info("parsed counter %s %s %s", name, objectName, objectInstance) } - if already_found { + if alreadyFound { logger.Info("skipped duplicate counter %s", name) } } @@ -127,12 +129,12 @@ func getRules(vespaconf *VESAgentConfiguration, xAppConfig []byte) { appMetrics := make(AppMetrics) parseMetricsFromXAppDescriptor(xAppConfig, appMetrics) - makeRule := func(expr string, obj_name string, obj_instance string) MetricRule { + makeRule := func(expr string, objName string, objInstance string) MetricRule { return MetricRule{ Target: "AdditionalObjects", Expr: expr, - ObjectInstance: obj_instance, - ObjectName: obj_name, + ObjectInstance: objInstance, + ObjectName: objName, ObjectKeys: []Label{ Label{ Name: "ricComponentName", @@ -161,15 +163,15 @@ func getCollectorConfiguration(vespaconf *VESAgentConfiguration) { vespaconf.PrimaryCollector.FQDN = os.Getenv("VESMGR_PRICOLLECTOR_ADDR") vespaconf.PrimaryCollector.ServerRoot = os.Getenv("VESMGR_PRICOLLECTOR_SERVERROOT") vespaconf.PrimaryCollector.Topic = os.Getenv("VESMGR_PRICOLLECTOR_TOPIC") - port_str := os.Getenv("VESMGR_PRICOLLECTOR_PORT") - if port_str == "" { + portStr := os.Getenv("VESMGR_PRICOLLECTOR_PORT") + if portStr == "" { vespaconf.PrimaryCollector.Port = 8443 } else { - port, _ := strconv.Atoi(port_str) + port, _ := strconv.Atoi(portStr) vespaconf.PrimaryCollector.Port = port } - secure_str := os.Getenv("VESMGR_PRICOLLECTOR_SECURE") - if secure_str == "true" { + secureStr := os.Getenv("VESMGR_PRICOLLECTOR_SECURE") + if secureStr == "true" { vespaconf.PrimaryCollector.Secure = true } else { vespaconf.PrimaryCollector.Secure = false diff --git a/cmd/vesmgr/config_test.go b/cmd/vesmgr/config_test.go index 08bde19..76793a1 100644 --- a/cmd/vesmgr/config_test.go +++ b/cmd/vesmgr/config_test.go @@ -19,12 +19,13 @@ package main import ( "bytes" "encoding/json" - "github.com/stretchr/testify/assert" - "gopkg.in/yaml.v2" "io/ioutil" "os" "testing" "time" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" ) func testBaseConf(t *testing.T, vesconf VESAgentConfiguration) { @@ -130,7 +131,7 @@ func metricsStringToInterfaceArray(metrics string) []interface{} { } func TestParseMetricsRules(t *testing.T) { - metricsJson := `{"metrics": [ + metricsJSON := `{"metrics": [ { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" }, { "name": "ricxapp_RMR_ReceiveError", "objectName": "ricxappRMRReceiveErrorCounter", "objectInstance": "ricxappRMRReceiveError" }, { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" }, @@ -138,7 +139,7 @@ func TestParseMetricsRules(t *testing.T) { { "name": "ricxapp_SDL_Stored", "objectName": "ricxappSDLStoredCounter", "objectInstance": "ricxappSDLStored" }, { "name": "ricxapp_SDL_StoreError", "objectName": "ricxappSDLStoreErrorCounter", "objectInstance": "ricxappSDLStoreError" } ]}` appMetrics := make(AppMetrics) - var m []interface{} = metricsStringToInterfaceArray(metricsJson) + m := metricsStringToInterfaceArray(metricsJSON) appMetrics = parseMetricsRules(m, appMetrics) assert.Len(t, appMetrics, 6) assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName) @@ -148,17 +149,17 @@ func TestParseMetricsRules(t *testing.T) { func TestParseMetricsRulesNoMetrics(t *testing.T) { appMetrics := make(AppMetrics) - metricsJson := `{"metrics": []` - var m []interface{} = metricsStringToInterfaceArray(metricsJson) + metricsJSON := `{"metrics": []` + m := metricsStringToInterfaceArray(metricsJSON) appMetrics = parseMetricsRules(m, appMetrics) assert.Empty(t, appMetrics) } func TestParseMetricsRulesAdditionalFields(t *testing.T) { appMetrics := make(AppMetrics) - metricsJson := `{"metrics": [ + metricsJSON := `{"metrics": [ { "additionalField": "valueIgnored", "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" }]}` - var m []interface{} = metricsStringToInterfaceArray(metricsJson) + m := metricsStringToInterfaceArray(metricsJSON) appMetrics = parseMetricsRules(m, appMetrics) assert.Len(t, appMetrics, 1) assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName) @@ -167,11 +168,11 @@ func TestParseMetricsRulesAdditionalFields(t *testing.T) { func TestParseMetricsRulesMissingFields(t *testing.T) { appMetrics := make(AppMetrics) - metricsJson := `{"metrics": [ + metricsJSON := `{"metrics": [ { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" }, { "name": "ricxapp_RMR_ReceiveError", "objectInstance": "ricxappRMRReceiveError" }, { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" }]}` - var m []interface{} = metricsStringToInterfaceArray(metricsJson) + m := metricsStringToInterfaceArray(metricsJSON) appMetrics = parseMetricsRules(m, appMetrics) assert.Len(t, appMetrics, 2) assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName) @@ -182,11 +183,11 @@ func TestParseMetricsRulesMissingFields(t *testing.T) { func TestParseMetricsRulesDuplicateDefinitionIsIgnored(t *testing.T) { appMetrics := make(AppMetrics) - metricsJson := `{"metrics": [ + metricsJSON := `{"metrics": [ { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" }, { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounterXXX", "objectInstance": "ricxappRMRReceivedXXX" }, { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" }]}` - var m []interface{} = metricsStringToInterfaceArray(metricsJson) + m := metricsStringToInterfaceArray(metricsJSON) appMetrics = parseMetricsRules(m, appMetrics) assert.Len(t, appMetrics, 2) assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName) @@ -195,12 +196,12 @@ func TestParseMetricsRulesDuplicateDefinitionIsIgnored(t *testing.T) { func TestParseMetricsRulesIncrementalFillOfAppMetrics(t *testing.T) { appMetrics := make(AppMetrics) - metricsJson1 := `{"metrics": [ + metricsJSON1 := `{"metrics": [ { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" }]}` - metricsJson2 := `{"metrics": [ + metricsJSON2 := `{"metrics": [ { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" }]}` - var m1 []interface{} = metricsStringToInterfaceArray(metricsJson1) - var m2 []interface{} = metricsStringToInterfaceArray(metricsJson2) + m1 := metricsStringToInterfaceArray(metricsJSON1) + m2 := metricsStringToInterfaceArray(metricsJSON2) appMetrics = parseMetricsRules(m1, appMetrics) appMetrics = parseMetricsRules(m2, appMetrics) assert.Len(t, appMetrics, 2) @@ -226,18 +227,18 @@ func TestParseXAppDescriptor(t *testing.T) { } func TestParseXAppDescriptorWithNoConfig(t *testing.T) { - metricsJson := `[{{"metadata": "something", "descriptor": "somethingelse"}}, + metricsJSON := `[{{"metadata": "something", "descriptor": "somethingelse"}}, {{"metadata": "something", "descriptor": "somethingelse"}}]` - metricsBytes := []byte(metricsJson) + metricsBytes := []byte(metricsJSON) appMetrics := make(AppMetrics) appMetrics = parseMetricsFromXAppDescriptor(metricsBytes, appMetrics) assert.Empty(t, appMetrics) } func TestParseXAppDescriptorWithNoMetrics(t *testing.T) { - metricsJson := `[{{"metadata": "something", "descriptor": "somethingelse", "config":{}}, + metricsJSON := `[{{"metadata": "something", "descriptor": "somethingelse", "config":{}}, {{"metadata": "something", "descriptor": "somethingelse", "config":{}}}]` - metricsBytes := []byte(metricsJson) + metricsBytes := []byte(metricsJSON) appMetrics := make(AppMetrics) appMetrics = parseMetricsFromXAppDescriptor(metricsBytes, appMetrics) assert.Empty(t, appMetrics) diff --git a/cmd/vesmgr/httpserver.go b/cmd/vesmgr/httpserver.go index 585319e..2564ef3 100644 --- a/cmd/vesmgr/httpserver.go +++ b/cmd/vesmgr/httpserver.go @@ -24,28 +24,47 @@ import ( "net/http" ) -const SupervisionUrl = "/supervision/" +// SupervisionURL is the url where kubernetes posts alive queries +const SupervisionURL = "/supervision/" -func startHttpServer(listener net.Listener, xappnotifUrl string, notif_ch chan []byte, supervision_ch chan chan string) { - go runHttpServer(listener, xappnotifUrl, notif_ch, supervision_ch) +// HTTPServer is the VesMgr HTTP server struct +type HTTPServer struct { + listener net.Listener } -func runHttpServer(listener net.Listener, xappNotifUrl string, notif_ch chan []byte, supervision_ch chan chan string) { +func (s *HTTPServer) init(address string) *HTTPServer { + var err error + s.listener, err = net.Listen("tcp", address) + if err != nil { + panic("Cannot listen:" + err.Error()) + } + return s +} + +func (s *HTTPServer) start(notifPath string, notifCh chan []byte, supCh chan chan string) { + go runHTTPServer(s.listener, notifPath, notifCh, supCh) +} + +func (s *HTTPServer) addr() net.Addr { + return s.listener.Addr() +} + +func runHTTPServer(listener net.Listener, xappNotifURL string, notifCh chan []byte, supervisionCh chan chan string) { logger.Info("vesmgr http server serving at %s", listener.Addr()) - http.HandleFunc(xappNotifUrl, func(w http.ResponseWriter, r *http.Request) { + http.HandleFunc(xappNotifURL, func(w http.ResponseWriter, r *http.Request) { switch r.Method { case "POST": - logger.Info("httpServer: POST in %s", xappNotifUrl) + logger.Info("httpServer: POST in %s", xappNotifURL) body, err := ioutil.ReadAll(r.Body) defer r.Body.Close() if err != nil { logger.Error("httpServer: Invalid body in POST request") return } - notif_ch <- body + notifCh <- body return default: logger.Error("httpServer: Invalid method %s to %s", r.Method, r.URL.Path) @@ -54,15 +73,15 @@ func runHttpServer(listener net.Listener, xappNotifUrl string, notif_ch chan []b } }) - http.HandleFunc(SupervisionUrl, func(w http.ResponseWriter, r *http.Request) { + http.HandleFunc(SupervisionURL, func(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": logger.Info("httpServer: GET supervision") - supervision_ack_ch := make(chan string) + supervisionAckCh := make(chan string) // send supervision to the main loop - supervision_ch <- supervision_ack_ch - reply := <-supervision_ack_ch + supervisionCh <- supervisionAckCh + reply := <-supervisionAckCh logger.Info("httpServer: supervision ack from the main loop: %s", reply) fmt.Fprintf(w, reply) return diff --git a/cmd/vesmgr/httpserver_test.go b/cmd/vesmgr/httpserver_test.go index d52f4b2..c2946aa 100644 --- a/cmd/vesmgr/httpserver_test.go +++ b/cmd/vesmgr/httpserver_test.go @@ -17,42 +17,41 @@ package main import ( - "github.com/stretchr/testify/suite" "io/ioutil" - "net" "net/http" "os" "strings" "testing" + + "github.com/stretchr/testify/suite" ) -type HttpServerTestSuite struct { +type HTTPServerTestSuite struct { suite.Suite - listener net.Listener - ch_notif chan []byte - ch_supervision chan chan string + chNotif chan []byte + chSupervision chan chan string + server HTTPServer } // suite setup creates the HTTP server -func (suite *HttpServerTestSuite) SetupSuite() { +func (suite *HTTPServerTestSuite) SetupSuite() { os.Unsetenv("http_proxy") os.Unsetenv("HTTP_PROXY") - var err error - suite.listener, err = net.Listen("tcp", ":0") - suite.Nil(err) - suite.ch_notif = make(chan []byte) - suite.ch_supervision = make(chan chan string) - startHttpServer(suite.listener, "/vesmgr_notif/", suite.ch_notif, suite.ch_supervision) + suite.chNotif = make(chan []byte) + suite.chSupervision = make(chan chan string) + suite.server = HTTPServer{} + suite.server.init(":0") + suite.server.start("/vesmgr_notif/", suite.chNotif, suite.chSupervision) } -func (suite *HttpServerTestSuite) TestHtppServerSupervisionInvalidOperation() { - resp, reply := suite.doPost("http://"+suite.listener.Addr().String()+SupervisionUrl, "supervision") +func (suite *HTTPServerTestSuite) TestHtppServerSupervisionInvalidOperation() { + resp, reply := suite.doPost("http://"+suite.server.addr().String()+SupervisionURL, "supervision") suite.Equal("405 method not allowed\n", reply) suite.Equal(405, resp.StatusCode) suite.Equal("405 Method Not Allowed", resp.Status) } -func (suite *HttpServerTestSuite) doGet(url string) (*http.Response, string) { +func (suite *HTTPServerTestSuite) doGet(url string) (*http.Response, string) { resp, err := http.Get(url) suite.Nil(err) @@ -62,8 +61,8 @@ func (suite *HttpServerTestSuite) doGet(url string) (*http.Response, string) { return resp, string(contents) } -func (suite *HttpServerTestSuite) doPost(serverUrl string, msg string) (*http.Response, string) { - resp, err := http.Post(serverUrl, "data", strings.NewReader(msg)) +func (suite *HTTPServerTestSuite) doPost(serverURL string, msg string) (*http.Response, string) { + resp, err := http.Post(serverURL, "data", strings.NewReader(msg)) suite.Nil(err) defer resp.Body.Close() @@ -72,41 +71,41 @@ func (suite *HttpServerTestSuite) doPost(serverUrl string, msg string) (*http.Re return resp, string(contents) } -func replySupervision(ch_supervision chan chan string, reply string) { - ch_supervision_ack := <-ch_supervision - ch_supervision_ack <- reply +func replySupervision(chSupervision chan chan string, reply string) { + chSupervisionAck := <-chSupervision + chSupervisionAck <- reply } -func (suite *HttpServerTestSuite) TestHttpServerSupervision() { +func (suite *HTTPServerTestSuite) TestHttpServerSupervision() { // start the "main loop" to reply to the supervision to the HTTPServer - go replySupervision(suite.ch_supervision, "I'm just fine") + go replySupervision(suite.chSupervision, "I'm just fine") - resp, reply := suite.doGet("http://" + suite.listener.Addr().String() + SupervisionUrl) + resp, reply := suite.doGet("http://" + suite.server.addr().String() + SupervisionURL) suite.Equal("I'm just fine", reply) suite.Equal(200, resp.StatusCode) suite.Equal("200 OK", resp.Status) } -func (suite *HttpServerTestSuite) TestHttpServerInvalidUrl() { - resp, reply := suite.doPost("http://"+suite.listener.Addr().String()+"/invalid_url", "foo") +func (suite *HTTPServerTestSuite) TestHttpServerInvalidUrl() { + resp, reply := suite.doPost("http://"+suite.server.addr().String()+"/invalid_url", "foo") suite.Equal("404 page not found\n", reply) suite.Equal(404, resp.StatusCode) suite.Equal("404 Not Found", resp.Status) } -func readXAppNotification(ch_notif chan []byte, ch chan []byte) { - notification := <-ch_notif +func readXAppNotification(chNotif chan []byte, ch chan []byte) { + notification := <-chNotif ch <- notification } -func (suite *HttpServerTestSuite) TestHttpServerXappNotif() { +func (suite *HTTPServerTestSuite) TestHttpServerXappNotif() { // start the "main loop" to receive the xAppNotification message from the HTTPServer ch := make(chan []byte) - go readXAppNotification(suite.ch_notif, ch) + go readXAppNotification(suite.chNotif, ch) - resp, reply := suite.doPost("http://"+suite.listener.Addr().String()+"/vesmgr_notif/", "test data") + resp, reply := suite.doPost("http://"+suite.server.addr().String()+"/vesmgr_notif/", "test data") suite.Equal("", reply) suite.Equal(200, resp.StatusCode) suite.Equal("200 OK", resp.Status) @@ -114,13 +113,13 @@ func (suite *HttpServerTestSuite) TestHttpServerXappNotif() { suite.Equal([]byte("test data"), notification) } -func (suite *HttpServerTestSuite) TestHttpServerXappNotifInvalidOperation() { - resp, reply := suite.doGet("http://" + suite.listener.Addr().String() + "/vesmgr_notif/") +func (suite *HTTPServerTestSuite) TestHttpServerXappNotifInvalidOperation() { + resp, reply := suite.doGet("http://" + suite.server.addr().String() + "/vesmgr_notif/") suite.Equal("405 method not allowed\n", reply) suite.Equal(405, resp.StatusCode) suite.Equal("405 Method Not Allowed", resp.Status) } func TestHttpServerSuite(t *testing.T) { - suite.Run(t, new(HttpServerTestSuite)) + suite.Run(t, new(HTTPServerTestSuite)) } diff --git a/cmd/vesmgr/main.go b/cmd/vesmgr/main.go index 68e5c0c..cd3d14e 100644 --- a/cmd/vesmgr/main.go +++ b/cmd/vesmgr/main.go @@ -18,5 +18,6 @@ package main func main() { - vesmgrInit() + vesmgr := VesMgr{} + vesmgr.Init(vesmgrXappNotifPort).Run() } diff --git a/cmd/vesmgr/subprocess.go b/cmd/vesmgr/subprocess.go new file mode 100644 index 0000000..0071c20 --- /dev/null +++ b/cmd/vesmgr/subprocess.go @@ -0,0 +1,35 @@ +package main + +import ( + "os" + "os/exec" +) + +type cmdRunner struct { + exe string + args []string + cmd *exec.Cmd +} + +func (r *cmdRunner) run(result chan error) { + r.cmd = exec.Command(r.exe, r.args...) + r.cmd.Stdout = os.Stdout + r.cmd.Stderr = os.Stderr + err := r.cmd.Start() + go func() { + if err != nil { + result <- err + } else { + result <- r.cmd.Wait() + } + }() +} + +func (r *cmdRunner) kill() error { + return r.cmd.Process.Kill() +} + +func makeRunner(exe string, arg ...string) cmdRunner { + r := cmdRunner{exe: exe, args: arg} + return r +} diff --git a/cmd/vesmgr/subprocess_test.go b/cmd/vesmgr/subprocess_test.go new file mode 100644 index 0000000..bf460d9 --- /dev/null +++ b/cmd/vesmgr/subprocess_test.go @@ -0,0 +1,31 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestProcessRunning(t *testing.T) { + r := makeRunner("echo", "a") + ch := make(chan error) + r.run(ch) + err := <-ch + assert.Nil(t, err) +} + +func TestProcessKill(t *testing.T) { + r := makeRunner("sleep", "20") + ch := make(chan error) + r.run(ch) + assert.Nil(t, r.kill()) + <-ch // wait and seee that kills is actually done +} + +func TestProcessRunningFails(t *testing.T) { + r := makeRunner("foobarbaz") + ch := make(chan error) + r.run(ch) + err := <-ch + assert.NotNil(t, err) +} diff --git a/cmd/vesmgr/subscribexAPPNotifications.go b/cmd/vesmgr/subscribexAPPNotifications.go index 6f8ed77..1d3cc22 100644 --- a/cmd/vesmgr/subscribexAPPNotifications.go +++ b/cmd/vesmgr/subscribexAPPNotifications.go @@ -30,60 +30,58 @@ import ( // appmgr API const appmgrSubsPath = "/ric/v1/subscriptions" -var errPostingFailed error = errors.New("Posting subscriptions failed") -var errWrongStatusCode error = errors.New("Wrong subscriptions response StatusCode") +var errPostingFailed = errors.New("Posting subscriptions failed") +var errWrongStatusCode = errors.New("Wrong subscriptions response StatusCode") -func subscribexAppNotifications(targetUrl string, subscriptions chan subsChannel, timeout time.Duration, subsUrl string) { - requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, targetUrl)) - req, err := http.NewRequest("POST", subsUrl, bytes.NewBuffer(requestBody)) +func subscribexAppNotifications(targetURL string, subscriptions chan subscriptionNotification, timeout time.Duration, subsURL string) { + requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, targetURL)) + req, err := http.NewRequest("POST", subsURL, bytes.NewBuffer(requestBody)) if err != nil { logger.Error("Setting NewRequest failed: %s", err) - subscriptions <- subsChannel{false, err} + subscriptions <- subscriptionNotification{false, err, ""} return } req.Header.Set("Content-Type", "application/json") client := &http.Client{} client.Timeout = time.Second * timeout + var subsID string for { - err := subscribexAppNotificationsClientDo(req, client) + subsID, err = subscribexAppNotificationsClientDo(req, client) if err == nil { break } else if err != errPostingFailed && err != errWrongStatusCode { - subscriptions <- subsChannel{false, err} + subscriptions <- subscriptionNotification{false, err, ""} return } time.Sleep(5 * time.Second) } - subscriptions <- subsChannel{true, nil} + subscriptions <- subscriptionNotification{true, nil, subsID} } -func subscribexAppNotificationsClientDo(req *http.Request, client *http.Client) error { +func subscribexAppNotificationsClientDo(req *http.Request, client *http.Client) (string, error) { resp, err := client.Do(req) if err != nil { logger.Error("Posting subscriptions failed: %s", err) - return errPostingFailed - } else { - defer resp.Body.Close() - if resp.StatusCode == http.StatusCreated { - logger.Info("Subscriptions response StatusCode: %d", resp.StatusCode) - logger.Info("Subscriptions response headers: %s", resp.Header) - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - logger.Error("Subscriptions response Body read failed: %s", err) - return err - } - logger.Info("Response Body: %s", body) - var result map[string]interface{} - if err := json.Unmarshal([]byte(body), &result); err != nil { - logger.Error("json.Unmarshal failed: %s", err) - return err - } - logger.Info("Subscription id from the response: %s", result["id"].(string)) - vesmgr.appmgrSubsId = result["id"].(string) - return nil - } else { - logger.Error("Wrong subscriptions response StatusCode: %d", resp.StatusCode) - return errWrongStatusCode + return "", errPostingFailed + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusCreated { + logger.Info("Subscriptions response StatusCode: %d", resp.StatusCode) + logger.Info("Subscriptions response headers: %s", resp.Header) + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + logger.Error("Subscriptions response Body read failed: %s", err) + return "", err + } + logger.Info("Response Body: %s", body) + var result map[string]interface{} + if err := json.Unmarshal([]byte(body), &result); err != nil { + logger.Error("json.Unmarshal failed: %s", err) + return "", err } + logger.Info("Subscription id from the response: %s", result["id"].(string)) + return result["id"].(string), nil } + logger.Error("Wrong subscriptions response StatusCode: %d", resp.StatusCode) + return "", errWrongStatusCode } diff --git a/cmd/vesmgr/subscribexAPPNotifications_test.go b/cmd/vesmgr/subscribexAPPNotifications_test.go index 31621b3..6305f40 100644 --- a/cmd/vesmgr/subscribexAPPNotifications_test.go +++ b/cmd/vesmgr/subscribexAPPNotifications_test.go @@ -21,38 +21,33 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/stretchr/testify/suite" "io/ioutil" "net/http" "net/http/httptest" "testing" + + "github.com/stretchr/testify/suite" ) -type AppmgrHttpServerTestSuite struct { +type AppmgrHTTPServerTestSuite struct { suite.Suite - subscriptions chan subsChannel - xappNotifUrl string + subscriptions chan subscriptionNotification + xappNotifURL string } // suite setup -func (suite *AppmgrHttpServerTestSuite) SetupSuite() { - vesmgr.appmgrSubsId = string("") - vesmgr.myIPAddress, _ = getMyIP() - suite.xappNotifUrl = "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath - suite.subscriptions = make(chan subsChannel) +func (suite *AppmgrHTTPServerTestSuite) SetupSuite() { + // the url here is not actually used anywhere + suite.xappNotifURL = "http://127.0.0.1:8080" + vesmgrXappNotifPath + suite.subscriptions = make(chan subscriptionNotification) } // test setup -func (suite *AppmgrHttpServerTestSuite) SetupTest() { - suite.subscriptions = make(chan subsChannel) -} - -// test teardown -func (suite *AppmgrHttpServerTestSuite) TearDownTest() { - vesmgr.appmgrSubsId = string("") +func (suite *AppmgrHTTPServerTestSuite) SetupTest() { + suite.subscriptions = make(chan subscriptionNotification) } -func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotifications() { +func (suite *AppmgrHTTPServerTestSuite) TestSubscribexAppNotifications() { testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { body, _ := ioutil.ReadAll(req.Body) var result map[string]interface{} @@ -68,13 +63,13 @@ func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotifications() { })) defer testServer.Close() - go subscribexAppNotifications(suite.xappNotifUrl, suite.subscriptions, 1, testServer.URL) + go subscribexAppNotifications(suite.xappNotifURL, suite.subscriptions, 1, testServer.URL) isSubscribed := <-suite.subscriptions suite.Nil(isSubscribed.err) - suite.Equal("deadbeef1234567890", vesmgr.appmgrSubsId) + suite.Equal("deadbeef1234567890", isSubscribed.subsID) } -func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsWrongStatus() { +func (suite *AppmgrHTTPServerTestSuite) TestSubscribexAppNotificationsWrongStatus() { testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { res.Header().Add("Content-Type", "application/json") res.WriteHeader(http.StatusUnauthorized) @@ -82,32 +77,32 @@ func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsWrongStatu })) defer testServer.Close() - requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, suite.xappNotifUrl)) + requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, suite.xappNotifURL)) req, _ := http.NewRequest("POST", testServer.URL, bytes.NewBuffer(requestBody)) req.Header.Set("Content-Type", "application/json") client := &http.Client{} - err := subscribexAppNotificationsClientDo(req, client) + subsID, err := subscribexAppNotificationsClientDo(req, client) suite.Equal(errWrongStatusCode, err) // after failed POST vesmgr.appmgrSubsId holds an initial values - suite.Equal("", vesmgr.appmgrSubsId) + suite.Equal("", subsID) } -func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsWrongUrl() { +func (suite *AppmgrHTTPServerTestSuite) TestSubscribexAppNotificationsWrongUrl() { // use fake appmgrUrl that is not served in unit test - appmgrUrl := "/I_do_not_exist/" - requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, suite.xappNotifUrl)) - req, _ := http.NewRequest("POST", appmgrUrl, bytes.NewBuffer(requestBody)) + appmgrURL := "/I_do_not_exist/" + requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, suite.xappNotifURL)) + req, _ := http.NewRequest("POST", appmgrURL, bytes.NewBuffer(requestBody)) req.Header.Set("Content-Type", "application/json") client := &http.Client{} - err := subscribexAppNotificationsClientDo(req, client) + subsID, err := subscribexAppNotificationsClientDo(req, client) suite.Equal(errPostingFailed, err) // after failed POST vesmgr.appmgrSubsId holds an initial values - suite.Equal("", vesmgr.appmgrSubsId) + suite.Equal("", subsID) } -func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsReadBodyFails() { +func (suite *AppmgrHTTPServerTestSuite) TestSubscribexAppNotificationsReadBodyFails() { testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { res.Header().Set("Content-Length", "1") res.Header().Add("Content-Type", "application/json") @@ -115,13 +110,13 @@ func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsReadBodyFa })) defer testServer.Close() - go subscribexAppNotifications(suite.xappNotifUrl, suite.subscriptions, 1, testServer.URL) + go subscribexAppNotifications(suite.xappNotifURL, suite.subscriptions, 1, testServer.URL) isSubscribed := <-suite.subscriptions suite.Equal("unexpected EOF", isSubscribed.err.Error()) - suite.Equal("", vesmgr.appmgrSubsId) + suite.Equal("", isSubscribed.subsID) } -func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsUnMarshalFails() { +func (suite *AppmgrHTTPServerTestSuite) TestSubscribexAppNotificationsUnMarshalFails() { testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { res.Header().Add("Content-Type", "application/json") res.WriteHeader(http.StatusCreated) @@ -129,12 +124,12 @@ func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsUnMarshalF })) defer testServer.Close() - go subscribexAppNotifications(suite.xappNotifUrl, suite.subscriptions, 1, testServer.URL) + go subscribexAppNotifications(suite.xappNotifURL, suite.subscriptions, 1, testServer.URL) isSubscribed := <-suite.subscriptions suite.Equal("invalid character 'd' after object key", isSubscribed.err.Error()) - suite.Equal("", vesmgr.appmgrSubsId) + suite.Equal("", isSubscribed.subsID) } func TestAppmgrHttpServerTestSuite(t *testing.T) { - suite.Run(t, new(AppmgrHttpServerTestSuite)) + suite.Run(t, new(AppmgrHTTPServerTestSuite)) } diff --git a/cmd/vesmgr/vesmgr.go b/cmd/vesmgr/vesmgr.go index 0e27838..aeebb15 100755 --- a/cmd/vesmgr/vesmgr.go +++ b/cmd/vesmgr/vesmgr.go @@ -23,7 +23,6 @@ import ( "net" "net/http" "os" - "os/exec" "time" mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git" @@ -34,29 +33,29 @@ var appmgrDomain string const appmgrXAppConfigPath = "/ric/v1/config" const appmgrPort = "8080" -type VesAgent struct { - Pid int - name string - process *os.Process -} - +// VesMgr contains runtime information of the vesmgr process type VesMgr struct { - myIPAddress string - appmgrSubsId string + myIPAddress string + chXAppSubscriptions chan subscriptionNotification + chXAppNotifications chan []byte + chSupervision chan chan string + chVesagent chan error + vesagent cmdRunner + httpServer HTTPServer } -type subsChannel struct { +type subscriptionNotification struct { subscribed bool err error + subsID string } -var vesagent VesAgent -var vesmgr VesMgr var logger *mdcloggo.MdcLogger const vesmgrXappNotifPort = "8080" const vesmgrXappNotifPath = "/vesmgr_xappnotif/" const timeoutPostXAppSubscriptions = 5 +const vespaConfigFile = "/etc/ves-agent/ves-agent.yaml" func init() { logger, _ = mdcloggo.InitLogger("vesmgr") @@ -80,8 +79,8 @@ func getMyIP() (myIP string, retErr error) { return "", nil } -func createConf(xappMetrics []byte) { - f, err := os.Create("/etc/ves-agent/ves-agent.yaml") +func createConf(fname string, xappMetrics []byte) { + f, err := os.Create(fname) if err != nil { logger.Error("Cannot create vespa conf file: %s", err.Error()) os.Exit(1) @@ -92,21 +91,20 @@ func createConf(xappMetrics []byte) { logger.Info("Vespa config created") } -func subscribeXAppNotifications(chSubscriptions chan subsChannel) { - xappNotifUrl := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath - subsUrl := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath - go subscribexAppNotifications(xappNotifUrl, chSubscriptions, timeoutPostXAppSubscriptions, subsUrl) - logger.Info("xApp notifications subscribed from %s", subsUrl) +func (vesmgr *VesMgr) subscribeXAppNotifications() { + xappNotifURL := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath + subsURL := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath + go subscribexAppNotifications(xappNotifURL, vesmgr.chXAppSubscriptions, timeoutPostXAppSubscriptions, subsURL) + logger.Info("xApp notifications subscribed from %s", subsURL) } -func vesmgrInit() { - vesagent.name = "ves-agent" +// Init initializes the vesmgr +func (vesmgr *VesMgr) Init(listenPort string) *VesMgr { logger.Info("vesmgrInit") - var err error if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" { logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error()) - return + panic("Cannot get my IP address") } var ok bool @@ -117,138 +115,134 @@ func vesmgrInit() { appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local" logger.Info("Using default appmgrdomain %s", appmgrDomain) } - chXAppSubscriptions := make(chan subsChannel) - chXAppNotifications := make(chan []byte) - chSupervision := make(chan chan string) - chVesagent := make(chan error) - - listener, err := net.Listen("tcp", vesmgr.myIPAddress+":"+vesmgrXappNotifPort) - startHttpServer(listener, vesmgrXappNotifPath, chXAppNotifications, chSupervision) - - subscribeXAppNotifications(chXAppSubscriptions) - - runVesmgr(chVesagent, chSupervision, chXAppNotifications, chXAppSubscriptions) + vesmgr.chXAppSubscriptions = make(chan subscriptionNotification) + // Create notifications as buffered channel so that + // xappmgr does not block if we are stuck somewhere + vesmgr.chXAppNotifications = make(chan []byte, 10) + vesmgr.chSupervision = make(chan chan string) + vesmgr.chVesagent = make(chan error) + vesmgr.httpServer = HTTPServer{} + vesmgr.httpServer.init(vesmgr.myIPAddress + ":" + listenPort) + vesmgr.vesagent = makeRunner("ves-agent", "-i", os.Getenv("VESMGR_HB_INTERVAL"), + "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", + os.Getenv("VESMGR_PROMETHEUS_ADDR")) + return vesmgr } -func startVesagent(ch chan error) { - cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR")) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - if err := cmd.Start(); err != nil { - logger.Error("vesmgr exiting, ves-agent start failed: %s", err) - go func() { - ch <- err - }() - } else { - logger.Info("ves-agent started with pid %d", cmd.Process.Pid) - vesagent.Pid = cmd.Process.Pid - vesagent.process = cmd.Process - go func() { - // wait ves-agent exit and then post the error to the channel - err := cmd.Wait() - ch <- err - }() - } +func (vesmgr *VesMgr) startVesagent() { + vesmgr.vesagent.run(vesmgr.chVesagent) } -func killVespa(process *os.Process) { +func (vesmgr *VesMgr) killVespa() error { logger.Info("Killing vespa") - err := process.Kill() + err := vesmgr.vesagent.kill() if err != nil { logger.Error("Cannot kill vespa: %s", err.Error()) + return err } + return <-vesmgr.chVesagent // wait vespa exit } -func queryXAppsStatus(appmgrUrl string, timeout time.Duration) ([]byte, error) { - - logger.Info("query xAppStatus started, url %s", appmgrUrl) - req, err := http.NewRequest("GET", appmgrUrl, nil) +func queryXAppsConfig(appmgrURL string, timeout time.Duration) ([]byte, error) { + emptyConfig := []byte("{}") + logger.Info("query xAppConfig started, url %s", appmgrURL) + req, err := http.NewRequest("GET", appmgrURL, nil) if err != nil { logger.Error("Failed to create a HTTP request: %s", err) - return nil, err + return emptyConfig, err } req.Header.Set("Content-Type", "application/json") client := &http.Client{} client.Timeout = time.Second * timeout resp, err := client.Do(req) if err != nil { - logger.Error("Query xApp status failed: %s", err) - return nil, err + logger.Error("Query xApp config failed: %s", err) + return emptyConfig, err } defer resp.Body.Close() if resp.StatusCode == http.StatusOK { body, err := ioutil.ReadAll(resp.Body) if err != nil { - logger.Error("Failed to read xApp status body: %s", err) - return nil, err + logger.Error("Failed to read xApp config body: %s", err) + return emptyConfig, err } - logger.Info("query xAppStatus completed") + logger.Info("query xAppConfig completed") return body, nil - } else { - logger.Error("Error from xApp status query: %s", resp.Status) - return nil, errors.New(resp.Status) } + logger.Error("Error from xApp config query: %s", resp.Status) + return emptyConfig, errors.New(resp.Status) } -type state int - -const ( - normalState state = iota - vespaTerminatingState state = iota -) - func queryConf() ([]byte, error) { - return queryXAppsStatus("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath, + return queryXAppsConfig("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath, 10*time.Second) } -func runVesmgr(chVesagent chan error, chSupervision chan chan string, chXAppNotifications chan []byte, chXAppSubscriptions chan subsChannel) { +func (vesmgr *VesMgr) emptyNotificationsChannel() { + for { + select { + case <-vesmgr.chXAppNotifications: + // we don't care the content + default: + return + } + } +} + +func (vesmgr *VesMgr) servRequest() { + select { + case supervision := <-vesmgr.chSupervision: + logger.Info("vesmgr: supervision") + supervision <- "OK" + case xAppNotif := <-vesmgr.chXAppNotifications: + logger.Info("vesmgr: xApp notification") + logger.Info(string(xAppNotif)) + vesmgr.emptyNotificationsChannel() + /* + * If xapp config query fails then we cannot create + * a new configuration and kill vespa. + * In that case we assume that + * the situation is fixed when the next + * xapp notif comes + */ + xappConfig, err := queryConf() + if err == nil { + vesmgr.killVespa() + createConf(vespaConfigFile, xappConfig) + vesmgr.startVesagent() + } + case err := <-vesmgr.chVesagent: + logger.Error("Vesagent exited: " + err.Error()) + os.Exit(1) + } +} - logger.Info("vesmgr main loop ready") - mystate := normalState - var xappStatus []byte - var err error +func (vesmgr *VesMgr) waitSubscriptionLoop() { for { select { - case supervision := <-chSupervision: + case supervision := <-vesmgr.chSupervision: logger.Info("vesmgr: supervision") supervision <- "OK" - case xAppNotif := <-chXAppNotifications: - logger.Info("vesmgr: xApp notification") - logger.Info(string(xAppNotif)) - /* - * If xapp status query fails then we cannot create - * a new configuration and kill vespa. - * In that case we assume that - * the situation is fixed when the next - * xapp notif comes - */ - xappStatus, err = queryConf() - if err == nil { - killVespa(vesagent.process) - mystate = vespaTerminatingState - } - case err := <-chVesagent: - switch mystate { - case vespaTerminatingState: - logger.Info("Vesagent termination completed") - createConf(xappStatus) - startVesagent(chVesagent) - mystate = normalState - default: - logger.Error("Vesagent exited: " + err.Error()) - os.Exit(1) - } - case isSubscribed := <-chXAppSubscriptions: + case isSubscribed := <-vesmgr.chXAppSubscriptions: if isSubscribed.err != nil { logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err) os.Exit(1) } - xappStatus, err = queryConf() - if err == nil { - createConf(xappStatus) - startVesagent(chVesagent) - } + return } } } + +// Run the vesmgr process main loop +func (vesmgr *VesMgr) Run() { + logger.Info("vesmgr main loop ready") + vesmgr.httpServer.start(vesmgrXappNotifPath, vesmgr.chXAppNotifications, vesmgr.chSupervision) + vesmgr.subscribeXAppNotifications() + vesmgr.waitSubscriptionLoop() + xappConfig, _ := queryConf() + createConf(vespaConfigFile, xappConfig) + vesmgr.startVesagent() + for { + vesmgr.servRequest() + } +} diff --git a/cmd/vesmgr/vesmgr_queryxappssttus_test.go b/cmd/vesmgr/vesmgr_queryxappconfig_test.go similarity index 67% rename from cmd/vesmgr/vesmgr_queryxappssttus_test.go rename to cmd/vesmgr/vesmgr_queryxappconfig_test.go index 5f0f36c..e9bcbfe 100644 --- a/cmd/vesmgr/vesmgr_queryxappssttus_test.go +++ b/cmd/vesmgr/vesmgr_queryxappconfig_test.go @@ -19,25 +19,26 @@ package main import ( "fmt" - "github.com/stretchr/testify/suite" "net" "net/http" "net/url" "os" "testing" "time" + + "github.com/stretchr/testify/suite" ) type do func(w http.ResponseWriter) -type QueryXAppsStatusTestSuite struct { +type QueryXAppsConfigTestSuite struct { suite.Suite listener net.Listener xAppMgrFunc do } // suite setup creates the HTTP server -func (suite *QueryXAppsStatusTestSuite) SetupSuite() { +func (suite *QueryXAppsConfigTestSuite) SetupSuite() { os.Unsetenv("http_proxy") os.Unsetenv("HTTP_PROXY") var err error @@ -46,7 +47,7 @@ func (suite *QueryXAppsStatusTestSuite) SetupSuite() { go runXAppMgr(suite.listener, "/test_url/", suite) } -func runXAppMgr(listener net.Listener, url string, suite *QueryXAppsStatusTestSuite) { +func runXAppMgr(listener net.Listener, url string, suite *QueryXAppsConfigTestSuite) { http.HandleFunc(url, func(w http.ResponseWriter, r *http.Request) { switch r.Method { @@ -57,44 +58,44 @@ func runXAppMgr(listener net.Listener, url string, suite *QueryXAppsStatusTestSu http.Serve(listener, nil) } -func (suite *QueryXAppsStatusTestSuite) TestQueryXAppsStatusFailsWithTimeout() { - do_sleep := func(w http.ResponseWriter) { +func (suite *QueryXAppsConfigTestSuite) TestQueryXAppsConfigFailsWithTimeout() { + doSleep := func(w http.ResponseWriter) { time.Sleep(time.Second * 2) } - suite.xAppMgrFunc = do_sleep + suite.xAppMgrFunc = doSleep - data, err := queryXAppsStatus("http://"+suite.listener.Addr().String()+"/test_url/", 1) - suite.Nil(data) + data, err := queryXAppsConfig("http://"+suite.listener.Addr().String()+"/test_url/", 1) + suite.Equal([]byte("{}"), data) suite.NotNil(err) e, ok := err.(*url.Error) suite.Equal(ok, true) suite.Equal(e.Timeout(), true) } -func (suite *QueryXAppsStatusTestSuite) TestQueryXAppsStatusFailsWithAnErrorReply() { - do_reply_with_err := func(w http.ResponseWriter) { +func (suite *QueryXAppsConfigTestSuite) TestQueryXAppsConfigFailsWithAnErrorReply() { + doReplyWithErr := func(w http.ResponseWriter) { http.Error(w, "405 method not allowed", http.StatusMethodNotAllowed) } - suite.xAppMgrFunc = do_reply_with_err + suite.xAppMgrFunc = doReplyWithErr - data, err := queryXAppsStatus("http://"+suite.listener.Addr().String()+"/test_url/", 1) - suite.Nil(data) + data, err := queryXAppsConfig("http://"+suite.listener.Addr().String()+"/test_url/", 1) + suite.Equal([]byte("{}"), data) suite.NotNil(err) suite.Equal("405 Method Not Allowed", err.Error()) } -func (suite *QueryXAppsStatusTestSuite) TestQueryXAppsStatusOk() { - do_reply := func(w http.ResponseWriter) { +func (suite *QueryXAppsConfigTestSuite) TestQueryXAppsConfigOk() { + doReply := func(w http.ResponseWriter) { fmt.Fprintf(w, "reply message") } - suite.xAppMgrFunc = do_reply + suite.xAppMgrFunc = doReply - data, err := queryXAppsStatus("http://"+suite.listener.Addr().String()+"/test_url/", 1) + data, err := queryXAppsConfig("http://"+suite.listener.Addr().String()+"/test_url/", 1) suite.NotNil(data) suite.Nil(err) suite.Equal(data, []byte("reply message")) } -func TestQueryXAppsStatusTestSuite(t *testing.T) { - suite.Run(t, new(QueryXAppsStatusTestSuite)) +func TestQueryXAppsConfigTestSuite(t *testing.T) { + suite.Run(t, new(QueryXAppsConfigTestSuite)) } diff --git a/cmd/vesmgr/vesmgr_test.go b/cmd/vesmgr/vesmgr_test.go index 19dd60d..a9b9275 100644 --- a/cmd/vesmgr/vesmgr_test.go +++ b/cmd/vesmgr/vesmgr_test.go @@ -19,90 +19,123 @@ package main import ( "errors" - "github.com/stretchr/testify/assert" "os" "os/exec" + "path/filepath" + "strconv" "testing" "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" ) -func init() { - vesagent.name = "echo" // no need to run real ves-agent - logger.MdcAdd("Testvesmgr", "0.0.1") - os.Setenv("VESMGR_HB_INTERVAL", "30s") - os.Setenv("VESMGR_MEAS_INTERVAL", "30s") - os.Setenv("VESMGR_PRICOLLECTOR_ADDR", "127.1.1.1") - os.Setenv("VESMGR_PRICOLLECTOR_PORT", "8443") - os.Setenv("VESMGR_PROMETHEUS_ADDR", "http://localhost:9090") +func TestGetMyIP(t *testing.T) { + myIPAddress, err := getMyIP() + assert.NotEqual(t, string(""), myIPAddress) + assert.Nil(t, err) } -func TestStartVesagent(t *testing.T) { - assert.Equal(t, 0, vesagent.Pid) - ch := make(chan error) - startVesagent(ch) - assert.NotEqual(t, 0, vesagent.Pid) - t.Logf("VES agent pid = %d", vesagent.Pid) - vesagent.Pid = 0 - err := <-ch +func TestConfCreate(t *testing.T) { + tmpfile := filepath.Join(os.TempDir(), "vestest."+strconv.Itoa(os.Getpid())) + defer os.Remove(tmpfile) // clean up + createConf(tmpfile, []byte("{}")) + _, err := os.Stat(tmpfile) assert.Nil(t, err) } -func TestStartVesagentFails(t *testing.T) { - vesagent.name = "Not-ves-agent" - assert.Equal(t, 0, vesagent.Pid) - ch := make(chan error) - startVesagent(ch) - err := <-ch - assert.NotNil(t, err) - assert.Equal(t, 0, vesagent.Pid) - vesagent.name = "ves-agent" +type VesmgrTestSuite struct { + suite.Suite + vesmgr VesMgr } -func TestGetMyIP(t *testing.T) { - vesmgr.myIPAddress = string("") - var err error - vesmgr.myIPAddress, err = getMyIP() - assert.NotEqual(t, string(""), vesmgr.myIPAddress) - assert.Equal(t, nil, err) +func (suite *VesmgrTestSuite) SetupSuite() { + suite.vesmgr = VesMgr{} + suite.vesmgr.Init("0") + logger.MdcAdd("Testvesmgr", "0.0.1") + os.Setenv("VESMGR_HB_INTERVAL", "30s") + os.Setenv("VESMGR_MEAS_INTERVAL", "30s") + os.Setenv("VESMGR_PRICOLLECTOR_ADDR", "127.1.1.1") + os.Setenv("VESMGR_PRICOLLECTOR_PORT", "8443") + os.Setenv("VESMGR_PROMETHEUS_ADDR", "http://localhost:9090") } -func TestMainLoopSupervision(t *testing.T) { - chXAppNotifications := make(chan []byte) - chSupervision := make(chan chan string) - chVesagent := make(chan error) - chSubscriptions := make(chan subsChannel) - go runVesmgr(chVesagent, chSupervision, chXAppNotifications, chSubscriptions) - +func (suite *VesmgrTestSuite) TestMainLoopSupervision() { + go suite.vesmgr.servRequest() ch := make(chan string) - chSupervision <- ch + suite.vesmgr.chSupervision <- ch reply := <-ch - assert.Equal(t, "OK", reply) + suite.Equal("OK", reply) } -func TestMainLoopVesagentError(t *testing.T) { +func (suite *VesmgrTestSuite) TestMainLoopVesagentError() { if os.Getenv("TEST_VESPA_EXIT") == "1" { // we're run in a new process, now make vesmgr main loop exit - chXAppNotifications := make(chan []byte) - chSupervision := make(chan chan string) - chVesagent := make(chan error) - chSubscriptions := make(chan subsChannel) - go runVesmgr(chVesagent, chSupervision, chXAppNotifications, chSubscriptions) - - chVesagent <- errors.New("vesagent killed") + go suite.vesmgr.servRequest() + suite.vesmgr.chVesagent <- errors.New("vesagent killed") // we should never actually end up to this sleep, since the runVesmgr should exit time.Sleep(3 * time.Second) return } // Run the vesmgr exit test as a separate process - cmd := exec.Command(os.Args[0], "-test.run=TestMainLoopVesagentError") + cmd := exec.Command(os.Args[0], "-test.run", "TestVesMgrSuite", "-testify.m", "TestMainLoopVesagentError") cmd.Env = append(os.Environ(), "TEST_VESPA_EXIT=1") cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr err := cmd.Run() - // check that vesmgr existed with status 1 + e, ok := err.(*exec.ExitError) - assert.Equal(t, true, ok) - assert.Equal(t, "exit status 1", e.Error()) + suite.True(ok) + suite.Equal("exit status 1", e.Error()) +} + +func (suite *VesmgrTestSuite) TestWaitSubscriptionLoopRespondsSupervisionAndBreaksWhenReceivedSubsNotif() { + go func() { + time.Sleep(time.Second) + ch := make(chan string) + suite.vesmgr.chSupervision <- ch + suite.Equal("OK", <-ch) + suite.vesmgr.chSupervision <- ch + suite.Equal("OK", <-ch) + suite.vesmgr.chXAppSubscriptions <- subscriptionNotification{true, nil, ""} + }() + + suite.vesmgr.waitSubscriptionLoop() +} + +func (suite *VesmgrTestSuite) TestEmptyNotificationChannelReadsAllMsgsFromCh() { + go func() { + for i := 0; i < 11; i++ { + suite.vesmgr.chXAppNotifications <- []byte("hello") + } + }() + time.Sleep(500 * time.Millisecond) + <-suite.vesmgr.chXAppNotifications + suite.vesmgr.emptyNotificationsChannel() + select { + case <-suite.vesmgr.chXAppNotifications: + suite.Fail("Got unexpected notification") + default: + // ok + } +} + +func (suite *VesmgrTestSuite) TestVespaKilling() { + suite.vesmgr.vesagent = makeRunner("sleep", "20") + suite.vesmgr.startVesagent() + suite.NotNil(suite.vesmgr.killVespa()) +} + +func (suite *VesmgrTestSuite) TestVespaKillingAlreadyKilled() { + suite.vesmgr.vesagent = makeRunner("sleep", "20") + suite.vesmgr.startVesagent() + suite.NotNil(suite.vesmgr.killVespa()) + // Just check that second kill does not block execution + suite.NotNil(suite.vesmgr.killVespa()) +} + +func TestVesMgrSuite(t *testing.T) { + suite.Run(t, new(VesmgrTestSuite)) } diff --git a/container-tag.yaml b/container-tag.yaml index 553a6e5..fa6e83d 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -1,4 +1,4 @@ # The Jenkins job uses this string for the tag in the image name # for example nexus3.o-ran-sc.org:10004/my-image-name:0.0.1 --- -tag: 0.0.4 +tag: 0.0.5