From: prabhukaliswamy Date: Tue, 3 Dec 2019 15:06:30 +0000 (+0000) Subject: Fixed integration and subscription routes related issues for R3 X-Git-Tag: 0.4.9~10 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=b47d12de30e72b6e1017c93ca487279b09038e00;p=ric-plt%2Frtmgr.git Fixed integration and subscription routes related issues for R3 Change-Id: I8f89ad2524b6dbe33f088f1685d9b3d5820b7e54 Signed-off-by: prabhukaliswamy --- diff --git a/RELNOTES b/RELNOTES index 92fde07..2d57a92 100644 --- a/RELNOTES +++ b/RELNOTES @@ -1,3 +1,6 @@ +### v0.4.1 +* Fixed integration and subscription routes related issues for R3 + ### v0.3.9 * Added RSM platform component routes for message types RAN_CONNECTED, RAN_RESTARTED, RAN_RECONFIGURED, RIC_RES_STATUS_REQ,RIC_RES_STATUS_RESP and RIC_RES_STATUS_FAILURE * xApp manager interface changes for Subscription Request API diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go index 101c743..568b7ad 100644 --- a/cmd/rtmgr.go +++ b/cmd/rtmgr.go @@ -47,7 +47,7 @@ import ( ) const SERVICENAME = "rtmgr" -const INTERVAL time.Duration = 2 +const INTERVAL time.Duration = 60 var ( args map[string]*string @@ -123,7 +123,6 @@ func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpe go serveSBI(triggerSBI, sbiEngine, sdlEngine, rpeEngine) for { - time.Sleep(INTERVAL * time.Second) if *args["nbi"] == "httpGetter" { data, err := nbiEngine.(*nbi.HttpGetter).FetchAllXApps(*args["xm-url"]) if err != nil { @@ -134,6 +133,9 @@ func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpe } triggerSBI <- true + + time.Sleep(INTERVAL * time.Second) + rtmgr.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.") } } diff --git a/container-tag.yaml b/container-tag.yaml index dc6f324..3308c36 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -2,4 +2,4 @@ # By default this file is in the docker build directory, # but the location can configured in the JJB template. --- -tag: 0.4.0 +tag: 0.4.1 diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go index 9606288..8ceca1d 100644 --- a/pkg/nbi/httprestful.go +++ b/pkg/nbi/httprestful.go @@ -35,6 +35,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/go-openapi/loads" + "github.com/go-openapi/runtime/middleware" "net/url" "os" "routing-manager/pkg/models" @@ -46,9 +48,6 @@ import ( "routing-manager/pkg/sdl" "strconv" "time" - - "github.com/go-openapi/loads" - "github.com/go-openapi/runtime/middleware" ) //var myClient = &http.Client{Timeout: 1 * time.Second} @@ -217,6 +216,8 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c if err != nil { return handle.NewProvideXappSubscriptionHandleBadRequest() } else { + //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints + time.Sleep(1 * time.Second) return handle.NewGetHandlesOK() } }) @@ -226,6 +227,8 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c if err != nil { return handle.NewDeleteXappSubscriptionHandleNoContent() } else { + //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints + time.Sleep(1 * time.Second) return handle.NewGetHandlesOK() } }) @@ -237,7 +240,7 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c } func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) { - rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl) + rtmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl) r, err := myClient.Get(xmurl) if err != nil { return nil, err @@ -252,10 +255,10 @@ func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) { rtmgr.Logger.Warn("Json decode failed: " + err.Error()) } rtmgr.Logger.Info("HTTP GET: OK") - rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps) + rtmgr.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps) return &xapps, err } - rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode) + rtmgr.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode) return nil, nil } @@ -315,8 +318,12 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co if err != nil { rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error()) } else if data != nil { - sdlEngine.WriteXApps(fileName, data) - triggerSBI <- true + rtmgr.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.") + alldata, err1 := httpGetXApps(xmurl) + if alldata != nil && err1 == nil { + sdlEngine.WriteXApps(fileName, alldata) + triggerSBI <- true + } } } }() diff --git a/pkg/rpe/rpe.go b/pkg/rpe/rpe.go index 9a36564..aa8e12f 100644 --- a/pkg/rpe/rpe.go +++ b/pkg/rpe/rpe.go @@ -33,8 +33,8 @@ import ( "errors" "routing-manager/pkg/rtmgr" "routing-manager/pkg/sbi" - "runtime" "strconv" + "runtime" ) var ( @@ -90,26 +90,26 @@ func (r *Rpe) addRoute(messageType string, tx *rtmgr.Endpoint, rx *rtmgr.Endpoin rxList := []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}} messageId := rtmgr.MessageTypes[messageType] route := rtmgr.RouteTableEntry{ - MessageType: messageId, - TxList: txList, - RxGroups: rxList, - SubID: subId} - *routeTable = append(*routeTable, route) - rtmgr.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId) - rtmgr.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId) - } else { - pc, _, _, ok := runtime.Caller(1) - details := runtime.FuncForPC(pc) - if ok && details != nil { - rtmgr.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name()) + MessageType: messageId, + TxList: txList, + RxGroups: rxList, + SubID: subId} + *routeTable = append(*routeTable, route) + rtmgr.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId) + rtmgr.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId) + } else { + pc,_,_,ok := runtime.Caller(1) + details := runtime.FuncForPC(pc) + if ok && details != nil { + rtmgr.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name()) + } } - } } func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { rtmgr.Logger.Debug("rpe.generateXappRoutes invoked") rtmgr.Logger.Debug("Endpoint: %v, xAppType: %v", xAppEp.Name, xAppEp.XAppType) - if xAppEp.XAppType != sbi.PlatformType && len(xAppEp.TxMessages) > 0 && len(xAppEp.RxMessages) > 0 { + if xAppEp.XAppType != sbi.PlatformType && ( len(xAppEp.TxMessages) > 0 || len(xAppEp.RxMessages) > 0 ) { //xApp -> Subscription Manager r.addRoute("RIC_SUB_REQ", xAppEp, subManEp, routeTable, -1) r.addRoute("RIC_SUB_DEL_REQ", xAppEp, subManEp, routeTable, -1) @@ -121,7 +121,7 @@ func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoin } } -func (r *Rpe) generateSubscriptionRoutes(e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { +func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { rtmgr.Logger.Debug("rpe.addSubscriptionRoutes invoked") subscriptionList := &rtmgr.Subs for _, subscription := range *subscriptionList { @@ -129,15 +129,18 @@ func (r *Rpe) generateSubscriptionRoutes(e2TermEp *rtmgr.Endpoint, subManEp *rtm xAppUuid := subscription.Fqdn + ":" + strconv.Itoa(int(subscription.Port)) rtmgr.Logger.Debug("xApp UUID: %v", xAppUuid) xAppEp := getEndpointByUuid(xAppUuid) - //Subscription Manager -> xApp - r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID) - r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID) - r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID) - r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID) - //E2 Termination -> xApp - r.addRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID) - r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID) - r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID) + if xAppEp.Uuid == selectedxAppEp.Uuid { + rtmgr.Logger.Debug("xApp UUID is matched for selected xApp.UUID: %v and xApp.Name: %v", selectedxAppEp.Uuid, selectedxAppEp.Name) + //Subscription Manager -> xApp + r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID) + r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID) + r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID) + r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID) + //E2 Termination -> xApp + r.addRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID) + r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID) + r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID) + } } } @@ -227,9 +230,9 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable for _, endPoint := range endPointList { rtmgr.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType) - if endPoint.XAppType != sbi.PlatformType && len(endPoint.TxMessages) > 0 && len(endPoint.RxMessages) > 0 { + if endPoint.XAppType != sbi.PlatformType && ( len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0 ) { r.generateXappRoutes(endPoint, e2TermEp, subManEp, routeTable) - r.generateSubscriptionRoutes(e2TermEp, subManEp, routeTable) + r.generateSubscriptionRoutes(endPoint, e2TermEp, subManEp, routeTable) } } return routeTable diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go index 6e3e225..4f56753 100644 --- a/pkg/sbi/nngpush.go +++ b/pkg/sbi/nngpush.go @@ -30,12 +30,11 @@ package sbi import ( "errors" - "routing-manager/pkg/rtmgr" - "strconv" - "nanomsg.org/go/mangos/v2" "nanomsg.org/go/mangos/v2/protocol/push" _ "nanomsg.org/go/mangos/v2/transport/all" + "routing-manager/pkg/rtmgr" + "strconv" ) type NngPush struct { @@ -61,6 +60,7 @@ func createNewPushSocket() (NngSocket, error) { func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) { rtmgr.Logger.Debug("Invoked: pipeEventHandler()") + rtmgr.Logger.Debug("Received pipe event for " + pipe.Address() + " address") for _, ep := range rtmgr.Eps { uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber) if uri == pipe.Address() { diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go index 7d07160..2737cc1 100644 --- a/pkg/sbi/sbi.go +++ b/pkg/sbi/sbi.go @@ -64,6 +64,7 @@ type Sbi struct { } func (s *Sbi) pruneEndpointList(sbi Engine) { + rtmgr.Logger.Debug("pruneEndpointList invoked.") for _, ep := range rtmgr.Eps { if !ep.Keepalive { rtmgr.Logger.Debug("deleting %v", ep)