XAPP_NAME:=xapp
XAPP_ROOT:=test
-XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.yaml"
+XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.json"
include build/make.go.mk
XAPP_NAME:=xapp
XAPP_ROOT:=pkg
-XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.yaml"
+XAPP_TESTENV:="RMR_SEED_RT=config/uta_rtg.rt CFG_FILE=$(ROOT_DIR)config/config-file.json"
include build/make.go.mk
--- /dev/null
+{
+ "name": "xapp",
+ "version": "0.7.0",
+ "vendor": "Nokia",
+ "moId": "SEP",
+ "containers": [
+ {
+ "name": "ueec",
+ "image": {
+ "registry": "ranco-dev-tools.eastus.cloudapp.azure.com:10001",
+ "name": "ueec-xapp",
+ "tag": "0.5.3"
+ }
+ }
+ ],
+ "livenessProbe": {
+ "httpGet": {
+ "path": "ric/v1/health/alive",
+ "port": 8080
+ },
+ "initialDelaySeconds": 5,
+ "periodSeconds": 15
+ },
+ "readinessProbe": {
+ "httpGet": {
+ "path": "ric/v1/health/ready",
+ "port": 8080
+ },
+ "initialDelaySeconds": 5,
+ "periodSeconds": 15
+ },
+ "messaging": {
+ "ports": [
+ {
+ "name": "http",
+ "container": "ueec",
+ "port": 8086,
+ "description": "http service"
+ },
+ {
+ "name": "rmr-route",
+ "container": "ueec",
+ "port": 4561,
+ "description": "rmr route port for ueec"
+ },
+ {
+ "name": "rmr-data",
+ "container": "ueec",
+ "port": 4560,
+ "maxSize": 2072,
+ "threadType": 0,
+ "lowLatency": false,
+ "maxRetryOnFailure": 5,
+ "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE"],
+ "txMessages": ["RIC_SGNB_ADDITION_REQ", "RIC_SGNB_ADDITION_ACK"],
+ "policies": [],
+ "description": "rmr data port for ueec"
+ }
+ ]
+ },
+ "controls": {
+ "logger": {
+ "level": 3
+ },
+ "subscription": {
+ "subscriptionActive": true,
+ "functionId": 1,
+ "plmnId": "310150",
+ "eNBId": "202251",
+ "timeout": 5,
+ "host": "service-ricplt-submgr-http.ricplt:8088",
+ "clientEndpoint": "service-ricxapp-ueec-http.ricxapp:8080"
+ }
+ },
+ "metrics": {
+ "url": "/ric/v1/metrics",
+ "namespace": "ricxapp"
+ },
+ "subscription": {
+ "host": "localhost:8088",
+ "timeout": 2,
+ "retryCount": 10,
+ "retryDelay": 5
+ },
+ "faults": {},
+ "measurements": []
+}
+
l.Info("config file %s changed ", e.Name)
updatemtypes()
- Logger.SetLevel(viper.GetInt("logger.level"))
+ Logger.SetLevel(viper.GetInt("controls.logger.level"))
if len(ConfigChangeListeners) > 0 {
for _, f := range ConfigChangeListeners {
go f(e.Name)
return nil
}
+func GetPortData(pname string) (d PortData) {
+ for _, v := range viper.GetStringMap("messaging")["ports"].([]interface{}) {
+ if n, ok := v.(map[string]interface{})["name"].(string); ok && n == pname {
+ d.Name = n
+ if p, _ := v.(map[string]interface{})["port"].(float64); ok {
+ d.Port = int(p)
+ }
+ if m, _ := v.(map[string]interface{})["maxSize"].(float64); ok {
+ d.MaxSize = int(m)
+ }
+ if m, _ := v.(map[string]interface{})["threadType"].(float64); ok {
+ d.ThreadType = int(m)
+ }
+ if m, _ := v.(map[string]interface{})["lowLatency"].(bool); ok {
+ d.LowLatency = bool(m)
+ }
+ if m, _ := v.(map[string]interface{})["fastAck"].(bool); ok {
+ d.FastAck = bool(m)
+ }
+ if m, _ := v.(map[string]interface{})["maxRetryOnFailure"].(float64); ok {
+ d.MaxRetryOnFailure = int(m)
+ }
+ }
+ }
+ return
+}
+
func (*Configurator) SetSDLNotificationCB(appName string, sdlNotificationCb SDLNotificationCB) error {
return Sdl.Subscribe(sdlNotificationCb, fmt.Sprintf("CM_UPDATE:%s", appName))
}
return
}
-func NewRNIBClient(ns string) *RNIBClient {
+func NewRNIBClient() *RNIBClient {
s := sdl.NewSdlInstance("e2Manager", sdl.NewDatabase())
return &RNIBClient{
db: s,
"bytes"
"crypto/md5"
"fmt"
- "github.com/spf13/viper"
"strings"
"time"
"unsafe"
//
//-----------------------------------------------------------------------------
type RMRClientParams struct {
- ProtPort string
- MaxSize int
- ThreadType int
- StatDesc string
- LowLatency bool
- FastAck bool
+ StatDesc string
+ RmrData PortData
}
func (params *RMRClientParams) String() string {
- return fmt.Sprintf("ProtPort=%s MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t",
- params.ProtPort, params.MaxSize, params.ThreadType, params.StatDesc, params.LowLatency, params.FastAck)
+ return fmt.Sprintf("ProtPort=%d MaxSize=%d ThreadType=%d StatDesc=%s LowLatency=%t FastAck=%t",
+ params.RmrData.Port, params.RmrData.MaxSize, params.RmrData.ThreadType, params.StatDesc, params.RmrData.LowLatency, params.RmrData.FastAck)
}
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
func NewRMRClientWithParams(params *RMRClientParams) *RMRClient {
- p := C.CString(params.ProtPort)
- m := C.int(params.MaxSize)
- c := C.int(params.ThreadType)
+ p := C.CString(fmt.Sprintf("%d", params.RmrData.Port))
+ m := C.int(params.RmrData.MaxSize)
+ c := C.int(params.RmrData.ThreadType)
defer C.free(unsafe.Pointer(p))
ctx := C.rmr_init(p, m, c)
if ctx == nil {
Logger.Info("new rmrClient with parameters: %s", params.String())
- if params.LowLatency {
+ if params.RmrData.LowLatency {
C.rmr_set_low_latency(ctx)
}
- if params.FastAck {
+ if params.RmrData.FastAck {
C.rmr_set_fack(ctx)
}
return &RMRClient{
- protPort: params.ProtPort,
- context: ctx,
- consumers: make([]MessageConsumer, 0),
- stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
+ context: ctx,
+ consumers: make([]MessageConsumer, 0),
+ stat: Metric.RegisterCounterGroup(RMRCounterOpts, params.StatDesc),
+ maxRetryOnFailure: params.RmrData.MaxRetryOnFailure,
}
}
func NewRMRClient() *RMRClient {
+ p := GetPortData("rmr-data")
return NewRMRClientWithParams(
&RMRClientParams{
- ProtPort: viper.GetString("rmr.protPort"),
- MaxSize: viper.GetInt("rmr.maxSize"),
- ThreadType: viper.GetInt("rmr.threadType"),
- StatDesc: "RMR",
- LowLatency: viper.GetBool("rmr.lowLatency"),
- FastAck: viper.GetBool("rmr.fastAck"),
+ RmrData: p,
+ StatDesc: "RMR",
})
}
}
// Just quick retry seems to help for K8s issue
- maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
- if maxRetryOnFailure == 0 {
- maxRetryOnFailure = 5
+ if m.maxRetryOnFailure == 0 {
+ m.maxRetryOnFailure = 5
}
- for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
+ for j := 0; j < m.maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
m.contextMux.Lock()
if whid != 0 {
currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
Logger.Debug(fmt.Sprintf("rmrClient: %s -> mbuf nil", text))
return 0
}
-
-// To be removed ...
-func (m *RMRClient) GetStat() (r RMRStatistics) {
- return
-}
}
ep, _, _ := net.SplitHostPort(clientEndpoint)
- _, port, _ := net.SplitHostPort(viper.GetString("local.host"))
+ _, port, _ := net.SplitHostPort(fmt.Sprintf(":%d", GetPortData("http").Port))
clientUrl := fmt.Sprintf("http://%s:%s%s", ep, port, r.clientUrl)
retries := viper.GetInt("subscription.retryCount")
"unsafe"
)
-// To be removed ...
-type RMRStatistics struct{}
-
-//
-//
-//
type RMRClient struct {
- protPort string
- contextMux sync.Mutex
- context unsafe.Pointer
- ready int
- wg sync.WaitGroup
- mux sync.Mutex
- stat map[string]Counter
- consumers []MessageConsumer
- readyCb ReadyCB
- readyCbParams interface{}
+ contextMux sync.Mutex
+ context unsafe.Pointer
+ ready int
+ wg sync.WaitGroup
+ mux sync.Mutex
+ stat map[string]Counter
+ consumers []MessageConsumer
+ readyCb ReadyCB
+ readyCbParams interface{}
+ maxRetryOnFailure int
}
-//
-//
-//
type RMRMeid struct {
PlmnID string
EnbID string
return str
}
-//
-//
-//
type MessageConsumerFunc func(*RMRParams) error
func (fn MessageConsumerFunc) Consume(params *RMRParams) error {
return fn(params)
}
-//
-//
-//
type MessageConsumer interface {
Consume(params *RMRParams) error
}
+
+type PortData struct {
+ Name string
+ Port int
+ MaxSize int
+ ThreadType int
+ LowLatency bool
+ FastAck bool
+ MaxRetryOnFailure int
+}
readyCbParams = params
}
-func xappReadyCb(params interface{}) {
- Alarm = NewAlarmClient(viper.GetString("alarm.MOId"), viper.GetString("alarm.APPId"))
+func XappReadyCb(params interface{}) {
+ Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
if readyCb != nil {
readyCb(readyCbParams)
}
shutdownCb = cb
}
-func init() {
- // Load xapp configuration
- Logger = LoadConfig()
-
- Logger.SetLevel(viper.GetInt("logger.level"))
- Resource = NewRouter()
- Config = Configurator{}
- Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
- Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
-
- if viper.IsSet("db.namespaces") {
- namespaces := viper.GetStringSlice("db.namespaces")
- if len(namespaces) > 0 && namespaces[0] != "" {
- Sdl = NewSDLClient(viper.GetStringSlice("db.namespaces")[0])
- }
- if len(namespaces) > 1 && namespaces[1] != "" {
- Rnib = NewRNIBClient(viper.GetStringSlice("db.namespaces")[1])
- }
- } else {
- Sdl = NewSDLClient(viper.GetString("db.namespace"))
- }
-
+func InstallSignalHandler() {
//
// Signal handlers to really exit program.
// shutdownCb can hang until application has
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
//signal handler function
go func() {
- for _ = range interrupt {
+ for range interrupt {
if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
// close function
go func() {
}()
}
+func init() {
+ // Load xapp configuration
+ Logger = LoadConfig()
+
+ Logger.SetLevel(viper.GetInt("controls.logger.level"))
+ Resource = NewRouter()
+ Config = Configurator{}
+ Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
+ Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
+ Sdl = NewSDLClient(viper.GetString("db.namespace"))
+ Rnib = NewRNIBClient()
+
+ InstallSignalHandler()
+}
+
func RunWithParams(c MessageConsumer, sdlcheck bool) {
Rmr = NewRMRClient()
- Rmr.SetReadyCB(xappReadyCb, nil)
- go http.ListenAndServe(viper.GetString("local.host"), Resource.router)
- Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", viper.GetString("local.host")))
+ Rmr.SetReadyCB(XappReadyCb, nil)
+
+ host := fmt.Sprintf(":%d", GetPortData("http").Port)
+ go http.ListenAndServe(host, Resource.router)
+ Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
if sdlcheck {
Sdl.TestConnection()
}