Adaptation to new xApp Descriptor 09/5009/1 v0.6.0
authorMohamed Abukar <abukar.mohamed@nokia.com>
Sat, 7 Nov 2020 09:22:56 +0000 (11:22 +0200)
committerMohamed Abukar <abukar.mohamed@nokia.com>
Sun, 8 Nov 2020 10:08:03 +0000 (12:08 +0200)
Change-Id: Ic3398b16787dab948a4414d121365870cd5cedbf
Signed-off-by: Mohamed Abukar <abukar.mohamed@nokia.com>
Makefile
config/config-file.json [new file with mode: 0755]
pkg/xapp/config.go
pkg/xapp/db.go
pkg/xapp/rmr.go
pkg/xapp/subscription.go
pkg/xapp/types.go
pkg/xapp/xapp.go

index f657a7a..6d01540 100755 (executable)
--- a/Makefile
+++ b/Makefile
@@ -32,10 +32,10 @@ CACHE_DIR:=$(abspath $(ROOT_DIR)/cache)
 
 XAPP_NAME:=xapp
 XAPP_ROOT:=test
-XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.yaml"
+XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.json"
 include build/make.go.mk 
 
 XAPP_NAME:=xapp
 XAPP_ROOT:=pkg
-XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.yaml"
+XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.json"
 include build/make.go.mk 
diff --git a/config/config-file.json b/config/config-file.json
new file mode 100755 (executable)
index 0000000..3dad244
--- /dev/null
@@ -0,0 +1,88 @@
+{
+    "name": "xapp",
+    "version": "0.7.0",
+    "vendor": "Nokia",
+    "moId": "SEP",
+    "containers": [
+        {
+            "name": "ueec",
+            "image": {
+                "registry": "ranco-dev-tools.eastus.cloudapp.azure.com:10001",
+                "name": "ueec-xapp",
+                "tag": "0.5.3"
+            }
+        }
+    ],
+    "livenessProbe": {
+        "httpGet": {
+            "path": "ric/v1/health/alive",
+            "port": 8080
+        },
+        "initialDelaySeconds": 5,
+        "periodSeconds": 15
+    },
+    "readinessProbe": {
+        "httpGet": {
+            "path": "ric/v1/health/ready",
+            "port": 8080
+        },
+        "initialDelaySeconds": 5,
+        "periodSeconds": 15
+    },
+    "messaging": {
+        "ports": [
+            {
+                "name": "http",
+                "container": "ueec",
+                "port": 8086,
+                "description": "http service"
+            },
+            {
+                "name": "rmr-route",
+                "container": "ueec",
+                "port": 4561,
+                "description": "rmr route port for ueec"
+            },
+            {
+                "name": "rmr-data",
+                "container": "ueec",
+                "port": 4560,
+                "maxSize": 2072,
+                "threadType": 0,
+                "lowLatency": false,
+                "maxRetryOnFailure": 5,
+                "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE"],
+                "txMessages": ["RIC_SGNB_ADDITION_REQ", "RIC_SGNB_ADDITION_ACK"],
+                "policies": [],
+                "description": "rmr data port for ueec"
+            }
+        ]
+    },
+    "controls": {
+        "logger": {
+            "level": 3
+        },
+        "subscription": {
+            "subscriptionActive": true,
+            "functionId": 1,
+            "plmnId": "310150",
+            "eNBId": "202251",
+            "timeout": 5,
+            "host": "service-ricplt-submgr-http.ricplt:8088",
+            "clientEndpoint": "service-ricxapp-ueec-http.ricxapp:8080"
+        }
+    },
+    "metrics": {
+        "url": "/ric/v1/metrics",
+        "namespace": "ricxapp"
+    },
+    "subscription": {
+        "host": "localhost:8088",
+        "timeout": 2,
+        "retryCount": 10,
+        "retryDelay": 5
+    },
+    "faults": {},
+    "measurements": []
+}
+
index 1b4b806..8d427c5 100755 (executable)
@@ -99,7 +99,7 @@ func LoadConfig() (l *Log) {
                l.Info("config file %s changed ", e.Name)
 
                updatemtypes()
-               Logger.SetLevel(viper.GetInt("logger.level"))
+               Logger.SetLevel(viper.GetInt("controls.logger.level"))
                if len(ConfigChangeListeners) > 0 {
                        for _, f := range ConfigChangeListeners {
                                go f(e.Name)
@@ -126,6 +126,33 @@ func PublishConfigChange(appName, eventJson string) error {
        return nil
 }
 
+func GetPortData(pname string) (d PortData) {
+       for _, v := range viper.GetStringMap("messaging")["ports"].([]interface{}) {
+               if n, ok := v.(map[string]interface{})["name"].(string); ok && n == pname {
+                       d.Name = n
+                       if p, _ := v.(map[string]interface{})["port"].(float64); ok {
+                               d.Port = int(p)
+                       }
+                       if m, _ := v.(map[string]interface{})["maxSize"].(float64); ok {
+                               d.MaxSize = int(m)
+                       }
+                       if m, _ := v.(map[string]interface{})["threadType"].(float64); ok {
+                               d.ThreadType = int(m)
+                       }
+                       if m, _ := v.(map[string]interface{})["lowLatency"].(bool); ok {
+                               d.LowLatency = bool(m)
+                       }
+                       if m, _ := v.(map[string]interface{})["fastAck"].(bool); ok {
+                               d.FastAck = bool(m)
+                       }
+                       if m, _ := v.(map[string]interface{})["maxRetryOnFailure"].(float64); ok {
+                               d.MaxRetryOnFailure = int(m)
+                       }
+               }
+       }
+       return
+}
+
 func (*Configurator) SetSDLNotificationCB(appName string, sdlNotificationCb SDLNotificationCB) error {
        return Sdl.Subscribe(sdlNotificationCb, fmt.Sprintf("CM_UPDATE:%s", appName))
 }
index 6591e04..a7302cd 100755 (executable)
@@ -165,7 +165,7 @@ func (c *SDLClient) GetStat() (t SDLStatistics) {
        return
 }
 
-func NewRNIBClient(ns string) *RNIBClient {
+func NewRNIBClient() *RNIBClient {
        s := sdl.NewSdlInstance("e2Manager", sdl.NewDatabase())
        return &RNIBClient{
                db:     s,
index 8e9faef..f2d09da 100755 (executable)
@@ -67,7 +67,6 @@ import (
        "bytes"
        "crypto/md5"
        "fmt"
-       "github.com/spf13/viper"
        "strings"
        "time"
        "unsafe"
@@ -128,26 +127,22 @@ func (params *RMRParams) String() string {
 //
 //-----------------------------------------------------------------------------
 type RMRClientParams struct {
-       ProtPort   string
-       MaxSize    int
-       ThreadType int
-       StatDesc   string
-       LowLatency bool
-       FastAck    bool
+       StatDesc string
+       RmrData  PortData
 }
 
 func (params *RMRClientParams) String() string {
-       return fmt.Sprintf("ProtPort=%s MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t",
-               params.ProtPort, params.MaxSize, params.ThreadType, params.StatDesc, params.LowLatency, params.FastAck)
+       return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t",
+               params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc, params.RmrData.LowLatency, params.RmrData.FastAck)
 }
 
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
 func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
-       p := C.CString(params.ProtPort)
-       m := C.int(params.MaxSize)
-       c := C.int(params.ThreadType)
+       p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
+       m := C.int(params.RmrData.MaxSize)
+       c := C.int(params.RmrData.ThreadType)
        defer C.free(unsafe.Pointer(p))
        ctx := C.rmr_init(p, m, c)
        if ctx == nil {
@@ -156,30 +151,27 @@ func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
 
        Logger.Info("new rmrClient with parameters: %s", params.String())
 
-       if params.LowLatency {
+       if params.RmrData.LowLatency {
                C.rmr_set_low_latency(ctx)
        }
-       if params.FastAck {
+       if params.RmrData.FastAck {
                C.rmr_set_fack(ctx)
        }
 
        return &RMRClient{
-               protPort:  params.ProtPort,
-               context:   ctx,
-               consumers: make([]MessageConsumer, 0),
-               stat:      Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
+               context:           ctx,
+               consumers:         make([]MessageConsumer, 0),
+               stat:              Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
+               maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
        }
 }
 
 func NewRMRClient() *RMRClient {
+       p := GetPortData("rmr-data")
        return NewRMRClientWithParams(
                &RMRClientParams{
-                       ProtPort:   viper.GetString("rmr.protPort"),
-                       MaxSize:    viper.GetInt("rmr.maxSize"),
-                       ThreadType: viper.GetInt("rmr.threadType"),
-                       StatDesc:   "RMR",
-                       LowLatency: viper.GetBool("rmr.lowLatency"),
-                       FastAck:    viper.GetBool("rmr.fastAck"),
+                       RmrData:  p,
+                       StatDesc: "RMR",
                })
 }
 
@@ -431,12 +423,11 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
        }
 
        // Just quick retry seems to help for K8s issue
-       maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
-       if maxRetryOnFailure == 0 {
-               maxRetryOnFailure = 5
+       if m.maxRetryOnFailure == 0 {
+               m.maxRetryOnFailure = 5
        }
 
-       for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
+       for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
                m.contextMux.Lock()
                if whid != 0 {
                        currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
@@ -582,8 +573,3 @@ func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
        Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
        return 0
 }
-
-// To be removed ...
-func (m *RMRClient) GetStat() (r RMRStatistics) {
-       return
-}
index 4dcf22b..4ecc262 100755 (executable)
@@ -170,7 +170,7 @@ func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint st
        }
 
        ep, _, _ := net.SplitHostPort(clientEndpoint)
-       _, port, _ := net.SplitHostPort(viper.GetString("local.host"))
+       _, port, _ := net.SplitHostPort(fmt.Sprintf(":%d", GetPortData("http").Port))
        clientUrl := fmt.Sprintf("http://%s:%s%s", ep, port, r.clientUrl)
 
        retries := viper.GetInt("subscription.retryCount")
index 571dcba..387d393 100755 (executable)
@@ -23,28 +23,19 @@ import (
        "unsafe"
 )
 
-// To be removed ...
-type RMRStatistics struct{}
-
-//
-//
-//
 type RMRClient struct {
-       protPort      string
-       contextMux    sync.Mutex
-       context       unsafe.Pointer
-       ready         int
-       wg            sync.WaitGroup
-       mux           sync.Mutex
-       stat          map[string]Counter
-       consumers     []MessageConsumer
-       readyCb       ReadyCB
-       readyCbParams interface{}
+       contextMux        sync.Mutex
+       context           unsafe.Pointer
+       ready             int
+       wg                sync.WaitGroup
+       mux               sync.Mutex
+       stat              map[string]Counter
+       consumers         []MessageConsumer
+       readyCb           ReadyCB
+       readyCbParams     interface{}
+       maxRetryOnFailure int
 }
 
-//
-//
-//
 type RMRMeid struct {
        PlmnID  string
        EnbID   string
@@ -70,18 +61,22 @@ func (meid *RMRMeid) String() string {
        return str
 }
 
-//
-//
-//
 type MessageConsumerFunc func(*RMRParams) error
 
 func (fn MessageConsumerFunc) Consume(params *RMRParams) error {
        return fn(params)
 }
 
-//
-//
-//
 type MessageConsumer interface {
        Consume(params *RMRParams) error
 }
+
+type PortData struct {
+       Name              string
+       Port              int
+       MaxSize           int
+       ThreadType        int
+       LowLatency        bool
+       FastAck           bool
+       MaxRetryOnFailure int
+}
index 6c7c567..5ab48ab 100755 (executable)
@@ -60,8 +60,8 @@ func SetReadyCB(cb ReadyCB, params interface{}) {
        readyCbParams = params
 }
 
-func xappReadyCb(params interface{}) {
-       Alarm = NewAlarmClient(viper.GetString("alarm.MOId"), viper.GetString("alarm.APPId"))
+func XappReadyCb(params interface{}) {
+       Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
        if readyCb != nil {
                readyCb(readyCbParams)
        }
@@ -71,28 +71,7 @@ func SetShutdownCB(cb ShutdownCB) {
        shutdownCb = cb
 }
 
-func init() {
-       // Load xapp configuration
-       Logger = LoadConfig()
-
-       Logger.SetLevel(viper.GetInt("logger.level"))
-       Resource = NewRouter()
-       Config = Configurator{}
-       Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
-       Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
-
-       if viper.IsSet("db.namespaces") {
-               namespaces := viper.GetStringSlice("db.namespaces")
-               if len(namespaces) > 0 && namespaces[0] != "" {
-                       Sdl = NewSDLClient(viper.GetStringSlice("db.namespaces")[0])
-               }
-               if len(namespaces) > 1 && namespaces[1] != "" {
-                       Rnib = NewRNIBClient(viper.GetStringSlice("db.namespaces")[1])
-               }
-       } else {
-               Sdl = NewSDLClient(viper.GetString("db.namespace"))
-       }
-
+func InstallSignalHandler() {
        //
        // Signal handlers to really exit program.
        // shutdownCb can hang until application has
@@ -103,7 +82,7 @@ func init() {
        signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
        //signal handler function
        go func() {
-               for _ = range interrupt {
+               for range interrupt {
                        if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
                                // close function
                                go func() {
@@ -140,11 +119,28 @@ func init() {
        }()
 }
 
+func init() {
+       // Load xapp configuration
+       Logger = LoadConfig()
+
+       Logger.SetLevel(viper.GetInt("controls.logger.level"))
+       Resource = NewRouter()
+       Config = Configurator{}
+       Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
+       Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
+       Sdl = NewSDLClient(viper.GetString("db.namespace"))
+       Rnib = NewRNIBClient()
+
+       InstallSignalHandler()
+}
+
 func RunWithParams(c MessageConsumer, sdlcheck bool) {
        Rmr = NewRMRClient()
-       Rmr.SetReadyCB(xappReadyCb, nil)
-       go http.ListenAndServe(viper.GetString("local.host"), Resource.router)
-       Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host")))
+       Rmr.SetReadyCB(XappReadyCb, nil)
+
+       host := fmt.Sprintf(":%d", GetPortData("http").Port)
+       go http.ListenAndServe(host, Resource.router)
+       Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
        if sdlcheck {
                Sdl.TestConnection()
        }