From: Mohamed Abukar Date: Sat, 7 Nov 2020 09:22:56 +0000 (+0200) Subject: Adaptation to new xApp Descriptor X-Git-Tag: v0.6.0^0 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=b8b191fe31602c53414f8086381aef1a48e8be09;p=ric-plt%2Fxapp-frame.git Adaptation to new xApp Descriptor Change-Id: Ic3398b16787dab948a4414d121365870cd5cedbf Signed-off-by: Mohamed Abukar --- diff --git a/Makefile b/Makefile index f657a7a..6d01540 100755 --- a/Makefile +++ b/Makefile @@ -32,10 +32,10 @@ CACHE_DIR:=$(abspath $(ROOT_DIR)/cache) XAPP_NAME:=xapp XAPP_ROOT:=test -XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.yaml" +XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.json" include build/make.go.mk XAPP_NAME:=xapp XAPP_ROOT:=pkg -XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.yaml" +XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.json" include build/make.go.mk diff --git a/config/config-file.json b/config/config-file.json new file mode 100755 index 0000000..3dad244 --- /dev/null +++ b/config/config-file.json @@ -0,0 +1,88 @@ +{ + "name": "xapp", + "version": "0.7.0", + "vendor": "Nokia", + "moId": "SEP", + "containers": [ + { + "name": "ueec", + "image": { + "registry": "ranco-dev-tools.eastus.cloudapp.azure.com:10001", + "name": "ueec-xapp", + "tag": "0.5.3" + } + } + ], + "livenessProbe": { + "httpGet": { + "path": "ric/v1/health/alive", + "port": 8080 + }, + "initialDelaySeconds": 5, + "periodSeconds": 15 + }, + "readinessProbe": { + "httpGet": { + "path": "ric/v1/health/ready", + "port": 8080 + }, + "initialDelaySeconds": 5, + "periodSeconds": 15 + }, + "messaging": { + "ports": [ + { + "name": "http", + "container": "ueec", + "port": 8086, + "description": "http service" + }, + { + "name": "rmr-route", + "container": "ueec", + "port": 4561, + "description": "rmr route port for ueec" + }, + { + "name": "rmr-data", + "container": "ueec", + "port": 4560, + "maxSize": 2072, + "threadType": 0, + "lowLatency": false, + "maxRetryOnFailure": 5, + "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE"], + "txMessages": ["RIC_SGNB_ADDITION_REQ", "RIC_SGNB_ADDITION_ACK"], + "policies": [], + "description": "rmr data port for ueec" + } + ] + }, + "controls": { + "logger": { + "level": 3 + }, + "subscription": { + "subscriptionActive": true, + "functionId": 1, + "plmnId": "310150", + "eNBId": "202251", + "timeout": 5, + "host": "service-ricplt-submgr-http.ricplt:8088", + "clientEndpoint": "service-ricxapp-ueec-http.ricxapp:8080" + } + }, + "metrics": { + "url": "/ric/v1/metrics", + "namespace": "ricxapp" + }, + "subscription": { + "host": "localhost:8088", + "timeout": 2, + "retryCount": 10, + "retryDelay": 5 + }, + "faults": {}, + "measurements": [] +} + diff --git a/pkg/xapp/config.go b/pkg/xapp/config.go index 1b4b806..8d427c5 100755 --- a/pkg/xapp/config.go +++ b/pkg/xapp/config.go @@ -99,7 +99,7 @@ func LoadConfig() (l *Log) { l.Info("config file %s changed ", e.Name) updatemtypes() - Logger.SetLevel(viper.GetInt("logger.level")) + Logger.SetLevel(viper.GetInt("controls.logger.level")) if len(ConfigChangeListeners) > 0 { for _, f := range ConfigChangeListeners { go f(e.Name) @@ -126,6 +126,33 @@ func PublishConfigChange(appName, eventJson string) error { return nil } +func GetPortData(pname string) (d PortData) { + for _, v := range viper.GetStringMap("messaging")["ports"].([]interface{}) { + if n, ok := v.(map[string]interface{})["name"].(string); ok && n == pname { + d.Name = n + if p, _ := v.(map[string]interface{})["port"].(float64); ok { + d.Port = int(p) + } + if m, _ := v.(map[string]interface{})["maxSize"].(float64); ok { + d.MaxSize = int(m) + } + if m, _ := v.(map[string]interface{})["threadType"].(float64); ok { + d.ThreadType = int(m) + } + if m, _ := v.(map[string]interface{})["lowLatency"].(bool); ok { + d.LowLatency = bool(m) + } + if m, _ := v.(map[string]interface{})["fastAck"].(bool); ok { + d.FastAck = bool(m) + } + if m, _ := v.(map[string]interface{})["maxRetryOnFailure"].(float64); ok { + d.MaxRetryOnFailure = int(m) + } + } + } + return +} + func (*Configurator) SetSDLNotificationCB(appName string, sdlNotificationCb SDLNotificationCB) error { return Sdl.Subscribe(sdlNotificationCb, fmt.Sprintf("CM_UPDATE:%s", appName)) } diff --git a/pkg/xapp/db.go b/pkg/xapp/db.go index 6591e04..a7302cd 100755 --- a/pkg/xapp/db.go +++ b/pkg/xapp/db.go @@ -165,7 +165,7 @@ func (c *SDLClient) GetStat() (t SDLStatistics) { return } -func NewRNIBClient(ns string) *RNIBClient { +func NewRNIBClient() *RNIBClient { s := sdl.NewSdlInstance("e2Manager", sdl.NewDatabase()) return &RNIBClient{ db: s, diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index 8e9faef..f2d09da 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -67,7 +67,6 @@ import ( "bytes" "crypto/md5" "fmt" - "github.com/spf13/viper" "strings" "time" "unsafe" @@ -128,26 +127,22 @@ func (params *RMRParams) String() string { // //----------------------------------------------------------------------------- type RMRClientParams struct { - ProtPort string - MaxSize int - ThreadType int - StatDesc string - LowLatency bool - FastAck bool + StatDesc string + RmrData PortData } func (params *RMRClientParams) String() string { - return fmt.Sprintf("ProtPort=%s MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t", - params.ProtPort, params.MaxSize, params.ThreadType, params.StatDesc, params.LowLatency, params.FastAck) + return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t", + params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc, params.RmrData.LowLatency, params.RmrData.FastAck) } //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- func NewRMRClientWithParams(params *RMRClientParams) *RMRClient { - p := C.CString(params.ProtPort) - m := C.int(params.MaxSize) - c := C.int(params.ThreadType) + p := C.CString(fmt.Sprintf("%d", params.RmrData.Port)) + m := C.int(params.RmrData.MaxSize) + c := C.int(params.RmrData.ThreadType) defer C.free(unsafe.Pointer(p)) ctx := C.rmr_init(p, m, c) if ctx == nil { @@ -156,30 +151,27 @@ func NewRMRClientWithParams(params *RMRClientParams) *RMRClient { Logger.Info("new rmrClient with parameters: %s", params.String()) - if params.LowLatency { + if params.RmrData.LowLatency { C.rmr_set_low_latency(ctx) } - if params.FastAck { + if params.RmrData.FastAck { C.rmr_set_fack(ctx) } return &RMRClient{ - protPort: params.ProtPort, - context: ctx, - consumers: make([]MessageConsumer, 0), - stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc), + context: ctx, + consumers: make([]MessageConsumer, 0), + stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc), + maxRetryOnFailure: params.RmrData.MaxRetryOnFailure, } } func NewRMRClient() *RMRClient { + p := GetPortData("rmr-data") return NewRMRClientWithParams( &RMRClientParams{ - ProtPort: viper.GetString("rmr.protPort"), - MaxSize: viper.GetInt("rmr.maxSize"), - ThreadType: viper.GetInt("rmr.threadType"), - StatDesc: "RMR", - LowLatency: viper.GetBool("rmr.lowLatency"), - FastAck: viper.GetBool("rmr.fastAck"), + RmrData: p, + StatDesc: "RMR", }) } @@ -431,12 +423,11 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int { } // Just quick retry seems to help for K8s issue - maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure") - if maxRetryOnFailure == 0 { - maxRetryOnFailure = 5 + if m.maxRetryOnFailure == 0 { + m.maxRetryOnFailure = 5 } - for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ { + for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ { m.contextMux.Lock() if whid != 0 { currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer) @@ -582,8 +573,3 @@ func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int { Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text)) return 0 } - -// To be removed ... -func (m *RMRClient) GetStat() (r RMRStatistics) { - return -} diff --git a/pkg/xapp/subscription.go b/pkg/xapp/subscription.go index 4dcf22b..4ecc262 100755 --- a/pkg/xapp/subscription.go +++ b/pkg/xapp/subscription.go @@ -170,7 +170,7 @@ func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint st } ep, _, _ := net.SplitHostPort(clientEndpoint) - _, port, _ := net.SplitHostPort(viper.GetString("local.host")) + _, port, _ := net.SplitHostPort(fmt.Sprintf(":%d", GetPortData("http").Port)) clientUrl := fmt.Sprintf("http://%s:%s%s", ep, port, r.clientUrl) retries := viper.GetInt("subscription.retryCount") diff --git a/pkg/xapp/types.go b/pkg/xapp/types.go index 571dcba..387d393 100755 --- a/pkg/xapp/types.go +++ b/pkg/xapp/types.go @@ -23,28 +23,19 @@ import ( "unsafe" ) -// To be removed ... -type RMRStatistics struct{} - -// -// -// type RMRClient struct { - protPort string - contextMux sync.Mutex - context unsafe.Pointer - ready int - wg sync.WaitGroup - mux sync.Mutex - stat map[string]Counter - consumers []MessageConsumer - readyCb ReadyCB - readyCbParams interface{} + contextMux sync.Mutex + context unsafe.Pointer + ready int + wg sync.WaitGroup + mux sync.Mutex + stat map[string]Counter + consumers []MessageConsumer + readyCb ReadyCB + readyCbParams interface{} + maxRetryOnFailure int } -// -// -// type RMRMeid struct { PlmnID string EnbID string @@ -70,18 +61,22 @@ func (meid *RMRMeid) String() string { return str } -// -// -// type MessageConsumerFunc func(*RMRParams) error func (fn MessageConsumerFunc) Consume(params *RMRParams) error { return fn(params) } -// -// -// type MessageConsumer interface { Consume(params *RMRParams) error } + +type PortData struct { + Name string + Port int + MaxSize int + ThreadType int + LowLatency bool + FastAck bool + MaxRetryOnFailure int +} diff --git a/pkg/xapp/xapp.go b/pkg/xapp/xapp.go index 6c7c567..5ab48ab 100755 --- a/pkg/xapp/xapp.go +++ b/pkg/xapp/xapp.go @@ -60,8 +60,8 @@ func SetReadyCB(cb ReadyCB, params interface{}) { readyCbParams = params } -func xappReadyCb(params interface{}) { - Alarm = NewAlarmClient(viper.GetString("alarm.MOId"), viper.GetString("alarm.APPId")) +func XappReadyCb(params interface{}) { + Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name")) if readyCb != nil { readyCb(readyCbParams) } @@ -71,28 +71,7 @@ func SetShutdownCB(cb ShutdownCB) { shutdownCb = cb } -func init() { - // Load xapp configuration - Logger = LoadConfig() - - Logger.SetLevel(viper.GetInt("logger.level")) - Resource = NewRouter() - Config = Configurator{} - Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router) - Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout")) - - if viper.IsSet("db.namespaces") { - namespaces := viper.GetStringSlice("db.namespaces") - if len(namespaces) > 0 && namespaces[0] != "" { - Sdl = NewSDLClient(viper.GetStringSlice("db.namespaces")[0]) - } - if len(namespaces) > 1 && namespaces[1] != "" { - Rnib = NewRNIBClient(viper.GetStringSlice("db.namespaces")[1]) - } - } else { - Sdl = NewSDLClient(viper.GetString("db.namespace")) - } - +func InstallSignalHandler() { // // Signal handlers to really exit program. // shutdownCb can hang until application has @@ -103,7 +82,7 @@ func init() { signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM) //signal handler function go func() { - for _ = range interrupt { + for range interrupt { if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) { // close function go func() { @@ -140,11 +119,28 @@ func init() { }() } +func init() { + // Load xapp configuration + Logger = LoadConfig() + + Logger.SetLevel(viper.GetInt("controls.logger.level")) + Resource = NewRouter() + Config = Configurator{} + Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router) + Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout")) + Sdl = NewSDLClient(viper.GetString("db.namespace")) + Rnib = NewRNIBClient() + + InstallSignalHandler() +} + func RunWithParams(c MessageConsumer, sdlcheck bool) { Rmr = NewRMRClient() - Rmr.SetReadyCB(xappReadyCb, nil) - go http.ListenAndServe(viper.GetString("local.host"), Resource.router) - Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host"))) + Rmr.SetReadyCB(XappReadyCb, nil) + + host := fmt.Sprintf(":%d", GetPortData("http").Port) + go http.ListenAndServe(host, Resource.router) + Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host)) if sdlcheck { Sdl.TestConnection() }