Configure xApp metrics in xApp description 60/960/3
authorKatri Turunen <katri.turunen@nokia.com>
Mon, 16 Sep 2019 05:48:18 +0000 (08:48 +0300)
committerKatri Turunen <katri.turunen@nokia.com>
Mon, 16 Sep 2019 10:58:33 +0000 (13:58 +0300)
Vesmgr subscribes the xApp notifications from xAppMgr. When it
receives a notification, it requests the xApp config from xAppMgr,
creates the VESPA configuration according to it, and restarts
VESPA with the new configuration.

The xApp counters should be defined in the xApp descriptor.
Vesmgr reads the counter definitions from section config->metrics.
The following fields are required:

* name - Prometheus name of the counter
* objectName - Ves collector object name
* objectInstance - Ves collector object instance

If the same counter name is defined by several xApps,
vesmgr makes the configuration based on the first definition
and ignores the latter ones, if they are conflicting.
However, VESPA reports all counters to Ves collector regardless
of the xApp exposing them.

VESPA reads the ricComponentName from Prometheus label
"kubernetes_name".

There are no more hard-coded metrics configured in this version.

Change-Id: I746c6941cebe686165aed97e223b0ec0e9c7a679
Signed-off-by: Katri Turunen <katri.turunen@nokia.com>
13 files changed:
README.md
cmd/vesmgr/config.go
cmd/vesmgr/config_test.go
cmd/vesmgr/httpserver.go [new file with mode: 0644]
cmd/vesmgr/httpserver_test.go [new file with mode: 0644]
cmd/vesmgr/subscribexAPPNotifications.go [new file with mode: 0644]
cmd/vesmgr/subscribexAPPNotifications_test.go [new file with mode: 0644]
cmd/vesmgr/vesmgr.go
cmd/vesmgr/vesmgr_queryxappssttus_test.go [new file with mode: 0644]
cmd/vesmgr/vesmgr_test.go
go.sum
test/xApp_config_test_output.json [new file with mode: 0644]
ves-agent-chart/templates/deployment.yaml

index 534b7b2..dc023a2 100644 (file)
--- a/README.md
+++ b/README.md
@@ -4,8 +4,49 @@ The VESPA manager uses the VES Agent (https://github.com/nokia/ONAP-VESPA)
 to adapt near-RT RIC internal statistics' collection using Prometheus 
 (xApps and platform containers) to ONAP's VES (VNF event streaming).
 
+The vesmgr container runs two processes: the VESPA manager and the VES
+Agent (i.s. VESPA).
+
 The VESPA manager starts and configures the VES Agent.
 
+The VES Agent is a service acting as a bridge between Prometheus and 
+ONAP's VES Collector.
+
+# Application metrics definition
+
+The application metrics are defined in the application descriptor.
+For each counter, the following fields are required in the "metrics" 
+section of the descriptor:
+
+* name - Prometheus name of the counter
+* objectName - object name in VES
+* objectInstance - object instance in VE
+
+The VESPA manager receives the application metrics configuration from the
+application manager. It subscribes the app notification messages from the 
+application manager, and after having received one, requests the latest
+application configuration, creates the VES Agent configuration based on it,
+and restarts the VES Agent.
+
+The VES Agent does not report any other metrics to VES.
+
+# Prometheus configuration
+
+The VES Agent reads the ricComponentName from Prometheus label
+"kubernetes_name".
+
+# VES Collector event format
+
+The VES Agent transmits events to the VES Collector in the 
+VES Common Event Format v5.4.1. 
+The Common Event Format is expressed in JSON schema v28.4.1.
+
+VES Event Listener 5.4.1:
+<https://docs.onap.org/en/casablanca/submodules/vnfsdk/model.git/docs/files/VESEventListener.html>
+
+JSON schema v28.4.1:
+<https://github.com/nokia/ONAP-VESPA/blob/8e9d9e93bb00bed0f5402c9de9502385d5e80acc/doc/CommonEventFormat_28.4.1.json>
+
 # Environment variables
 
 The VESPA manager container requires the following environment variables:
@@ -14,6 +55,7 @@ The VESPA manager container requires the following environment variables:
 * VESMGR_MEAS_INTERVAL - Measurement interval as a string. For example: 60s.
 * VESMGR_PROMETHEUS_ADDR - Prometheus address. For example: http://127.0.0.1:123
 
+
 * VESMGR_PRICOLLECTOR_ADDR - Primary collector FQDN as a string. For example: ricaux-entry.
 * VESMGR_PRICOLLECTOR_PORT - Primary collector port id as an integer. Default: 8443.
 * VESMGR_PRICOLLECTOR_SERVERROOT - Path before the /eventListener part of the POST URL as a string.
@@ -23,6 +65,9 @@ The VESPA manager container requires the following environment variables:
 * VESMGR_PRICOLLECTOR_PASSWORD - Password as a string.
 * VESMGR_PRICOLLECTOR_PASSPHASE - Passphrase as a string.
 
+
+* VESMGR_APPMGRDOMAIN - Application manager domain. This is for testing purposes, only. Default: service-ricplt-appmgr-http.ricplt.svc.cluster.local.
+
 # Unit Tests
 
 In order to run the VESPA manager unit tests, give the following command:
index 7c4a2dc..e2c9f25 100644 (file)
 package main
 
 import (
+       "encoding/json"
        "gopkg.in/yaml.v2"
-       "time"
        "io"
        "os"
        "strconv"
+       "time"
 )
 
 func basicVespaConf() VESAgentConfiguration {
-       var vespaconf = VESAgentConfiguration {
+       var vespaconf = VESAgentConfiguration{
                DataDir: "/tmp/data",
                Debug:   false,
-               Event: EventConfiguration {
-                       VNFName: "vespa-demo", // XXX
+               Event: EventConfiguration{
+                       VNFName:           "vespa-demo",                          // XXX
                        ReportingEntityID: "1af5bfa9-40b4-4522-b045-40e54f0310f", // XXX
-                       MaxSize: 2000000,
-                       NfNamingCode: "hsxp",
-                       NfcNamingCodes: [] NfcNamingCode {
-                               NfcNamingCode {
-                                       Type: "oam",
-                                       Vnfcs: [] string {"lr-ope-0","lr-ope-1","lr-ope-2"},
+                       MaxSize:           2000000,
+                       NfNamingCode:      "hsxp",
+                       NfcNamingCodes: []NfcNamingCode{
+                               NfcNamingCode{
+                                       Type:  "oam",
+                                       Vnfcs: []string{"lr-ope-0", "lr-ope-1", "lr-ope-2"},
                                },
-                               NfcNamingCode {
-                                       Type: "etl",
-                                       Vnfcs: [] string {"lr-pro-0","lr-pro-1"},
+                               NfcNamingCode{
+                                       Type:  "etl",
+                                       Vnfcs: []string{"lr-pro-0", "lr-pro-1"},
                                },
                        },
                        RetryInterval: time.Second * 5,
-                       MaxMissed: 2,
+                       MaxMissed:     2,
                },
-               Measurement: MeasurementConfiguration {
-                       DomainAbbreviation: "Mvfs",
+               Measurement: MeasurementConfiguration{
+                       DomainAbbreviation:   "Mvfs",
                        MaxBufferingDuration: time.Hour,
-                       Prometheus: PrometheusConfig {
-                               Timeout: time.Second * 30,
+                       Prometheus: PrometheusConfig{
+                               Timeout:   time.Second * 30,
                                KeepAlive: time.Second * 30,
-                               Rules: MetricRules {
-                                       DefaultValues: &MetricRule {
+                               Rules: MetricRules{
+                                       DefaultValues: &MetricRule{
                                                VMIDLabel: "'{{.labels.instance}}'",
                                        },
                                },
@@ -64,32 +65,93 @@ func basicVespaConf() VESAgentConfiguration {
        return vespaconf
 }
 
-func getRules(vespaconf *VESAgentConfiguration) {
-       // XXX
+type AppMetricsStruct struct {
+       ObjectName     string
+       ObjectInstance string
+       // xxx add labels here
+}
+
+type AppMetrics map[string]AppMetricsStruct
+
+// Parses the metrics data from an array of bytes, which is expected to contain a JSON
+// array with structs of the following format:
+//
+// { ...
+//   "config" : {
+//     "metrics": [
+//       { "name": "...", "objectName": "...", "objectInstamce": "..." },
+//       ...
+//     ]
+//   }
+// }
+func parseMetricsFromXAppDescriptor(descriptor []byte, appMetrics AppMetrics) AppMetrics {
+       var desc []map[string]interface{}
+       json.Unmarshal(descriptor, &desc)
+
+       for _, app := range desc {
+               config, config_ok := app["config"]
+               if config_ok {
+                       metrics, metrics_ok := config.(map[string]interface{})["metrics"]
+                       if metrics_ok {
+                               parseMetricsRules(metrics.([]interface{}), appMetrics)
+                       }
+               }
+       }
+       return appMetrics
+}
+
+// Parses the metrics data from an array of interfaces, which are expected to be maps
+// of the following format:
+//    { "name": xxx, "objectName": yyy, "objectInstance": zzz }
+// Entries, which do not have all the necessary fields, are ignored.
+func parseMetricsRules(metricsMap []interface{}, appMetrics AppMetrics) AppMetrics {
+       for _, element := range metricsMap {
+               name, name_ok := element.(map[string]interface{})["name"].(string)
+               if name_ok {
+                       _, already_found := appMetrics[name]
+                       objectName, objectName_ok := element.(map[string]interface{})["objectName"].(string)
+                       objectInstance, objectInstance_ok := element.(map[string]interface{})["objectInstance"].(string)
+                       if !already_found && objectName_ok && objectInstance_ok {
+                               appMetrics[name] = AppMetricsStruct{objectName, objectInstance}
+                               logger.Info("parsed counter %s %s %s", name, objectName, objectInstance)
+                       }
+                       if already_found {
+                               logger.Info("skipped duplicate counter %s", name)
+                       }
+               }
+       }
+       return appMetrics
+}
+
+func getRules(vespaconf *VESAgentConfiguration, xAppConfig []byte) {
+       appMetrics := make(AppMetrics)
+       parseMetricsFromXAppDescriptor(xAppConfig, appMetrics)
+
        makeRule := func(expr string, obj_name string, obj_instance string) MetricRule {
-               return MetricRule {
-                       Target: "AdditionalObjects",
-                       Expr: expr,
+               return MetricRule{
+                       Target:         "AdditionalObjects",
+                       Expr:           expr,
                        ObjectInstance: obj_instance,
-                       ObjectName: obj_name,
-                       ObjectKeys: [] Label {
-                               Label {
+                       ObjectName:     obj_name,
+                       ObjectKeys: []Label{
+                               Label{
                                        Name: "ricComponentName",
-                                       Expr: "'{{.labels.app_kubernetes_io_instance}}'",
+                                       Expr: "'{{.labels.kubernetes_name}}'",
                                },
                        },
                }
        }
-       // Hard coded for now
-       vespaconf.Measurement.Prometheus.Rules.Metrics = []MetricRule {
-               makeRule("ricxapp_RMR_Received", "ricxappRMRreceivedCounter", "ricxappRMRReceived"),
-               makeRule("ricxapp_RMR_ReceiveError", "ricxappRMRReceiveErrorCounter", "ricxappRMRReceiveError"),
-               makeRule("ricxapp_RMR_Transmitted", "ricxappRMRTransmittedCounter", "ricxappRMRTransmitted"),
-               makeRule("ricxapp_RMR_TransmitError", "ricxappRMRTransmitErrorCounter", "ricxappRMRTransmitError"),
-               makeRule("ricxapp_SDL_Stored", "ricxappSDLStoredCounter", "ricxappSDLStored"),
-               makeRule("ricxapp_SDL_StoreError", "ricxappSDLStoreErrorCounter", "ricxappSDLStoreError"),
-       }
+       var metricsMap map[string][]interface{}
+       json.Unmarshal(xAppConfig, &metricsMap)
+       metrics := parseMetricsRules(metricsMap["metrics"], appMetrics)
 
+       vespaconf.Measurement.Prometheus.Rules.Metrics = make([]MetricRule, 0, len(metrics))
+       for key, value := range metrics {
+               vespaconf.Measurement.Prometheus.Rules.Metrics = append(vespaconf.Measurement.Prometheus.Rules.Metrics, makeRule(key, value.ObjectName, value.ObjectInstance))
+       }
+       if len(vespaconf.Measurement.Prometheus.Rules.Metrics) == 0 {
+               logger.Info("vespa config with empty metrics")
+       }
 }
 
 func getCollectorConfiguration(vespaconf *VESAgentConfiguration) {
@@ -114,9 +176,9 @@ func getCollectorConfiguration(vespaconf *VESAgentConfiguration) {
        }
 }
 
-func createVespaConfig(writer io.Writer) {
+func createVespaConfig(writer io.Writer, xAppStatus []byte) {
        vespaconf := basicVespaConf()
-       getRules(&vespaconf)
+       getRules(&vespaconf, xAppStatus)
        getCollectorConfiguration(&vespaconf)
        err := yaml.NewEncoder(writer).Encode(vespaconf)
        if err != nil {
index b255db9..08bde19 100644 (file)
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
- package main
+package main
 
- import (
-       "testing"
-       "time"
+import (
        "bytes"
+       "encoding/json"
        "github.com/stretchr/testify/assert"
        "gopkg.in/yaml.v2"
+       "io/ioutil"
        "os"
+       "testing"
+       "time"
 )
 
 func testBaseConf(t *testing.T, vesconf VESAgentConfiguration) {
@@ -58,7 +60,7 @@ func TestCollectorConfiguration(t *testing.T) {
        assert.Equal(t, 1234, vesconf.PrimaryCollector.Port)
        assert.Equal(t, "vescollector", vesconf.PrimaryCollector.ServerRoot)
        assert.Equal(t, "sometopic", vesconf.PrimaryCollector.Topic)
-       assert.Equal(t, true, vesconf.PrimaryCollector.Secure)
+       assert.True(t, vesconf.PrimaryCollector.Secure)
 }
 
 func TestCollectorConfigurationWhenEnvironmentVariablesAreNotDefined(t *testing.T) {
@@ -81,7 +83,7 @@ func TestCollectorConfigurationWhenEnvironmentVariablesAreNotDefined(t *testing.
        assert.Equal(t, 8443, vesconf.PrimaryCollector.Port)
        assert.Equal(t, "", vesconf.PrimaryCollector.ServerRoot)
        assert.Equal(t, "", vesconf.PrimaryCollector.Topic)
-       assert.Equal(t, false, vesconf.PrimaryCollector.Secure)
+       assert.False(t, vesconf.PrimaryCollector.Secure)
 }
 
 func TestCollectorConfigurationWhenPrimaryCollectorPortIsNotInteger(t *testing.T) {
@@ -95,14 +97,148 @@ func TestCollectorConfigurationWhenPrimaryCollectorSecureIsNotTrueOrFalse(t *tes
        os.Setenv("VESMGR_PRICOLLECTOR_SECURE", "foo")
        vesconf := basicVespaConf()
        getCollectorConfiguration(&vesconf)
-       assert.Equal(t, false, vesconf.PrimaryCollector.Secure)
+       assert.False(t, vesconf.PrimaryCollector.Secure)
 }
 
-func TestYamlGeneration(t *testing.T) {
+func TestYamlGenerationWithoutXAppsConfig(t *testing.T) {
        buffer := new(bytes.Buffer)
-       createVespaConfig(buffer)
+       createVespaConfig(buffer, []byte{})
        var vesconf VESAgentConfiguration
        err := yaml.Unmarshal(buffer.Bytes(), &vesconf)
        assert.Nil(t, err)
        testBaseConf(t, vesconf)
+       assert.Empty(t, vesconf.Measurement.Prometheus.Rules.Metrics)
+}
+
+func TestYamlGenerationWithXAppsConfig(t *testing.T) {
+       buffer := new(bytes.Buffer)
+       bytes, err := ioutil.ReadFile("../../test/xApp_config_test_output.json")
+       assert.Nil(t, err)
+       createVespaConfig(buffer, bytes)
+       var vesconf VESAgentConfiguration
+       err = yaml.Unmarshal(buffer.Bytes(), &vesconf)
+       assert.Nil(t, err)
+       testBaseConf(t, vesconf)
+       assert.Len(t, vesconf.Measurement.Prometheus.Rules.Metrics, 4)
+}
+
+// Helper function for the metrics parsing tests
+func metricsStringToInterfaceArray(metrics string) []interface{} {
+       var metricsArray map[string][]interface{}
+       json.Unmarshal([]byte(metrics), &metricsArray)
+       return metricsArray["metrics"]
+}
+
+func TestParseMetricsRules(t *testing.T) {
+       metricsJson := `{"metrics": [
+                       { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" },
+                       { "name": "ricxapp_RMR_ReceiveError", "objectName": "ricxappRMRReceiveErrorCounter", "objectInstance": "ricxappRMRReceiveError" },
+                       { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" },
+                       { "name": "ricxapp_RMR_TransmitError", "objectName": "ricxappRMRTransmitErrorCounter", "objectInstance": "ricxappRMRTransmitError" },
+                       { "name": "ricxapp_SDL_Stored", "objectName": "ricxappSDLStoredCounter", "objectInstance": "ricxappSDLStored" },
+                       { "name": "ricxapp_SDL_StoreError", "objectName": "ricxappSDLStoreErrorCounter", "objectInstance": "ricxappSDLStoreError" } ]}`
+       appMetrics := make(AppMetrics)
+       var m []interface{} = metricsStringToInterfaceArray(metricsJson)
+       appMetrics = parseMetricsRules(m, appMetrics)
+       assert.Len(t, appMetrics, 6)
+       assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName)
+       assert.Equal(t, "ricxappRMRTransmitErrorCounter", appMetrics["ricxapp_RMR_TransmitError"].ObjectName)
+       assert.Equal(t, "ricxappSDLStoreError", appMetrics["ricxapp_SDL_StoreError"].ObjectInstance)
+}
+
+func TestParseMetricsRulesNoMetrics(t *testing.T) {
+       appMetrics := make(AppMetrics)
+       metricsJson := `{"metrics": []`
+       var m []interface{} = metricsStringToInterfaceArray(metricsJson)
+       appMetrics = parseMetricsRules(m, appMetrics)
+       assert.Empty(t, appMetrics)
+}
+
+func TestParseMetricsRulesAdditionalFields(t *testing.T) {
+       appMetrics := make(AppMetrics)
+       metricsJson := `{"metrics": [
+                       { "additionalField": "valueIgnored", "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" }]}`
+       var m []interface{} = metricsStringToInterfaceArray(metricsJson)
+       appMetrics = parseMetricsRules(m, appMetrics)
+       assert.Len(t, appMetrics, 1)
+       assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName)
+       assert.Equal(t, "ricxappRMRReceived", appMetrics["ricxapp_RMR_Received"].ObjectInstance)
+}
+
+func TestParseMetricsRulesMissingFields(t *testing.T) {
+       appMetrics := make(AppMetrics)
+       metricsJson := `{"metrics": [
+                       { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" },
+                       { "name": "ricxapp_RMR_ReceiveError", "objectInstance": "ricxappRMRReceiveError" },
+                       { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" }]}`
+       var m []interface{} = metricsStringToInterfaceArray(metricsJson)
+       appMetrics = parseMetricsRules(m, appMetrics)
+       assert.Len(t, appMetrics, 2)
+       assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName)
+       assert.Equal(t, "ricxappRMRTransmittedCounter", appMetrics["ricxapp_RMR_Transmitted"].ObjectName)
+       _, ok := appMetrics["ricxapp_RMR_ReceiveError"]
+       assert.False(t, ok)
+}
+
+func TestParseMetricsRulesDuplicateDefinitionIsIgnored(t *testing.T) {
+       appMetrics := make(AppMetrics)
+       metricsJson := `{"metrics": [
+                       { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" },
+                       { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounterXXX", "objectInstance": "ricxappRMRReceivedXXX" },
+                       { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" }]}`
+       var m []interface{} = metricsStringToInterfaceArray(metricsJson)
+       appMetrics = parseMetricsRules(m, appMetrics)
+       assert.Len(t, appMetrics, 2)
+       assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName)
+       assert.Equal(t, "ricxappRMRReceived", appMetrics["ricxapp_RMR_Received"].ObjectInstance)
+}
+
+func TestParseMetricsRulesIncrementalFillOfAppMetrics(t *testing.T) {
+       appMetrics := make(AppMetrics)
+       metricsJson1 := `{"metrics": [
+                       { "name": "ricxapp_RMR_Received", "objectName": "ricxappRMRreceivedCounter", "objectInstance": "ricxappRMRReceived" }]}`
+       metricsJson2 := `{"metrics": [
+                       { "name": "ricxapp_RMR_Transmitted", "objectName": "ricxappRMRTransmittedCounter", "objectInstance": "ricxappRMRTransmitted" }]}`
+       var m1 []interface{} = metricsStringToInterfaceArray(metricsJson1)
+       var m2 []interface{} = metricsStringToInterfaceArray(metricsJson2)
+       appMetrics = parseMetricsRules(m1, appMetrics)
+       appMetrics = parseMetricsRules(m2, appMetrics)
+       assert.Len(t, appMetrics, 2)
+       assert.Equal(t, "ricxappRMRreceivedCounter", appMetrics["ricxapp_RMR_Received"].ObjectName)
+       assert.Equal(t, "ricxappRMRReceived", appMetrics["ricxapp_RMR_Received"].ObjectInstance)
+}
+
+func TestParseXAppDescriptor(t *testing.T) {
+       appMetrics := make(AppMetrics)
+       bytes, err := ioutil.ReadFile("../../test/xApp_config_test_output.json")
+       assert.Nil(t, err)
+
+       appMetrics = parseMetricsFromXAppDescriptor(bytes, appMetrics)
+       assert.Len(t, appMetrics, 4)
+       assert.Equal(t, "App1ExampleCounterOneObject", appMetrics["App1ExampleCounterOne"].ObjectName)
+       assert.Equal(t, "App1ExampleCounterOneObjectInstance", appMetrics["App1ExampleCounterOne"].ObjectInstance)
+       assert.Equal(t, "App1ExampleCounterTwoObject", appMetrics["App1ExampleCounterTwo"].ObjectName)
+       assert.Equal(t, "App1ExampleCounterTwoObjectInstance", appMetrics["App1ExampleCounterTwo"].ObjectInstance)
+       assert.Equal(t, "App2ExampleCounterOneObject", appMetrics["App2ExampleCounterOne"].ObjectName)
+       assert.Equal(t, "App2ExampleCounterOneObjectInstance", appMetrics["App2ExampleCounterOne"].ObjectInstance)
+       assert.Equal(t, "App2ExampleCounterTwoObject", appMetrics["App2ExampleCounterTwo"].ObjectName)
+       assert.Equal(t, "App2ExampleCounterTwoObjectInstance", appMetrics["App2ExampleCounterTwo"].ObjectInstance)
+}
+
+func TestParseXAppDescriptorWithNoConfig(t *testing.T) {
+       metricsJson := `[{{"metadata": "something", "descriptor": "somethingelse"}},
+                        {{"metadata": "something", "descriptor": "somethingelse"}}]`
+       metricsBytes := []byte(metricsJson)
+       appMetrics := make(AppMetrics)
+       appMetrics = parseMetricsFromXAppDescriptor(metricsBytes, appMetrics)
+       assert.Empty(t, appMetrics)
+}
+
+func TestParseXAppDescriptorWithNoMetrics(t *testing.T) {
+       metricsJson := `[{{"metadata": "something", "descriptor": "somethingelse", "config":{}},
+                        {{"metadata": "something", "descriptor": "somethingelse", "config":{}}}]`
+       metricsBytes := []byte(metricsJson)
+       appMetrics := make(AppMetrics)
+       appMetrics = parseMetricsFromXAppDescriptor(metricsBytes, appMetrics)
+       assert.Empty(t, appMetrics)
 }
diff --git a/cmd/vesmgr/httpserver.go b/cmd/vesmgr/httpserver.go
new file mode 100644 (file)
index 0000000..585319e
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ *  Copyright (c) 2019 AT&T Intellectual Property.
+ *  Copyright (c) 2018-2019 Nokia.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import (
+       "fmt"
+       "io/ioutil"
+       "net"
+       "net/http"
+)
+
+const SupervisionUrl = "/supervision/"
+
+func startHttpServer(listener net.Listener, xappnotifUrl string, notif_ch chan []byte, supervision_ch chan chan string) {
+       go runHttpServer(listener, xappnotifUrl, notif_ch, supervision_ch)
+}
+
+func runHttpServer(listener net.Listener, xappNotifUrl string, notif_ch chan []byte, supervision_ch chan chan string) {
+
+       logger.Info("vesmgr http server serving at %s", listener.Addr())
+
+       http.HandleFunc(xappNotifUrl, func(w http.ResponseWriter, r *http.Request) {
+
+               switch r.Method {
+               case "POST":
+                       logger.Info("httpServer: POST in %s", xappNotifUrl)
+                       body, err := ioutil.ReadAll(r.Body)
+                       defer r.Body.Close()
+                       if err != nil {
+                               logger.Error("httpServer: Invalid body in POST request")
+                               return
+                       }
+                       notif_ch <- body
+                       return
+               default:
+                       logger.Error("httpServer: Invalid method %s to %s", r.Method, r.URL.Path)
+                       http.Error(w, "405 method not allowed", http.StatusMethodNotAllowed)
+                       return
+               }
+       })
+
+       http.HandleFunc(SupervisionUrl, func(w http.ResponseWriter, r *http.Request) {
+
+               switch r.Method {
+               case "GET":
+                       logger.Info("httpServer: GET supervision")
+                       supervision_ack_ch := make(chan string)
+                       // send supervision to the main loop
+                       supervision_ch <- supervision_ack_ch
+                       reply := <-supervision_ack_ch
+                       logger.Info("httpServer: supervision ack from the main loop: %s", reply)
+                       fmt.Fprintf(w, reply)
+                       return
+               default:
+                       logger.Error("httpServer: invalid method %s to %s", r.Method, r.URL.Path)
+                       http.Error(w, "405 method not allowed", http.StatusMethodNotAllowed)
+                       return
+               }
+
+       })
+
+       http.Serve(listener, nil)
+}
diff --git a/cmd/vesmgr/httpserver_test.go b/cmd/vesmgr/httpserver_test.go
new file mode 100644 (file)
index 0000000..d52f4b2
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ *  Copyright (c) 2019 AT&T Intellectual Property.
+ *  Copyright (c) 2018-2019 Nokia.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package main
+
+import (
+       "github.com/stretchr/testify/suite"
+       "io/ioutil"
+       "net"
+       "net/http"
+       "os"
+       "strings"
+       "testing"
+)
+
+type HttpServerTestSuite struct {
+       suite.Suite
+       listener       net.Listener
+       ch_notif       chan []byte
+       ch_supervision chan chan string
+}
+
+// suite setup creates the HTTP server
+func (suite *HttpServerTestSuite) SetupSuite() {
+       os.Unsetenv("http_proxy")
+       os.Unsetenv("HTTP_PROXY")
+       var err error
+       suite.listener, err = net.Listen("tcp", ":0")
+       suite.Nil(err)
+       suite.ch_notif = make(chan []byte)
+       suite.ch_supervision = make(chan chan string)
+       startHttpServer(suite.listener, "/vesmgr_notif/", suite.ch_notif, suite.ch_supervision)
+}
+
+func (suite *HttpServerTestSuite) TestHtppServerSupervisionInvalidOperation() {
+       resp, reply := suite.doPost("http://"+suite.listener.Addr().String()+SupervisionUrl, "supervision")
+       suite.Equal("405 method not allowed\n", reply)
+       suite.Equal(405, resp.StatusCode)
+       suite.Equal("405 Method Not Allowed", resp.Status)
+}
+
+func (suite *HttpServerTestSuite) doGet(url string) (*http.Response, string) {
+       resp, err := http.Get(url)
+       suite.Nil(err)
+
+       defer resp.Body.Close()
+       contents, err := ioutil.ReadAll(resp.Body)
+       suite.Nil(err)
+       return resp, string(contents)
+}
+
+func (suite *HttpServerTestSuite) doPost(serverUrl string, msg string) (*http.Response, string) {
+       resp, err := http.Post(serverUrl, "data", strings.NewReader(msg))
+       suite.Nil(err)
+
+       defer resp.Body.Close()
+       contents, err := ioutil.ReadAll(resp.Body)
+       suite.Nil(err)
+       return resp, string(contents)
+}
+
+func replySupervision(ch_supervision chan chan string, reply string) {
+       ch_supervision_ack := <-ch_supervision
+       ch_supervision_ack <- reply
+}
+
+func (suite *HttpServerTestSuite) TestHttpServerSupervision() {
+
+       // start the "main loop" to reply to the supervision to the HTTPServer
+       go replySupervision(suite.ch_supervision, "I'm just fine")
+
+       resp, reply := suite.doGet("http://" + suite.listener.Addr().String() + SupervisionUrl)
+
+       suite.Equal("I'm just fine", reply)
+       suite.Equal(200, resp.StatusCode)
+       suite.Equal("200 OK", resp.Status)
+}
+
+func (suite *HttpServerTestSuite) TestHttpServerInvalidUrl() {
+       resp, reply := suite.doPost("http://"+suite.listener.Addr().String()+"/invalid_url", "foo")
+       suite.Equal("404 page not found\n", reply)
+       suite.Equal(404, resp.StatusCode)
+       suite.Equal("404 Not Found", resp.Status)
+}
+
+func readXAppNotification(ch_notif chan []byte, ch chan []byte) {
+       notification := <-ch_notif
+       ch <- notification
+}
+
+func (suite *HttpServerTestSuite) TestHttpServerXappNotif() {
+       // start the "main loop" to receive the xAppNotification message from the HTTPServer
+       ch := make(chan []byte)
+       go readXAppNotification(suite.ch_notif, ch)
+
+       resp, reply := suite.doPost("http://"+suite.listener.Addr().String()+"/vesmgr_notif/", "test data")
+       suite.Equal("", reply)
+       suite.Equal(200, resp.StatusCode)
+       suite.Equal("200 OK", resp.Status)
+       notification := <-ch
+       suite.Equal([]byte("test data"), notification)
+}
+
+func (suite *HttpServerTestSuite) TestHttpServerXappNotifInvalidOperation() {
+       resp, reply := suite.doGet("http://" + suite.listener.Addr().String() + "/vesmgr_notif/")
+       suite.Equal("405 method not allowed\n", reply)
+       suite.Equal(405, resp.StatusCode)
+       suite.Equal("405 Method Not Allowed", resp.Status)
+}
+
+func TestHttpServerSuite(t *testing.T) {
+       suite.Run(t, new(HttpServerTestSuite))
+}
diff --git a/cmd/vesmgr/subscribexAPPNotifications.go b/cmd/vesmgr/subscribexAPPNotifications.go
new file mode 100644 (file)
index 0000000..6f8ed77
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ *  Copyright (c) 2019 AT&T Intellectual Property.
+ *  Copyright (c) 2018-2019 Nokia.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import (
+       "bytes"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "time"
+)
+
+// appmgr API
+const appmgrSubsPath = "/ric/v1/subscriptions"
+
+var errPostingFailed error = errors.New("Posting subscriptions failed")
+var errWrongStatusCode error = errors.New("Wrong subscriptions response StatusCode")
+
+func subscribexAppNotifications(targetUrl string, subscriptions chan subsChannel, timeout time.Duration, subsUrl string) {
+       requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, targetUrl))
+       req, err := http.NewRequest("POST", subsUrl, bytes.NewBuffer(requestBody))
+       if err != nil {
+               logger.Error("Setting NewRequest failed: %s", err)
+               subscriptions <- subsChannel{false, err}
+               return
+       }
+       req.Header.Set("Content-Type", "application/json")
+       client := &http.Client{}
+       client.Timeout = time.Second * timeout
+       for {
+               err := subscribexAppNotificationsClientDo(req, client)
+               if err == nil {
+                       break
+               } else if err != errPostingFailed && err != errWrongStatusCode {
+                       subscriptions <- subsChannel{false, err}
+                       return
+               }
+               time.Sleep(5 * time.Second)
+       }
+       subscriptions <- subsChannel{true, nil}
+}
+
+func subscribexAppNotificationsClientDo(req *http.Request, client *http.Client) error {
+       resp, err := client.Do(req)
+       if err != nil {
+               logger.Error("Posting subscriptions failed: %s", err)
+               return errPostingFailed
+       } else {
+               defer resp.Body.Close()
+               if resp.StatusCode == http.StatusCreated {
+                       logger.Info("Subscriptions response StatusCode: %d", resp.StatusCode)
+                       logger.Info("Subscriptions response headers: %s", resp.Header)
+                       body, err := ioutil.ReadAll(resp.Body)
+                       if err != nil {
+                               logger.Error("Subscriptions response Body read failed: %s", err)
+                               return err
+                       }
+                       logger.Info("Response Body: %s", body)
+                       var result map[string]interface{}
+                       if err := json.Unmarshal([]byte(body), &result); err != nil {
+                               logger.Error("json.Unmarshal failed: %s", err)
+                               return err
+                       }
+                       logger.Info("Subscription id from the response: %s", result["id"].(string))
+                       vesmgr.appmgrSubsId = result["id"].(string)
+                       return nil
+               } else {
+                       logger.Error("Wrong subscriptions response StatusCode: %d", resp.StatusCode)
+                       return errWrongStatusCode
+               }
+       }
+}
diff --git a/cmd/vesmgr/subscribexAPPNotifications_test.go b/cmd/vesmgr/subscribexAPPNotifications_test.go
new file mode 100644 (file)
index 0000000..31621b3
--- /dev/null
@@ -0,0 +1,140 @@
+/*
+ *  Copyright (c) 2019 AT&T Intellectual Property.
+ *  Copyright (c) 2018-2019 Nokia.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "github.com/stretchr/testify/suite"
+       "io/ioutil"
+       "net/http"
+       "net/http/httptest"
+       "testing"
+)
+
+type AppmgrHttpServerTestSuite struct {
+       suite.Suite
+       subscriptions chan subsChannel
+       xappNotifUrl  string
+}
+
+// suite setup
+func (suite *AppmgrHttpServerTestSuite) SetupSuite() {
+       vesmgr.appmgrSubsId = string("")
+       vesmgr.myIPAddress, _ = getMyIP()
+       suite.xappNotifUrl = "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
+       suite.subscriptions = make(chan subsChannel)
+}
+
+// test setup
+func (suite *AppmgrHttpServerTestSuite) SetupTest() {
+       suite.subscriptions = make(chan subsChannel)
+}
+
+// test teardown
+func (suite *AppmgrHttpServerTestSuite) TearDownTest() {
+       vesmgr.appmgrSubsId = string("")
+}
+
+func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotifications() {
+       testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
+               body, _ := ioutil.ReadAll(req.Body)
+               var result map[string]interface{}
+               err := json.Unmarshal([]byte(body), &result)
+               suite.Nil(err)
+               suite.Equal(5, int(result["maxRetries"].(float64)))
+               suite.Equal(5, int(result["retryTimer"].(float64)))
+               suite.Equal("all", result["eventType"].(string))
+               suite.Equal("POST", req.Method)
+               res.Header().Add("Content-Type", "application/json")
+               res.WriteHeader(http.StatusCreated)
+               res.Write([]byte(`{"id":"deadbeef1234567890", "version":0, "eventType":"all"}`))
+       }))
+       defer testServer.Close()
+
+       go subscribexAppNotifications(suite.xappNotifUrl, suite.subscriptions, 1, testServer.URL)
+       isSubscribed := <-suite.subscriptions
+       suite.Nil(isSubscribed.err)
+       suite.Equal("deadbeef1234567890", vesmgr.appmgrSubsId)
+}
+
+func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsWrongStatus() {
+       testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
+               res.Header().Add("Content-Type", "application/json")
+               res.WriteHeader(http.StatusUnauthorized)
+               res.Write([]byte(`{"id":"deadbeef1234567890", "version":0, "eventType":"all"}`))
+       }))
+       defer testServer.Close()
+
+       requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, suite.xappNotifUrl))
+       req, _ := http.NewRequest("POST", testServer.URL, bytes.NewBuffer(requestBody))
+       req.Header.Set("Content-Type", "application/json")
+       client := &http.Client{}
+
+       err := subscribexAppNotificationsClientDo(req, client)
+       suite.Equal(errWrongStatusCode, err)
+       // after failed POST vesmgr.appmgrSubsId holds an initial values
+       suite.Equal("", vesmgr.appmgrSubsId)
+}
+
+func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsWrongUrl() {
+       // use fake appmgrUrl that is not served in unit test
+       appmgrUrl := "/I_do_not_exist/"
+       requestBody := []byte(fmt.Sprintf(`{"maxRetries": 5, "retryTimer": 5, "eventType":"all", "targetUrl": "%v"}`, suite.xappNotifUrl))
+       req, _ := http.NewRequest("POST", appmgrUrl, bytes.NewBuffer(requestBody))
+       req.Header.Set("Content-Type", "application/json")
+       client := &http.Client{}
+
+       err := subscribexAppNotificationsClientDo(req, client)
+       suite.Equal(errPostingFailed, err)
+       // after failed POST vesmgr.appmgrSubsId holds an initial values
+       suite.Equal("", vesmgr.appmgrSubsId)
+}
+
+func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsReadBodyFails() {
+       testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
+               res.Header().Set("Content-Length", "1")
+               res.Header().Add("Content-Type", "application/json")
+               res.WriteHeader(http.StatusCreated)
+       }))
+       defer testServer.Close()
+
+       go subscribexAppNotifications(suite.xappNotifUrl, suite.subscriptions, 1, testServer.URL)
+       isSubscribed := <-suite.subscriptions
+       suite.Equal("unexpected EOF", isSubscribed.err.Error())
+       suite.Equal("", vesmgr.appmgrSubsId)
+}
+
+func (suite *AppmgrHttpServerTestSuite) TestSubscribexAppNotificationsUnMarshalFails() {
+       testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
+               res.Header().Add("Content-Type", "application/json")
+               res.WriteHeader(http.StatusCreated)
+               res.Write([]byte(`{""dump for UT": make(chan int),"}`))
+       }))
+       defer testServer.Close()
+
+       go subscribexAppNotifications(suite.xappNotifUrl, suite.subscriptions, 1, testServer.URL)
+       isSubscribed := <-suite.subscriptions
+       suite.Equal("invalid character 'd' after object key", isSubscribed.err.Error())
+       suite.Equal("", vesmgr.appmgrSubsId)
+}
+
+func TestAppmgrHttpServerTestSuite(t *testing.T) {
+       suite.Run(t, new(AppmgrHttpServerTestSuite))
+}
index 0867517..18ed793 100755 (executable)
 package main
 
 import (
+       "errors"
+       "io/ioutil"
+       "net"
+       "net/http"
        "os"
        "os/exec"
+       "time"
+
        mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git"
 )
 
+var appmgrDomain string
+
+const appmgrXAppConfigPath = "/ric/v1/config"
+const appmgrPort = "8080"
+
 type VesAgent struct {
-       Pid  int
-       name string
+       Pid     int
+       name    string
+       process *os.Process
+}
+
+type VesMgr struct {
+       myIPAddress  string
+       appmgrSubsId string
+}
+
+type subsChannel struct {
+       subscribed bool
+       err        error
 }
 
 var vesagent VesAgent
+var vesmgr VesMgr
 var logger *mdcloggo.MdcLogger
-var osExit = os.Exit
+
+const vesmgrXappNotifPort = "8080"
+const vesmgrXappNotifPath = "/vesmgr_xappnotif/"
+const timeoutPostXAppSubscriptions = 5
 
 func init() {
        logger, _ = mdcloggo.InitLogger("vesmgr")
 }
 
-/* Function to initialize vesmgr */
-func vesmgrInit() {
-       vesagent.name = "ves-agent"
-       logger.MdcAdd("vesmgr", "0.0.1")
-       logger.Info("vesmgrInit")
-
-       /* Subscribe notifications from xAPP Mgr */
-       //subscribexAppNotifications()
+func getMyIP() (myIP string, retErr error) {
+       addrs, err := net.InterfaceAddrs()
+       if err != nil {
+               logger.Error("net.InterfaceAddrs failed: %s", err.Error())
+               return "", err
+       }
+       for _, addr := range addrs {
+               // check the address type and if it is not a loopback take it
+               if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+                       if ipnet.IP.To4() != nil {
+                               logger.Info("My IP Address: %s", ipnet.IP.String())
+                               return ipnet.IP.String(), nil
+                       }
+               }
+       }
+       return "", nil
+}
 
-       // create configuration
+func createConf(xappMetrics []byte) {
        f, err := os.Create("/etc/ves-agent/ves-agent.yaml")
        if err != nil {
                logger.Error("Cannot create vespa conf file: %s", err.Error())
-               return
+               os.Exit(1)
        }
        defer f.Close()
 
-       createVespaConfig(f)
+       createVespaConfig(f, xappMetrics)
+       logger.Info("Vespa config created")
+}
+
+func subscribeXAppNotifications(chSubscriptions chan subsChannel) {
+       xappNotifUrl := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
+       subsUrl := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
+       go subscribexAppNotifications(xappNotifUrl, chSubscriptions, timeoutPostXAppSubscriptions, subsUrl)
+       logger.Info("xApp notifications subscribed from %s", subsUrl)
+}
+
+func vesmgrInit() {
+       vesagent.name = "ves-agent"
+       logger.Info("vesmgrInit")
+
+       var err error
+       if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" {
+               logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error())
+               return
+       }
+
+       var ok bool
+       appmgrDomain, ok = os.LookupEnv("VESMGR_APPMGRDOMAIN")
+       if ok {
+               logger.Info("Using appmgrdomain %s", appmgrDomain)
+       } else {
+               appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local"
+               logger.Info("Using default appmgrdomain %s", appmgrDomain)
+       }
+       chXAppSubscriptions := make(chan subsChannel)
+       chXAppNotifications := make(chan []byte)
+       chSupervision := make(chan chan string)
+       chVesagent := make(chan error)
+
+       listener, err := net.Listen("tcp", vesmgr.myIPAddress+":"+vesmgrXappNotifPort)
+       startHttpServer(listener, vesmgrXappNotifPath, chXAppNotifications, chSupervision)
+
+       subscribeXAppNotifications(chXAppSubscriptions)
 
-       /* Start ves-agent */
-       ch := startVesagent()
+       createConf([]byte{})
+       startVesagent(chVesagent)
 
-       runVesmgr(ch)
+       runVesmgr(chVesagent, chSupervision, chXAppNotifications, chXAppSubscriptions)
 }
 
-func startVesagent() chan error {
-       /* Start ves-agent */
+func startVesagent(ch chan error) {
        cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR"))
        cmd.Stdout = os.Stdout
        cmd.Stderr = os.Stderr
-       ch := make(chan error)
        if err := cmd.Start(); err != nil {
                logger.Error("vesmgr exiting, ves-agent start failed: %s", err)
                go func() {
@@ -75,20 +145,108 @@ func startVesagent() chan error {
        } else {
                logger.Info("ves-agent started with pid %d", cmd.Process.Pid)
                vesagent.Pid = cmd.Process.Pid
+               vesagent.process = cmd.Process
                go func() {
                        // wait ves-agent exit and then post the error to the channel
                        err := cmd.Wait()
                        ch <- err
                }()
        }
+}
 
-       return ch
+func killVespa(process *os.Process) {
+       logger.Info("Killing vespa")
+       err := process.Kill()
+       if err != nil {
+               logger.Error("Cannot kill vespa: %s", err.Error())
+       }
 }
 
-func runVesmgr(ch chan error) {
+func queryXAppsStatus(appmgrUrl string, timeout time.Duration) ([]byte, error) {
+
+       logger.Info("query xAppStatus started, url %s", appmgrUrl)
+       req, err := http.NewRequest("GET", appmgrUrl, nil)
+       if err != nil {
+               logger.Error("Failed to create a HTTP request: %s", err)
+               return nil, err
+       }
+       req.Header.Set("Content-Type", "application/json")
+       client := &http.Client{}
+       client.Timeout = time.Second * timeout
+       resp, err := client.Do(req)
+       if err != nil {
+               logger.Error("Query xApp status failed: %s", err)
+               return nil, err
+       }
+       defer resp.Body.Close()
+       if resp.StatusCode == http.StatusOK {
+               body, err := ioutil.ReadAll(resp.Body)
+               if err != nil {
+                       logger.Error("Failed to read xApp status body: %s", err)
+                       return nil, err
+               }
+               logger.Info("query xAppStatus completed")
+               return body, nil
+       } else {
+               logger.Error("Error from xApp status query: %s", resp.Status)
+               return nil, errors.New(resp.Status)
+       }
+}
+
+type state int
+
+const (
+       normalState           state = iota
+       vespaTerminatingState state = iota
+)
+
+func queryConf() ([]byte, error) {
+       return queryXAppsStatus("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath,
+               10*time.Second)
+}
+
+func runVesmgr(chVesagent chan error, chSupervision chan chan string, chXAppNotifications chan []byte, chXAppSubscriptions chan subsChannel) {
+
+       logger.Info("vesmgr main loop ready")
+       mystate := normalState
+       var xappStatus []byte
        for {
-               err := <-ch
-               logger.Error("Vesagent exited: " + err.Error())
-               osExit(1)
+               select {
+               case supervision := <-chSupervision:
+                       logger.Info("vesmgr: supervision")
+                       supervision <- "OK"
+               case xAppNotif := <-chXAppNotifications:
+                       logger.Info("vesmgr: xApp notification")
+                       logger.Info(string(xAppNotif))
+                       /*
+                        * If xapp status query fails then we cannot create
+                        * a new configuration and kill vespa.
+                        * In that case we assume that
+                        * the situation is fixed when the next
+                        * xapp notif comes
+                        */
+                       var err error
+                       xappStatus, err = queryConf()
+                       if err == nil {
+                               killVespa(vesagent.process)
+                               mystate = vespaTerminatingState
+                       }
+               case err := <-chVesagent:
+                       switch mystate {
+                       case vespaTerminatingState:
+                               logger.Info("Vesagent termination completed")
+                               createConf(xappStatus)
+                               startVesagent(chVesagent)
+                               mystate = normalState
+                       default:
+                               logger.Error("Vesagent exited: " + err.Error())
+                               os.Exit(1)
+                       }
+               case isSubscribed := <-chXAppSubscriptions:
+                       if isSubscribed.err != nil {
+                               logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err)
+                               os.Exit(1)
+                       }
+               }
        }
 }
diff --git a/cmd/vesmgr/vesmgr_queryxappssttus_test.go b/cmd/vesmgr/vesmgr_queryxappssttus_test.go
new file mode 100644 (file)
index 0000000..5f0f36c
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ *  Copyright (c) 2019 AT&T Intellectual Property.
+ *  Copyright (c) 2018-2019 Nokia.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import (
+       "fmt"
+       "github.com/stretchr/testify/suite"
+       "net"
+       "net/http"
+       "net/url"
+       "os"
+       "testing"
+       "time"
+)
+
+type do func(w http.ResponseWriter)
+
+type QueryXAppsStatusTestSuite struct {
+       suite.Suite
+       listener    net.Listener
+       xAppMgrFunc do
+}
+
+// suite setup creates the HTTP server
+func (suite *QueryXAppsStatusTestSuite) SetupSuite() {
+       os.Unsetenv("http_proxy")
+       os.Unsetenv("HTTP_PROXY")
+       var err error
+       suite.listener, err = net.Listen("tcp", ":0")
+       suite.Nil(err)
+       go runXAppMgr(suite.listener, "/test_url/", suite)
+}
+
+func runXAppMgr(listener net.Listener, url string, suite *QueryXAppsStatusTestSuite) {
+
+       http.HandleFunc(url, func(w http.ResponseWriter, r *http.Request) {
+               switch r.Method {
+               case "GET":
+                       suite.xAppMgrFunc(w)
+               }
+       })
+       http.Serve(listener, nil)
+}
+
+func (suite *QueryXAppsStatusTestSuite) TestQueryXAppsStatusFailsWithTimeout() {
+       do_sleep := func(w http.ResponseWriter) {
+               time.Sleep(time.Second * 2)
+       }
+       suite.xAppMgrFunc = do_sleep
+
+       data, err := queryXAppsStatus("http://"+suite.listener.Addr().String()+"/test_url/", 1)
+       suite.Nil(data)
+       suite.NotNil(err)
+       e, ok := err.(*url.Error)
+       suite.Equal(ok, true)
+       suite.Equal(e.Timeout(), true)
+}
+
+func (suite *QueryXAppsStatusTestSuite) TestQueryXAppsStatusFailsWithAnErrorReply() {
+       do_reply_with_err := func(w http.ResponseWriter) {
+               http.Error(w, "405 method not allowed", http.StatusMethodNotAllowed)
+       }
+       suite.xAppMgrFunc = do_reply_with_err
+
+       data, err := queryXAppsStatus("http://"+suite.listener.Addr().String()+"/test_url/", 1)
+       suite.Nil(data)
+       suite.NotNil(err)
+       suite.Equal("405 Method Not Allowed", err.Error())
+}
+
+func (suite *QueryXAppsStatusTestSuite) TestQueryXAppsStatusOk() {
+       do_reply := func(w http.ResponseWriter) {
+               fmt.Fprintf(w, "reply message")
+       }
+       suite.xAppMgrFunc = do_reply
+
+       data, err := queryXAppsStatus("http://"+suite.listener.Addr().String()+"/test_url/", 1)
+       suite.NotNil(data)
+       suite.Nil(err)
+       suite.Equal(data, []byte("reply message"))
+}
+
+func TestQueryXAppsStatusTestSuite(t *testing.T) {
+       suite.Run(t, new(QueryXAppsStatusTestSuite))
+}
index 0d90f7a..19dd60d 100644 (file)
 package main
 
 import (
+       "errors"
+       "github.com/stretchr/testify/assert"
        "os"
+       "os/exec"
        "testing"
-
-       "github.com/stretchr/testify/assert"
+       "time"
 )
 
 func init() {
@@ -36,7 +38,8 @@ func init() {
 
 func TestStartVesagent(t *testing.T) {
        assert.Equal(t, 0, vesagent.Pid)
-       ch := startVesagent()
+       ch := make(chan error)
+       startVesagent(ch)
        assert.NotEqual(t, 0, vesagent.Pid)
        t.Logf("VES agent pid = %d", vesagent.Pid)
        vesagent.Pid = 0
@@ -45,12 +48,61 @@ func TestStartVesagent(t *testing.T) {
 }
 
 func TestStartVesagentFails(t *testing.T) {
-
        vesagent.name = "Not-ves-agent"
        assert.Equal(t, 0, vesagent.Pid)
-       ch := startVesagent()
+       ch := make(chan error)
+       startVesagent(ch)
        err := <-ch
        assert.NotNil(t, err)
        assert.Equal(t, 0, vesagent.Pid)
        vesagent.name = "ves-agent"
 }
+
+func TestGetMyIP(t *testing.T) {
+       vesmgr.myIPAddress = string("")
+       var err error
+       vesmgr.myIPAddress, err = getMyIP()
+       assert.NotEqual(t, string(""), vesmgr.myIPAddress)
+       assert.Equal(t, nil, err)
+}
+
+func TestMainLoopSupervision(t *testing.T) {
+       chXAppNotifications := make(chan []byte)
+       chSupervision := make(chan chan string)
+       chVesagent := make(chan error)
+       chSubscriptions := make(chan subsChannel)
+       go runVesmgr(chVesagent, chSupervision, chXAppNotifications, chSubscriptions)
+
+       ch := make(chan string)
+       chSupervision <- ch
+       reply := <-ch
+       assert.Equal(t, "OK", reply)
+}
+
+func TestMainLoopVesagentError(t *testing.T) {
+       if os.Getenv("TEST_VESPA_EXIT") == "1" {
+               // we're run in a new process, now make vesmgr main loop exit
+               chXAppNotifications := make(chan []byte)
+               chSupervision := make(chan chan string)
+               chVesagent := make(chan error)
+               chSubscriptions := make(chan subsChannel)
+               go runVesmgr(chVesagent, chSupervision, chXAppNotifications, chSubscriptions)
+
+               chVesagent <- errors.New("vesagent killed")
+               // we should never actually end up to this sleep, since the runVesmgr should exit
+               time.Sleep(3 * time.Second)
+               return
+       }
+
+       // Run the vesmgr exit test as a separate process
+       cmd := exec.Command(os.Args[0], "-test.run=TestMainLoopVesagentError")
+       cmd.Env = append(os.Environ(), "TEST_VESPA_EXIT=1")
+       cmd.Stdout = os.Stdout
+       cmd.Stderr = os.Stderr
+       err := cmd.Run()
+
+       // check that vesmgr existed with status 1
+       e, ok := err.(*exec.ExitError)
+       assert.Equal(t, true, ok)
+       assert.Equal(t, "exit status 1", e.Error())
+}
diff --git a/go.sum b/go.sum
index 65bfb9d..1841876 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -1,15 +1,18 @@
 gerrit.o-ran-sc.org/r/com/golog.git v0.0.1 h1:9RfO/Whehaaq5KiJTT7s+YOzmi9mT1C3HktfhwwMEmw=
 gerrit.o-ran-sc.org/r/com/golog.git v0.0.1/go.mod h1:b8YB31U8/4iRpABioeSzGi/YMzOQ/Zq7hrJmmXKqlJk=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/markbates/deplist v1.0.5/go.mod h1:gRRbPbbuA8TmMiRvaOzUlRfzfjeCCBqX2A6arxN01MM=
 github.com/markbates/oncer v0.0.0-20180924034138-723ad0170a46/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
diff --git a/test/xApp_config_test_output.json b/test/xApp_config_test_output.json
new file mode 100644 (file)
index 0000000..fccda3f
--- /dev/null
@@ -0,0 +1,292 @@
+[
+    {
+        "metadata": {
+            "name": "xApp1",
+            "configName": "xApp1-appconfig",
+            "namespace": "default"
+        },
+        "descriptor": {
+            "$id": "http://example.com/root.json",
+            "$schema": "http://json-schema.org/draft-07/schema#",
+            "definitions": {
+
+            },
+            "properties": {
+                "local": {
+                    "$id": "#/properties/local",
+                    "properties": {
+                        "host": {
+                            "$id": "#/properties/local/properties/host",
+                            "default": "",
+                            "examples": [
+                                ":8080"
+                            ],
+                            "pattern": "^(.*)$",
+                            "title": "The Host Schema",
+                            "type": "string"
+                        }
+                    },
+                    "required": [
+                        "host"
+                    ],
+                    "title": "The Local Schema",
+                    "type": "object"
+                },
+                "logger": {
+                    "$id": "#/properties/logger",
+                    "properties": {
+                        "level": {
+                            "$id": "#/properties/logger/properties/level",
+                            "default": 0,
+                            "examples": [
+                                3
+                            ],
+                            "title": "The Level Schema",
+                            "type": "integer"
+                        }
+                    },
+                    "required": [
+                        "level"
+                    ],
+                    "title": "The Logger Schema",
+                    "type": "object"
+                },
+                "metrics": {
+                    "$id": "#/properties/metrics",
+                    "items": {
+                        "$id": "#/properties/metrics/items",
+                        "properties": {
+                            "description": {
+                                "$id": "#/properties/metrics/items/properties/description",
+                                "default": "",
+                                "examples": [
+                                    "Example counter 1"
+                                ],
+                                "pattern": "^(.*)$",
+                                "title": "The Description Schema",
+                                "type": "string"
+                            },
+                            "enabled": {
+                                "$id": "#/properties/metrics/items/properties/enabled",
+                                "default": false,
+                                "examples": [
+                                    true
+                                ],
+                                "title": "The Enabled Schema",
+                                "type": "boolean"
+                            },
+                            "name": {
+                                "$id": "#/properties/metrics/items/properties/name",
+                                "default": "",
+                                "examples": [
+                                    "App1ExampleCounterOne"
+                                ],
+                                "pattern": "^(.*)$",
+                                "title": "The Name Schema",
+                                "type": "string"
+                            },
+                            "type": {
+                                "$id": "#/properties/metrics/items/properties/type",
+                                "default": "",
+                                "examples": [
+                                    "counter"
+                                ],
+                                "pattern": "^(.*)$",
+                                "title": "The Type Schema",
+                                "type": "string"
+                            }
+                        },
+                        "required": [
+                            "name",
+                            "type",
+                            "enabled",
+                            "description"
+                        ],
+                        "title": "The Items Schema",
+                        "type": "object"
+                    },
+                    "title": "The Metrics Schema",
+                    "type": "array"
+                }
+            },
+            "required": [
+                "local",
+                "logger",
+                "metrics"
+            ],
+            "title": "ANR Descriptor Schema",
+            "type": "object"
+        },
+        "config": {
+            "local": {
+                "host": ":8080"
+            },
+            "logger": {
+                "level": 5
+            },
+            "metrics": [
+                {
+                    "description": "Example counter 1",
+                    "enabled": true,
+                    "name": "App1ExampleCounterOne",
+                    "type": "counter",
+                    "objectName": "App1ExampleCounterOneObject",
+                    "objectInstance": "App1ExampleCounterOneObjectInstance"
+                },
+                {
+                    "description": "Example counter 2",
+                    "enabled": true,
+                    "name": "App1ExampleCounterTwo",
+                    "type": "counter",
+                    "objectName": "App1ExampleCounterTwoObject",
+                    "objectInstance": "App1ExampleCounterTwoObjectInstance"
+                }
+            ]
+        }
+    },
+    {
+        "metadata": {
+            "name": "xApp2",
+            "configName": "xApp2-appconfig",
+            "namespace": "default"
+        },
+        "descriptor": {
+            "$id": "http://example.com/root.json",
+            "$schema": "http://json-schema.org/draft-07/schema#",
+            "definitions": {
+
+            },
+            "properties": {
+                "local": {
+                    "$id": "#/properties/local",
+                    "properties": {
+                        "host": {
+                            "$id": "#/properties/local/properties/host",
+                            "default": "",
+                            "examples": [
+                                ":8080"
+                            ],
+                            "pattern": "^(.*)$",
+                            "title": "The Host Schema",
+                            "type": "string"
+                        }
+                    },
+                    "required": [
+                        "host"
+                    ],
+                    "title": "The Local Schema",
+                    "type": "object"
+                },
+                "logger": {
+                    "$id": "#/properties/logger",
+                    "properties": {
+                        "level": {
+                            "$id": "#/properties/logger/properties/level",
+                            "default": 0,
+                            "examples": [
+                                3
+                            ],
+                            "title": "The Level Schema",
+                            "type": "integer"
+                        }
+                    },
+                    "required": [
+                        "level"
+                    ],
+                    "title": "The Logger Schema",
+                    "type": "object"
+                },
+                "metrics": {
+                    "$id": "#/properties/metrics",
+                    "items": {
+                        "$id": "#/properties/metrics/items",
+                        "properties": {
+                            "description": {
+                                "$id": "#/properties/metrics/items/properties/description",
+                                "default": "",
+                                "examples": [
+                                    ""
+                                ],
+                                "pattern": "^(.*)$",
+                                "title": "The Description Schema",
+                                "type": "string"
+                            },
+                            "enabled": {
+                                "$id": "#/properties/metrics/items/properties/enabled",
+                                "default": false,
+                                "examples": [
+                                    true
+                                ],
+                                "title": "The Enabled Schema",
+                                "type": "boolean"
+                            },
+                            "name": {
+                                "$id": "#/properties/metrics/items/properties/name",
+                                "default": "",
+                                "examples": [
+                                    "UEContextCreated"
+                                ],
+                                "pattern": "^(.*)$",
+                                "title": "The Name Schema",
+                                "type": "string"
+                            },
+                            "type": {
+                                "$id": "#/properties/metrics/items/properties/type",
+                                "default": "",
+                                "examples": [
+                                    "counter"
+                                ],
+                                "pattern": "^(.*)$",
+                                "title": "The Type Schema",
+                                "type": "string"
+                            }
+                        },
+                        "required": [
+                            "name",
+                            "type",
+                            "enabled",
+                            "description"
+                        ],
+                        "title": "The Items Schema",
+                        "type": "object"
+                    },
+                    "title": "The Metrics Schema",
+                    "type": "array"
+                }
+            },
+            "required": [
+                "local",
+                "logger",
+                "metrics"
+            ],
+            "title": "The Root Schema",
+            "type": "object"
+        },
+        "config": {
+            "local": {
+                "host": ":8080"
+            },
+            "logger": {
+                "level": 3
+            },
+            "metrics": [
+                {
+                    "description": "Example counter",
+                    "enabled": true,
+                    "name": "App2ExampleCounterOne",
+                    "type": "counter",
+                    "objectName": "App2ExampleCounterOneObject",
+                    "objectInstance": "App2ExampleCounterOneObjectInstance"
+                },
+                {
+                    "description": "Another example counter",
+                    "enabled": true,
+                    "name": "App2ExampleCounterTwo",
+                    "type": "counter",
+                    "objectName": "App2ExampleCounterTwoObject",
+                    "objectInstance": "App2ExampleCounterTwoObjectInstance"
+                }
+            ]
+        }
+    }
+]
index 1bd4560..5f9e0ca 100644 (file)
@@ -48,5 +48,7 @@ spec:
                 name: vespa-config
             - secretRef:
                 name: vespa-secrets
-
+          env:
+            - name: VESMGR_APPMGRDOMAIN
+              value: appmgr-service