bin
pkg/sdl/ut.rt
.idea/*
-
+.gitreview
port=29418
project=ric-plt/rtmgr
defaultbranch=master
-defaultremote=origin
+defaultremote=LinuxFoundation
+
+### v0.3.4
+* The following tools made available in the final docker image: iputils-ping, net-tools, curl and tcpdump
+
### v0.3.3
* Introduced delete API over REST interface for subscriptions.
args["nbi"] = flag.String("nbi", "httpRESTful", "Northbound interface module to be used. Valid values are: 'httpGetter | httpRESTful'")
args["sbi"] = flag.String("sbi", "nngpush", "Southbound interface module to be used. Valid values are: 'nngpush")
args["rpe"] = flag.String("rpe", "rmrpush", "Route Policy Engine to be used. Valid values are: 'rmrpush'")
- args["sdl"] = flag.String("sdl", "file", "Datastore enginge to be used. Valid values are: 'file'")
+ args["sdl"] = flag.String("sdl", "file", "Data store engine to be used. Valid values are: 'file'")
args["xm-url"] = flag.String("xm-url", "http://localhost:3000/xapps", "HTTP URL where xApp Manager exposes the entire xApp List")
args["nbi-if"] = flag.String("nbi-if", "http://localhost:8888", "Base HTTP URL where routing manager will be listening on")
args["sbi-if"] = flag.String("sbi-if", "0.0.0.0", "IPv4 address of interface where Southbound socket to be opened")
args["filename"] = flag.String("filename", "/db/rt.json", "Absolute path of file where the route information to be stored")
- args["loglevel"] = flag.String("loglevel", "INFO", "INFO | WARN | ERROR | DEBUG")
+ args["loglevel"] = flag.String("loglevel", "INFO", "INFO | WARN | ERROR | DEBUG | TRACE")
flag.Parse()
}
-func initRtmgr() (nbi.NbiEngine, sbi.SbiEngine, sdl.SdlEngine, rpe.RpeEngine, error) {
- var err error
- var nbii nbi.NbiEngine
- var sbii sbi.SbiEngine
- var sdli sdl.SdlEngine
- var rpei rpe.RpeEngine
- if nbii, err = nbi.GetNbi(*args["nbi"]); err == nil && nbii != nil {
- if sbii, err = sbi.GetSbi(*args["sbi"]); err == nil && sbii != nil {
- if sdli, err = sdl.GetSdl(*args["sdl"]); err == nil && sdli != nil {
- if rpei, err = rpe.GetRpe(*args["rpe"]); err == nil && rpei != nil {
- return nbii, sbii, sdli, rpei, nil
+func initRtmgr() (nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, err error) {
+ if nbiEngine, err = nbi.GetNbi(*args["nbi"]); err == nil && nbiEngine != nil {
+ if sbiEngine, err = sbi.GetSbi(*args["sbi"]); err == nil && sbiEngine != nil {
+ if sdlEngine, err = sdl.GetSdl(*args["sdl"]); err == nil && sdlEngine != nil {
+ if rpeEngine, err = rpe.GetRpe(*args["rpe"]); err == nil && rpeEngine != nil {
+ return nbiEngine, sbiEngine, sdlEngine, rpeEngine, nil
}
}
}
return nil, nil, nil, nil, err
}
-func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.SbiEngine, sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine) {
+func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine) {
for {
if <-triggerSBI {
data, err := sdlEngine.ReadAll(*args["filename"])
}
}
-func serve(nbiEngine nbi.NbiEngine, sbiEngine sbi.SbiEngine, sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine) {
+func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine) {
triggerSBI := make(chan bool)
for {
time.Sleep(INTERVAL * time.Second)
if *args["nbi"] == "httpGetter" {
- data, err := nbiEngine.(*nbi.HttpGetter).FetchAllXapps(*args["xm-url"])
+ data, err := nbiEngine.(*nbi.HttpGetter).FetchAllXApps(*args["xm-url"])
if err != nil {
rtmgr.Logger.Error("Cannot fetch xapp data due to: " + err.Error())
} else if data != nil {
- sdlEngine.WriteXapps(*args["filename"], data)
+ sdlEngine.WriteXApps(*args["filename"], data)
}
}
# By default this file is in the docker build directory,
# but the location can configured in the JJB template.
---
-tag: 0.3.3
+tag: 0.3.4
)
type HttpGetter struct {
- NbiEngine
- FetchAllXapps FetchAllXappsHandler
+ Engine
+ FetchAllXApps FetchAllXAppsHandler
}
func NewHttpGetter() *HttpGetter {
instance := new(HttpGetter)
- instance.FetchAllXapps = fetchAllXapps
+ instance.FetchAllXApps = fetchAllXApps
return instance
}
var myClient = &http.Client{Timeout: 5 * time.Second}
-func fetchAllXapps(xmurl string) (*[]rtmgr.XApp, error) {
- rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
+func fetchAllXApps(xmurl string) (*[]rtmgr.XApp, error) {
+ rtmgr.Logger.Info("Invoked httpGetter.fetchXappList: " + xmurl)
r, err := myClient.Get(xmurl)
if err != nil {
return nil, err
if r.StatusCode == 200 {
rtmgr.Logger.Debug("http client raw response: %v", r)
- var xapps []rtmgr.XApp
- err = json.NewDecoder(r.Body).Decode(&xapps)
+ var xApps []rtmgr.XApp
+ err = json.NewDecoder(r.Body).Decode(&xApps)
if err != nil {
rtmgr.Logger.Warn("Json decode failed: " + err.Error())
}
rtmgr.Logger.Info("HTTP GET: OK")
- rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
- return &xapps, err
+ rtmgr.Logger.Debug("httpGetter.fetchXappList returns: %v", xApps)
+ return &xApps, err
}
- rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
+ rtmgr.Logger.Warn("httpGetter got an unexpected http status code: %v", r.StatusCode)
return nil, nil
}
func (g *HttpGetter) Initialize(xmurl string, nbiif string, fileName string, configfile string,
- sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine, triggerSBI chan<- bool) error {
+ sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error {
return nil
}
)
var (
- XMURL string = "http://127.0.0.1:3000/ric/v1/xapps"
+ XMURL = "http://127.0.0.1:3000/ric/v1/xapps"
)
func TestFetchXappListInvalidData(t *testing.T) {
var httpGetter = NewHttpGetter()
- _, err := httpGetter.FetchAllXapps(XMURL)
+ _, err := httpGetter.FetchAllXApps(XMURL)
if err == nil {
t.Error("No XApp data received: " + err.Error())
}
}
func TestFetchXappListWithInvalidData(t *testing.T) {
- var expected int = 0
+ var expected = 0
rtmgr.SetLogLevel("debug")
b := []byte(`{"ID":"deadbeef1234567890", "Version":0, "EventType":"all"}`)
l, err := net.Listen("tcp", "127.0.0.1:3000")
ts.Start()
defer ts.Close()
var httpGetter = NewHttpGetter()
- xapplist, err := httpGetter.FetchAllXapps(XMURL)
+ xapplist, err := httpGetter.FetchAllXApps(XMURL)
if err == nil {
t.Error("Error occured: " + err.Error())
} else {
}
}
-func TestFetchAllXappsWithValidData(t *testing.T) {
- var expected int = 1
+func TestFetchAllXAppsWithValidData(t *testing.T) {
+ var expected = 1
b := []byte(`[
{
"name":"xapp-01","status":"unknown","version":"1.2.3",
ts.Start()
defer ts.Close()
var httpGetter = NewHttpGetter()
- xapplist, err := httpGetter.FetchAllXapps(XMURL)
+ xapplist, err := httpGetter.FetchAllXApps(XMURL)
if err != nil {
t.Error("Error occured: " + err.Error())
} else {
package nbi
+//noinspection GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference,GoUnresolvedReference
import (
"encoding/json"
"errors"
//var myClient = &http.Client{Timeout: 1 * time.Second}
type HttpRestful struct {
- NbiEngine
+ Engine
LaunchRest LaunchRestHandler
RecvXappCallbackData RecvXappCallbackDataHandler
ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
func validateXappCallbackData(callbackData *models.XappCallbackData) error {
if len(callbackData.XApps) == 0 {
- return fmt.Errorf("Invalid Data field: \"%s\"", callbackData.XApps)
+ return fmt.Errorf("invalid Data field: \"%s\"", callbackData.XApps)
}
var xapps []rtmgr.XApp
err := json.Unmarshal([]byte(callbackData.XApps), &xapps)
if err != nil {
- return fmt.Errorf("Unmarshal failed: \"%s\"", err.Error())
+ return fmt.Errorf("unmarshal failed: \"%s\"", err.Error())
}
return nil
}
}
if !subscriptionExists(data) {
- rtmgr.Logger.Warn("Subscription not found: %d", *data.SubscriptionID)
- err := fmt.Errorf("Subscription not found: %d", *data.SubscriptionID)
+ rtmgr.Logger.Warn("subscription not found: %d", *data.SubscriptionID)
+ err := fmt.Errorf("subscription not found: %d", *data.SubscriptionID)
return err
}
}
}
-func httpGetXapps(xmurl string) (*[]rtmgr.XApp, error) {
+func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
r, err := myClient.Get(xmurl)
if err != nil {
return nil, nil
}
-func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.SdlEngine) error {
+func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.Engine) error {
var readErr error
var maxRetries = 10
for i := 1; i <= maxRetries; i++ {
time.Sleep(2 * time.Second)
- xappData, err := httpGetXapps(xmurl)
+ xappData, err := httpGetXApps(xmurl)
if xappData != nil && err == nil {
pcData, confErr := rtmgr.GetPlatformComponents(configfile)
if confErr != nil {
}
rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
// Combine the xapps data and platform data before writing to the SDL
- ricData := &rtmgr.RicComponents{Xapps: *xappData, Pcs: *pcData}
+ ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData}
writeErr := sdlEngine.WriteAll(fileName, ricData)
if writeErr != nil {
rtmgr.Logger.Error(writeErr.Error())
return nil
}
} else if err == nil {
- readErr = errors.New("Unexpected HTTP status code")
+ readErr = errors.New("unexpected HTTP status code")
} else {
rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
readErr = err
}
func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
- sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine, triggerSBI chan<- bool) error {
+ sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool) error {
err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
if err != nil {
- rtmgr.Logger.Error("Exiting as nbi failed to get the intial startup data from the xapp manager: " + err.Error())
+ rtmgr.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
return err
}
if err != nil {
rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
} else if data != nil {
- sdlEngine.WriteXapps(fileName, data)
+ sdlEngine.WriteXApps(fileName, data)
triggerSBI <- true
}
}
}
func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
- var b bool = false
+ var b = false
sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
for _, elem := range *subs {
if elem == sub {
func delSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
rtmgr.Logger.Debug("Deleteing the subscription from the subscriptions list")
- var present bool = false
+ var present = false
sub := rtmgr.Subscription{SubID: *xappSubData.SubscriptionID, Fqdn: *xappSubData.Address, Port: *xappSubData.Port}
for i, elem := range *subs {
if elem == sub {
"time"
)
-var BASIC_XAPPLIST = []byte(`[
+var BasicXAppLists = []byte(`[
{
"name":"xapp-01","status":"unknown","version":"1.2.3",
"instances":[
}
]`)
-var SUBSCRIPTION_RESP = []byte(`{"ID":"deadbeef1234567890", "Version":0, "EventType":"all"}`)
+var SubscriptionResp = []byte(`{"ID":"deadbeef1234567890", "Version":0, "EventType":"all"}`)
-var INVALID_SUB_RESP = []byte(`{"Version":0, "EventType":all}`)
+var InvalidSubResp = []byte(`{"Version":0, "EventType":all}`)
func createMockAppmgrWithData(url string, g []byte, p []byte) *httptest.Server {
l, err := net.Listen("tcp", url)
}
func createMockPlatformComponents() {
- var filename = string("config.json")
+ var filename = "config.json"
file, _ := json.MarshalIndent(stub.ValidPlatformComponents, "", "")
filestr := string(file)
filestr = "{\"PlatformComponents\":" + filestr + "}"
}
}
-func TestHttpGetXappsInvalidData(t *testing.T) {
- _, err := httpGetXapps(XMURL)
+func TestHttpGetXAppsInvalidData(t *testing.T) {
+ _, err := httpGetXApps(XMURL)
if err == nil {
t.Error("No XApp data received: " + err.Error())
}
}
-func TestHttpGetXappsWithValidData(t *testing.T) {
- var expected int = 1
- ts := createMockAppmgrWithData("127.0.0.1:3000", BASIC_XAPPLIST, nil)
+func TestHttpGetXAppsWithValidData(t *testing.T) {
+ var expected = 1
+ ts := createMockAppmgrWithData("127.0.0.1:3000", BasicXAppLists, nil)
ts.Start()
defer ts.Close()
- xapplist, err := httpGetXapps(XMURL)
+ xapplist, err := httpGetXApps(XMURL)
if err != nil {
t.Error("Error occured: " + err.Error())
} else {
}
func TestRetrieveStartupData(t *testing.T) {
- ts := createMockAppmgrWithData("127.0.0.1:3000", BASIC_XAPPLIST, SUBSCRIPTION_RESP)
+ ts := createMockAppmgrWithData("127.0.0.1:3000", BasicXAppLists, SubscriptionResp)
ts.Start()
defer ts.Close()
sdlEngine, _ := sdl.GetSdl("file")
}
func TestRetrieveStartupDataWithInvalidSubResp(t *testing.T) {
- ts := createMockAppmgrWithData("127.0.0.1:3000", BASIC_XAPPLIST, INVALID_SUB_RESP)
+ ts := createMockAppmgrWithData("127.0.0.1:3000", BasicXAppLists, InvalidSubResp)
ts.Start()
defer ts.Close()
sdlEngine, _ := sdl.GetSdl("file")
)
var (
- SupportedNbis = []*NbiEngineConfig{
- &NbiEngineConfig{
+ SupportedNbis = []*EngineConfig{
+ {
Name: "httpGetter",
Version: "v1",
Protocol: "http",
Instance: NewHttpGetter(),
IsAvailable: true,
},
- &NbiEngineConfig{
+ {
Name: "httpRESTful",
Version: "v1",
Protocol: "http",
type Nbi struct {
}
-func GetNbi(nbiName string) (NbiEngine, error) {
+func GetNbi(nbiName string) (Engine, error) {
for _, nbi := range SupportedNbis {
if nbi.Name == nbiName && nbi.IsAvailable {
return nbi.Instance, nil
}
func CreateSubReq(restUrl string, restPort string) *appmgr_model.SubscriptionRequest {
- // TODO: parametize function
+ // TODO: parameterize function
subReq := appmgr_model.SubscriptionRequest{
TargetURL: swag.String(restUrl + ":" + restPort + "/ric/v1/handles/xapp-handle/"),
EventType: swag.String("all"),
client := apiclient.New(transport, strfmt.Default)
addSubParams := operations.NewAddSubscriptionParamsWithTimeout(10 * time.Second)
// create sub req with rest url and port
- subReq := CreateSubReq(string(nbiifUrl.Scheme+"://"+nbiifUrl.Hostname()), nbiifUrl.Port())
+ subReq := CreateSubReq(nbiifUrl.Scheme+"://"+nbiifUrl.Hostname(), nbiifUrl.Port())
resp, postErr := client.Operations.AddSubscription(addSubParams.WithSubscriptionRequest(subReq))
if postErr != nil {
rtmgr.Logger.Error("POST unsuccessful:" + postErr.Error())
*/
/*
Mnemonic: nbi.go
- Abstract: Containes NBI (NorthBound Interface) specific types
+ Abstract: Contains NBI (NorthBound Interface) specific types
Date: 12 March 2019
*/
"routing-manager/pkg/sdl"
)
-type FetchAllXappsHandler func(string) (*[]rtmgr.XApp, error)
+type FetchAllXAppsHandler func(string) (*[]rtmgr.XApp, error)
type RecvXappCallbackDataHandler func(<-chan *models.XappCallbackData) (*[]rtmgr.XApp, error)
type LaunchRestHandler func(*string, chan<- *models.XappCallbackData, chan<- *models.XappSubscriptionData, chan<- *models.XappSubscriptionData)
type ProvideXappHandleHandlerImpl func(chan<- *models.XappCallbackData, *models.XappCallbackData) error
-type RetrieveStartupDataHandler func(string, string, string, string, sdl.SdlEngine) error
+type RetrieveStartupDataHandler func(string, string, string, string, sdl.Engine) error
-type NbiEngineConfig struct {
+type EngineConfig struct {
Name string
Version string
Protocol string
- Instance NbiEngine
+ Instance Engine
IsAvailable bool
}
-type NbiEngine interface {
- Initialize(string, string, string, string, sdl.SdlEngine, rpe.RpeEngine, chan<- bool) error
+type Engine interface {
+ Initialize(string, string, string, string, sdl.Engine, rpe.Engine, chan<- bool) error
Terminate() error
}
func resetTestDataset(testdata []rtmgr.Endpoint) {
rtmgr.Eps = make(map[string]*rtmgr.Endpoint)
- for _, endpoint := range stub.ValidEndpoints {
+ for _, endpoint := range testdata {
ep := endpoint
rtmgr.Eps[ep.Uuid] = &ep
}
)
var (
- SupportedRpes = []*RpeEngineConfig{
- &RpeEngineConfig{
+ SupportedRpes = []*EngineConfig{
+ {
Name: "rmrpush",
Version: "pubsush",
Protocol: "rmruta",
}
)
-func GetRpe(rpeName string) (RpeEngine, error) {
+func GetRpe(rpeName string) (Engine, error) {
for _, rpe := range SupportedRpes {
if rpe.Name == rpeName && rpe.IsAvailable {
return rpe.Instance, nil
return nil
}
-func (r *Rpe) addRoute(messageType string, tx *rtmgr.Endpoint, rx *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) addRoute(messageType string, tx *rtmgr.Endpoint, rx *rtmgr.Endpoint, routeTable *rtmgr.RouteTable, subId int32) {
txList := rtmgr.EndpointList{*tx}
rxList := []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}}
- messageId := rtmgr.MESSAGETYPES[messageType]
+ messageId := rtmgr.MessageTypes[messageType]
route := rtmgr.RouteTableEntry{
- messageId,
- txList,
- rxList,
- -1}
+ MessageType: messageId,
+ TxList: txList,
+ RxGroups: rxList,
+ SubID: subId}
*routeTable = append(*routeTable, route)
- rtmgr.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: -1", messageId, txList, rxList)
+ rtmgr.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
+ rtmgr.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
}
-func (r *Rpe) addSubscriptionRoute(messageType string, tx *rtmgr.Endpoint, rx *rtmgr.Endpoint, routeTable *rtmgr.RouteTable, subId int32) {
- txList := rtmgr.EndpointList{*tx}
- rxList := []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}}
- messageId := rtmgr.MESSAGETYPES[messageType]
- route := rtmgr.RouteTableEntry{
- messageId,
- txList,
- rxList,
- subId,
- }
- *routeTable = append(*routeTable, route)
- rtmgr.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, txList, rxList, subId)
-}
-
-func (r *Rpe) generateXappRoutes(e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
rtmgr.Logger.Debug("rpe.generateXappRoutes invoked")
- endPointList := rtmgr.Eps
- for _, endPoint := range endPointList {
- rtmgr.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
- if endPoint.XAppType != sbi.PLATFORMTYPE && len(endPoint.TxMessages) > 0 && len(endPoint.RxMessages) > 0 {
- //xApp -> Subscription Manager
- r.addRoute("RIC_SUB_REQ", endPoint, subManEp, routeTable)
- r.addRoute("RIC_SUB_DEL_REQ", endPoint, subManEp, routeTable)
- //xApp -> E2 Termination
- r.addRoute("RIC_CONTROL_REQ", endPoint, e2TermEp, routeTable)
- }
+ rtmgr.Logger.Debug("Endpoint: %v, xAppType: %v", xAppEp.Name, xAppEp.XAppType)
+ if xAppEp.XAppType != sbi.PlatformType && len(xAppEp.TxMessages) > 0 && len(xAppEp.RxMessages) > 0 {
+ //xApp -> Subscription Manager
+ r.addRoute("RIC_SUB_REQ", xAppEp, subManEp, routeTable, -1)
+ r.addRoute("RIC_SUB_DEL_REQ", xAppEp, subManEp, routeTable, -1)
+ //xApp -> E2 Termination
+ r.addRoute("RIC_CONTROL_REQ", xAppEp, e2TermEp, routeTable, -1)
}
}
func (r *Rpe) generateSubscriptionRoutes(e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
rtmgr.Logger.Debug("rpe.addSubscriptionRoutes invoked")
- subscriptionList := rtmgr.Subs
- for _, subscription := range subscriptionList {
+ subscriptionList := &rtmgr.Subs
+ for _, subscription := range *subscriptionList {
rtmgr.Logger.Debug("Subscription: %v", subscription)
xAppUuid := subscription.Fqdn + ":" + strconv.Itoa(int(subscription.Port))
rtmgr.Logger.Debug("xApp UUID: %v", xAppUuid)
xAppEp := getEndpointByUuid(xAppUuid)
//Subscription Manager -> xApp
- r.addSubscriptionRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
- r.addSubscriptionRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
- r.addSubscriptionRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
- r.addSubscriptionRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
//E2 Termination -> xApp
- r.addSubscriptionRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID)
- r.addSubscriptionRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID)
- r.addSubscriptionRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID)
}
}
rtmgr.Logger.Debug("rpe.generatePlatformRoutes invoked")
//Platform Routes --- Subscription Routes
//Subscription Manager -> E2 Termination
- r.addRoute("RIC_SUB_REQ", subManEp, e2TermEp, routeTable)
- r.addRoute("RIC_SUB_DEL_REQ", subManEp, e2TermEp, routeTable)
+ r.addRoute("RIC_SUB_REQ", subManEp, e2TermEp, routeTable, -1)
+ r.addRoute("RIC_SUB_DEL_REQ", subManEp, e2TermEp, routeTable, -1)
//E2 Termination -> Subscription Manager
- r.addRoute("RIC_SUB_RESP", e2TermEp, subManEp, routeTable)
- r.addRoute("RIC_SUB_DEL_RESP", e2TermEp, subManEp, routeTable)
- r.addRoute("RIC_SUB_FAILURE", e2TermEp, subManEp, routeTable)
- r.addRoute("RIC_SUB_DEL_FAILURE", e2TermEp, subManEp, routeTable)
+ r.addRoute("RIC_SUB_RESP", e2TermEp, subManEp, routeTable, -1)
+ r.addRoute("RIC_SUB_DEL_RESP", e2TermEp, subManEp, routeTable, -1)
+ r.addRoute("RIC_SUB_FAILURE", e2TermEp, subManEp, routeTable, -1)
+ r.addRoute("RIC_SUB_DEL_FAILURE", e2TermEp, subManEp, routeTable, -1)
//TODO: UE Man Routes removed (since it is not existing)
//UE Manager -> Subscription Manager
//r.addRoute("RIC_SUB_REQ", ueManEp, subManEp, routeTable)
//Platform Routes --- X2 Routes
//E2 Manager -> E2 Termination
- r.addRoute("RIC_X2_SETUP_REQ", e2ManEp, e2TermEp, routeTable)
- r.addRoute("RIC_X2_SETUP_RESP", e2ManEp, e2TermEp, routeTable)
- r.addRoute("RIC_X2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable)
- r.addRoute("RIC_X2_RESET_RESP", e2ManEp, e2TermEp, routeTable)
- r.addRoute("RIC_ENDC_X2_SETUP_REQ", e2ManEp, e2TermEp, routeTable)
- r.addRoute("RIC_ENDC_X2_SETUP_RESP", e2ManEp, e2TermEp, routeTable)
- r.addRoute("RIC_ENDC_X2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable)
+ r.addRoute("RIC_X2_SETUP_REQ", e2ManEp, e2TermEp, routeTable, -1)
+ r.addRoute("RIC_X2_SETUP_RESP", e2ManEp, e2TermEp, routeTable, -1)
+ r.addRoute("RIC_X2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable, -1)
+ r.addRoute("RIC_X2_RESET_RESP", e2ManEp, e2TermEp, routeTable, -1)
+ r.addRoute("RIC_ENDC_X2_SETUP_REQ", e2ManEp, e2TermEp, routeTable, -1)
+ r.addRoute("RIC_ENDC_X2_SETUP_RESP", e2ManEp, e2TermEp, routeTable, -1)
+ r.addRoute("RIC_ENDC_X2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable, -1)
//E2 Termination -> E2 Manager
- r.addRoute("RIC_X2_SETUP_REQ", e2TermEp, e2ManEp, routeTable)
- r.addRoute("RIC_X2_SETUP_RESP", e2TermEp, e2ManEp, routeTable)
- r.addRoute("RIC_X2_RESET", e2TermEp, e2ManEp, routeTable)
- r.addRoute("RIC_X2_RESOURCE_STATUS_RESPONSE", e2TermEp, e2ManEp, routeTable)
- r.addRoute("RIC_X2_RESET_RESP", e2TermEp, e2ManEp, routeTable)
- r.addRoute("RIC_ENDC_X2_SETUP_REQ", e2ManEp, e2TermEp, routeTable)
- r.addRoute("RIC_ENDC_X2_SETUP_RESP", e2ManEp, e2TermEp, routeTable)
- r.addRoute("RIC_ENDC_X2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable)
+ r.addRoute("RIC_X2_SETUP_REQ", e2TermEp, e2ManEp, routeTable, -1)
+ r.addRoute("RIC_X2_SETUP_RESP", e2TermEp, e2ManEp, routeTable, -1)
+ r.addRoute("RIC_X2_RESET", e2TermEp, e2ManEp, routeTable, -1)
+ r.addRoute("RIC_X2_RESOURCE_STATUS_RESPONSE", e2TermEp, e2ManEp, routeTable, -1)
+ r.addRoute("RIC_X2_RESET_RESP", e2TermEp, e2ManEp, routeTable, -1)
+ r.addRoute("RIC_ENDC_X2_SETUP_REQ", e2ManEp, e2TermEp, routeTable, -1)
+ r.addRoute("RIC_ENDC_X2_SETUP_RESP", e2ManEp, e2TermEp, routeTable, -1)
+ r.addRoute("RIC_ENDC_X2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable, -1)
}
func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable {
for _, endPoint := range endPointList {
rtmgr.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
- if endPoint.XAppType != sbi.PLATFORMTYPE && len(endPoint.TxMessages) > 0 && len(endPoint.RxMessages) > 0 {
- r.generateXappRoutes(e2TermEp, subManEp, routeTable)
+ if endPoint.XAppType != sbi.PlatformType && len(endPoint.TxMessages) > 0 && len(endPoint.RxMessages) > 0 {
+ r.generateXappRoutes(endPoint, e2TermEp, subManEp, routeTable)
r.generateSubscriptionRoutes(e2TermEp, subManEp, routeTable)
}
}
import "routing-manager/pkg/rtmgr"
-type generatePolicies func(rtmgr.Endpoints) *[]string
-type generateRouteTable func(rtmgr.Endpoints) *rtmgr.RouteTable
+//type generatePolicies func(rtmgr.Endpoints) *[]string
+//type generateRouteTable func(rtmgr.Endpoints) *rtmgr.RouteTable
-type RpeEngineConfig struct {
+type EngineConfig struct {
Name string
Version string
Protocol string
- Instance RpeEngine
+ Instance Engine
IsAvailable bool
}
-type RpeEngine interface {
+type Engine interface {
GeneratePolicies(rtmgr.Endpoints) *[]string
GenerateRouteTable(rtmgr.Endpoints) *rtmgr.RouteTable
}
*/
/*
Mnemonic: rtmgr/rtmgr.go
- Abstract: Containes RTMGR (Routing Manager) module's generic variables and functions
+ Abstract: Contains RTMGR (Routing Manager) module's generic variables and functions
Date: 26 March 2019
*/
var (
//TODO: temporary solution
- // CamelCase Message Types are for being able to test with old fashioned admin controll xApps
- // TODO: Add a seperate message definition file (Not using the one from RMR to not create dependency on that library).
- MESSAGETYPES = map[string]string{
+ // CamelCase Message Types are for being able to test with old fashioned admin control xApps
+ // TODO: Add a separate message definition file (Not using the one from RMR to not create dependency on that library).
+ MessageTypes = map[string]string{
"HandoverPreparation": "0",
"HandoverCancel": "1",
"LoadIndication": "2",
Logger.Level(lumber.ERROR)
return nil
case "DEBUG":
- Logger.Info("debugmode")
+ Logger.Info("Debug mode")
Logger.Level(lumber.DEBUG)
return nil
+ case "TRACE":
+ Logger.Info("Trace mode")
+ Logger.Level(lumber.TRACE)
+ return nil
default:
- Logger.Error("Invalid log mode, setting info")
+ Logger.Error("invalid log mode, setting info")
Logger.Level(lumber.INFO)
- return errors.New("Invalid log level, setting info")
+ return errors.New("invalid log level, setting info")
}
}
func GetPlatformComponents(configfile string) (*PlatformComponents, error) {
Logger.Debug("Invoked rtmgr.GetPlatformComponents(" + configfile + ")")
- var rcfg RtmgrConfig
+ var rcfg ConfigRtmgr
jsonFile, err := os.Open(configfile)
if err != nil {
return nil, errors.New("cannot open the file due to: " + err.Error())
)
func TestSetLogLevel(t *testing.T) {
- modes_ok := []string{"info", "warn", "debug", "error"}
- modes_nok := []string{"inValId", "LogLEVEL", "PRoviDeD"}
- for _, value := range modes_ok {
+ modeIsOk := []string{"info", "warn", "debug", "error"}
+ modeOsNok := []string{"inValId", "LogLEVEL", "Provided"}
+ for _, value := range modeIsOk {
if SetLogLevel(value) != nil {
t.Error("Invalid log level: " + value)
}
}
- for _, value := range modes_nok {
+ for _, value := range modeOsNok {
if SetLogLevel(value) == nil {
t.Error("Invalid log level: " + value)
}
package rtmgr
type XApps struct {
- XApplist []XApp
+ XAppList []XApp
}
type RouteTable []RouteTableEntry
Port uint16 `json:"port"`
}
-type RtmgrConfig struct {
+type ConfigRtmgr struct {
Pcs PlatformComponents `json:"PlatformComponents"`
}
type RicComponents struct {
- Xapps []XApp
+ XApps []XApp
Pcs PlatformComponents
}
func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
for _, ep := range rtmgr.Eps {
- uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
+ uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
if uri == pipe.Address() {
switch event {
case 1:
var err error
var socket NngSocket
rtmgr.Logger.Debug("Invoked sbi.AddEndpoint")
- rtmgr.Logger.Debug("args: %v", (*ep))
+ rtmgr.Logger.Debug("args: %v", *ep)
socket, err = c.NewSocket()
if err != nil {
return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
rtmgr.Logger.Debug("Invoked sbi. DeleteEndpoint")
- rtmgr.Logger.Debug("args: %v", (*ep))
+ rtmgr.Logger.Debug("args: %v", *ep)
if err := ep.Socket.(NngSocket).Close(); err != nil {
return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
}
*/
func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
- uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
+ uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
options := make(map[string]interface{})
options[mangos.OptionDialAsynch] = true
if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
func (c *NngPush) DistributeAll(policies *[]string) error {
rtmgr.Logger.Debug("Invoked: sbi.DistributeAll")
- rtmgr.Logger.Debug("args: %v", (*policies))
+ rtmgr.Logger.Debug("args: %v", *policies)
for _, ep := range rtmgr.Eps {
if ep.IsReady {
go c.send(ep, policies)
rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
}
}
- rtmgr.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len((*policies))) + ")")
+ rtmgr.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
}
}
/*
-nngpush.UpdateEndpoints() is testd against stub.ValidXapps dataset
+nngpush.UpdateEndpoints() is testd against stub.ValidXApps dataset
*/
func TestNngPushUpdateEndpoints(t *testing.T) {
var nngpush = NngPush{}
"strconv"
)
-const DEFAULT_NNG_PIPELINE_SOCKET_PREFIX = "tcp://"
-const DEFAULT_NNG_PIPELINE_SOCKET_NUMBER = 4561
-const PLATFORMTYPE = "platform"
+const DefaultNngPipelineSocketPrefix = "tcp://"
+const DefaultNngPipelineSocketNumber = 4561
+const PlatformType = "platform"
var (
- SupportedSbis = []*SbiEngineConfig{
- &SbiEngineConfig{
+ SupportedSbis = []*EngineConfig{
+ {
Name: "nngpush",
Version: "v1",
Protocol: "nngpipeline",
}
)
-func GetSbi(sbiName string) (SbiEngine, error) {
+func GetSbi(sbiName string) (Engine, error) {
for _, sbi := range SupportedSbis {
if sbi.Name == sbiName && sbi.IsAvailable {
return sbi.Instance, nil
type Sbi struct {
}
-func (s *Sbi) pruneEndpointList(sbi SbiEngine) {
+func (s *Sbi) pruneEndpointList(sbi Engine) {
for _, ep := range rtmgr.Eps {
if !ep.Keepalive {
rtmgr.Logger.Debug("deleting %v", ep)
}
}
-func (s *Sbi) updateEndpoints(rcs *rtmgr.RicComponents, sbii SbiEngine) {
- for _, xapp := range (*rcs).Xapps {
+func (s *Sbi) updateEndpoints(rcs *rtmgr.RicComponents, sbi Engine) {
+ for _, xapp := range (*rcs).XApps {
for _, instance := range xapp.Instances {
uuid := instance.Ip + ":" + strconv.Itoa(int(instance.Port))
if _, ok := rtmgr.Eps[uuid]; ok {
rtmgr.Eps[uuid].Keepalive = true
} else {
ep := &rtmgr.Endpoint{
- uuid,
- instance.Name,
- xapp.Name,
- instance.Ip,
- instance.Port,
- instance.TxMessages,
- instance.RxMessages,
- nil,
- false,
- true,
+ Uuid: uuid,
+ Name: instance.Name,
+ XAppType: xapp.Name,
+ Ip: instance.Ip,
+ Port: instance.Port,
+ TxMessages: instance.TxMessages,
+ RxMessages: instance.RxMessages,
+ Socket: nil,
+ IsReady: false,
+ Keepalive: true,
}
- if err := sbii.AddEndpoint(ep); err != nil {
+ if err := sbi.AddEndpoint(ep); err != nil {
rtmgr.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
continue
}
}
}
}
- s.updatePlatformEndpoints(&((*rcs).Pcs), sbii)
- s.pruneEndpointList(sbii)
+ s.updatePlatformEndpoints(&((*rcs).Pcs), sbi)
+ s.pruneEndpointList(sbi)
}
-func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbii SbiEngine) {
+func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbi Engine) {
rtmgr.Logger.Debug("updatePlatformEndpoints invoked. PCS: %v", *pcs)
for _, pc := range *pcs {
uuid := pc.Fqdn + ":" + strconv.Itoa(int(pc.Port))
rtmgr.Eps[uuid].Keepalive = true
} else {
ep := &rtmgr.Endpoint{
- uuid,
- pc.Name,
- PLATFORMTYPE,
- pc.Fqdn,
- pc.Port,
- rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"],
- rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"],
- nil,
- false,
- true,
+ Uuid: uuid,
+ Name: pc.Name,
+ XAppType: PlatformType,
+ Ip: pc.Fqdn,
+ Port: pc.Port,
+ TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"],
+ RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"],
+ Socket: nil,
+ IsReady: false,
+ Keepalive: true,
}
rtmgr.Logger.Debug("ep created: %v", ep)
- if err := sbii.AddEndpoint(ep); err != nil {
+ if err := sbi.AddEndpoint(ep); err != nil {
rtmgr.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
continue
}
import "routing-manager/pkg/rtmgr"
-type SbiEngineConfig struct {
+type EngineConfig struct {
Name string
Version string
Protocol string
- Instance SbiEngine
+ Instance Engine
IsAvailable bool
}
-type SbiEngine interface {
+type Engine interface {
Initialize(string) error
Terminate() error
DistributeAll(*[]string) error
return nil, errors.New("cannot open the file due to: " + err.Error())
}
defer jsonFile.Close()
+
byteValue, err := ioutil.ReadAll(jsonFile)
if err != nil {
return nil, errors.New("cannot read the file due to: " + err.Error())
func (f *File) WriteAll(file string, rcs *rtmgr.RicComponents) error {
rtmgr.Logger.Debug("Invoked sdl.WriteAll")
rtmgr.Logger.Debug("file.fileWriteAll writes into file: " + file)
- rtmgr.Logger.Debug("file.fileWriteAll writes data: %v", (*rcs))
+ rtmgr.Logger.Debug("file.fileWriteAll writes data: %v", *rcs)
byteValue, err := json.Marshal(rcs)
if err != nil {
return errors.New("cannot convert data due to: " + err.Error())
return nil
}
-func (f *File) WriteXapps(file string, xapps *[]rtmgr.XApp) error {
- rtmgr.Logger.Debug("Invoked sdl.WriteXapps")
- rtmgr.Logger.Debug("file.fileWriteXapps writes into file: " + file)
- rtmgr.Logger.Debug("file.fileWriteXapps writes data: %v", (*xapps))
+func (f *File) WriteXApps(file string, xApps *[]rtmgr.XApp) error {
+ rtmgr.Logger.Debug("Invoked sdl.WriteXApps")
+ rtmgr.Logger.Debug("file.fileWriteXApps writes into file: " + file)
+ rtmgr.Logger.Debug("file.fileWriteXApps writes data: %v", *xApps)
ricData, err := NewFile().ReadAll(file)
- if err != nil || ricData == nil {
+ if err != nil {
rtmgr.Logger.Error("cannot get data from sdl interface due to: " + err.Error())
- return errors.New("cannot read full ric data to modify xapps data, due to: " + err.Error())
+ return errors.New("cannot read full ric data to modify xApps data, due to: " + err.Error())
}
-
- ricData.Xapps = *xapps
+ ricData.XApps = *xApps
byteValue, err := json.Marshal(ricData)
if err != nil {
)
var (
- SupportedSdls = []*SdlEngineConfig{
- &SdlEngineConfig{
+ SupportedSdls = []*EngineConfig{
+ {
Name: "file",
Version: "v1",
Protocol: "rawfile",
Instance: NewFile(),
IsAvailable: true,
},
- &SdlEngineConfig{
+ {
Name: "redis",
Version: "v1",
Protocol: "ndsl",
}
)
-func GetSdl(sdlName string) (SdlEngine, error) {
+func GetSdl(sdlName string) (Engine, error) {
for _, sdl := range SupportedSdls {
if sdl.Name == sdlName && sdl.IsAvailable {
return sdl.Instance, nil
==================================================================================
*/
/*
- Mnemonic: nngpub_test.go
+ Mnemonic: sbi_test.go
Abstract:
Date: 25 April 2019
*/
*/
/*
Mnemonic: sdl/types.go
- Abstract: Containes SDL (Shared Data Layer) specific types
+ Abstract: Contains SDL (Shared Data Layer) specific types
Date: 16 March 2019
*/
package sdl
import "routing-manager/pkg/rtmgr"
-type readAll func(string) (*rtmgr.RicComponents, error)
-type writeAll func(string, *rtmgr.RicComponents) error
+//type readAll func(string) (*rtmgr.RicComponents, error)
+//type writeAll func(string, *rtmgr.RicComponents) error
-type SdlEngineConfig struct {
+type EngineConfig struct {
Name string
Version string
Protocol string
- Instance SdlEngine
+ Instance Engine
IsAvailable bool
}
-type SdlEngine interface {
+type Engine interface {
ReadAll(string) (*rtmgr.RicComponents, error)
WriteAll(string, *rtmgr.RicComponents) error
- WriteXapps(string, *[]rtmgr.XApp) error
+ WriteXApps(string, *[]rtmgr.XApp) error
}
import "routing-manager/pkg/rtmgr"
-var ValidXapps = &[]rtmgr.XApp{
+var ValidXApps = &[]rtmgr.XApp{
{Name: "app1", Status: "", Version: "", Instances: []rtmgr.XAppInstance{{Name: "E2TERM", Status: "unknown", Ip: "10.0.0.1", Port: 0, TxMessages: []string{"HandoverPreparation", "HandoverCancel"}, RxMessages: []string{"HandoverPreparation", "HandoverCancel"}}}},
{Name: "app2", Status: "", Version: "", Instances: []rtmgr.XAppInstance{{Name: "SUBMAN", Status: "unknown", Ip: "192.168.0.1", Port: 0, TxMessages: []string{"HandoverCancel", "HandoverPreparation"}, RxMessages: []string{"HandoverPreparation", "HandoverCancel"}}}},
{Name: "app3", Status: "", Version: "", Instances: []rtmgr.XAppInstance{{Name: "E2MAN", Status: "unknown", Ip: "10.1.1.1", Port: 0, TxMessages: []string{"X2Setup"}, RxMessages: []string{"Reset", "UEContextRelease"}}}},
}
var ValidRicComponents = rtmgr.RicComponents{
- *ValidXapps, *ValidPlatformComponents,
+ XApps: *ValidXApps, Pcs: *ValidPlatformComponents,
}
var ValidPolicies = &[]string{"", ""}