From: wahidw Date: Tue, 11 May 2021 10:53:43 +0000 (+0000) Subject: Upgraded to RMR 4.7.4 and some improvements X-Git-Tag: 0.7.8~10 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=505e2497ce651c1127ffbcefe396b3f8c0d0f9dd;p=ric-plt%2Frtmgr.git Upgraded to RMR 4.7.4 and some improvements Signed-off-by: wahidw Change-Id: Icbc120c35db37f68e6524bd998b127593b0e094a --- diff --git a/Dockerfile b/Dockerfile index adae21d..0d561f0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,12 +26,12 @@ FROM nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-ubuntu18-c-go:1.9.0 as rtmgrbuild # Install RMr shared library -ARG RMRVERSION=4.5.2 +ARG RMRVERSION=4.7.4 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb # Install RMr development header files RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb -ENV GOLANG_VERSION 1.12.1 +ENV GOLANG_VERSION 1.13.10 RUN wget --quiet https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz \ && tar xvzf go$GOLANG_VERSION.linux-amd64.tar.gz -C /usr/local ENV PATH="/usr/local/go/bin:${PATH}" diff --git a/RELNOTES b/RELNOTES index dc34000..bfc5a93 100644 --- a/RELNOTES +++ b/RELNOTES @@ -1,3 +1,6 @@ +### v0.7.6 +* Upgraded to RMR 4.7.4 and some improvements + ### v0.7.5 * Open RMR connection in a a new thread diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go index c8ad5c6..2d3d81e 100644 --- a/cmd/rtmgr.go +++ b/cmd/rtmgr.go @@ -49,6 +49,14 @@ import ( const SERVICENAME = "rtmgr" +/*type RMRUpdateType int + +const ( + XappType = iota + SubsType + E2Type +)*/ + func SetupCloseHandler() { c := make(chan os.Signal, 2) signal.Notify(c, os.Interrupt, syscall.SIGTERM) diff --git a/container-tag.yaml b/container-tag.yaml index 4381920..547f709 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -2,4 +2,4 @@ # By default this file is in the docker build directory, # but the location can configured in the JJB template. --- -tag: 0.7.5 +tag: 0.7.6 diff --git a/pkg/nbi/control.go b/pkg/nbi/control.go index d88f5b0..2ed63d8 100644 --- a/pkg/nbi/control.go +++ b/pkg/nbi/control.go @@ -30,16 +30,19 @@ import ( "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "net/http" "os" + "routing-manager/pkg/models" "routing-manager/pkg/rpe" "routing-manager/pkg/rtmgr" "routing-manager/pkg/sbi" "routing-manager/pkg/sdl" "strconv" + "strings" "sync" "time" ) var m sync.Mutex +var EndpointLock sync.Mutex var nbiEngine Engine var sbiEngine sbi.Engine @@ -139,11 +142,15 @@ func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams) { } } - ep := sbiEngine.CheckEndpoint(string(params.Payload)) - if ep == nil { - xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload)) - return + /* hack with WA only for mcxapp in near future */ + if strings.Contains(msg.String(), "ricxapp") { + ep := sbiEngine.CheckEndpoint(string(params.Payload)) + if ep == nil { + xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload)) + return + } } + epstr, whid := sbiEngine.CreateEndpoint(msg.String()) if epstr == nil || whid < 0 { xapp.Logger.Error("Wormhole Id creation failed %d for %s", whid, msg.String()) @@ -184,7 +191,19 @@ func updateEp() (err error) { if err != nil { return errors.New("Routing table cannot be published due to: " + err.Error()) } + EndpointLock.Lock() sbiEngine.UpdateEndpoints(data) + EndpointLock.Unlock() + + return nil +} + +func sendPartialRoutesToAll(xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) (err error) { + policies := rpeEngine.GeneratePartialPolicies(rtmgr.Eps, xappSubData, updatetype) + err = sbiEngine.DistributeAll(policies) + if err != nil { + return errors.New("Routing table cannot be published due to: " + err.Error()) + } return nil } @@ -224,9 +243,18 @@ func Serve() { /* used for rtmgr restart case to connect to Endpoints */ go updateEp() time.Sleep(5 * time.Second) + sendRoutesToAll() + /* Sometimes first message fails, retry after 5 sec */ + time.Sleep(5 * time.Second) + sendRoutesToAll() for { - sendRoutesToAll() + xapp.Logger.Debug("Periodic Routes value = %s", xapp.Config.GetString("periodicroutes")) + if xapp.Config.GetString("periodicroutes") == "enable" { + go updateEp() + time.Sleep(5 * time.Second) + sendRoutesToAll() + } rtmgr.Rtmgr_ready = true time.Sleep(INTERVAL * time.Second) diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go index 5393dc9..60d65fb 100644 --- a/pkg/nbi/httprestful.go +++ b/pkg/nbi/httprestful.go @@ -146,6 +146,7 @@ func provideXappHandleHandlerImpl(data *models.XappCallbackData) error { m.Unlock() updateEp() return sendRoutesToAll() + //return sendPartialRoutesToAll(nil, rtmgr.XappType) } } @@ -227,7 +228,7 @@ func provideXappSubscriptionHandleImpl(data *models.XappSubscriptionData) error addSubscription(&rtmgr.Subs, data) xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps) updateEp() - return sendRoutesToAll() + return sendPartialRoutesToAll(data, rtmgr.SubsType) } func subscriptionExists(data *models.XappSubscriptionData) bool { @@ -295,6 +296,7 @@ func createNewE2tHandleHandlerImpl(data *models.E2tData) error { err, IsDuplicate := validateE2tData(data) if IsDuplicate == true { updateEp() + //return sendPartialRoutesToAll(nil, rtmgr.E2Type) return sendRoutesToAll() } @@ -309,6 +311,7 @@ func createNewE2tHandleHandlerImpl(data *models.E2tData) error { sdlEngine.WriteNewE2TInstance(xapp.Config.GetString("rtfile"), e2data, meiddata) m.Unlock() updateEp() + //sendPartialRoutesToAll(nil, rtmgr.E2Type) sendRoutesToAll() time.Sleep(10 * time.Second) for ep, value := range rtmgr.RMRConnStatus { diff --git a/pkg/rpe/rmr.go b/pkg/rpe/rmr.go index eb7d7ed..17c977b 100644 --- a/pkg/rpe/rmr.go +++ b/pkg/rpe/rmr.go @@ -32,6 +32,7 @@ package rpe import ( "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "routing-manager/pkg/models" "routing-manager/pkg/rtmgr" "strconv" //"strings" @@ -114,6 +115,49 @@ func (r *Rmr) generateRMRPolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents, return &rawrt } +/* +Produces the raw route message consumable by RMR +*/ +func (r *Rmr) generatePartialRMRPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, key string, updatetype rtmgr.RMRUpdateType) *[]string { + rawrt := []string{key + "updatert|start\n"} + rt := r.generatePartialRouteTable(eps, xappSubData, updatetype) + for _, rte := range *rt { + rawrte := key + "mse|" + rte.MessageType + for _, tx := range rte.TxList { + rawrte += "," + tx.Ip + ":" + strconv.Itoa(int(tx.Port)) + } + rawrte += "|" + strconv.Itoa(int(rte.SubID)) + "|" + group := "" + for _, rxg := range rte.RxGroups { + member := "" + for _, rx := range rxg { + if member == "" { + member += rx.Ip + ":" + strconv.Itoa(int(rx.Port)) + } else { + member += "," + rx.Ip + ":" + strconv.Itoa(int(rx.Port)) + } + } + if group == "" { + group += member + } else { + group += ";" + member + } + } + rawrte += group + + if rte.RouteType == "%meid" { + rawrte += group + rte.RouteType + } + + rawrt = append(rawrt, rawrte+"\n") + } + + rawrt = append(rawrt, key+"updatert|end\n") + //count := 0 + + xapp.Logger.Debug("rmr.GeneratePolicies returns: %v", rawrt) + return &rawrt +} func (r *RmrPush) GeneratePolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents) *[]string { xapp.Logger.Debug("Invoked rmr.GeneratePolicies, args: %v: ", eps) return r.generateRMRPolicies(eps, rcs, "") @@ -122,3 +166,8 @@ func (r *RmrPush) GeneratePolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents func (r *RmrPush) GenerateRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable { return r.generateRouteTable(eps) } + +func (r *RmrPush) GeneratePartialPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *[]string { + xapp.Logger.Debug("Invoked rmr.GeneratePartialRMR, args: %v: ", eps) + return r.generatePartialRMRPolicies(eps, xappSubData, "", updatetype) +} diff --git a/pkg/rpe/rpe.go b/pkg/rpe/rpe.go index 2c0423e..a11baf8 100644 --- a/pkg/rpe/rpe.go +++ b/pkg/rpe/rpe.go @@ -32,6 +32,7 @@ package rpe import ( "errors" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "routing-manager/pkg/models" "routing-manager/pkg/rtmgr" "routing-manager/pkg/sbi" "runtime" @@ -258,7 +259,27 @@ func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, subManE } } -func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { +func (r *Rpe) generatePartialSubscriptionTable(xappSubData *models.XappSubscriptionData, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { + xapp.Logger.Debug("rpe.addSingleSubscriptionRoutes invoked") + xAppUuid := *xappSubData.Address + ":" + strconv.Itoa(int(*xappSubData.Port)) + xapp.Logger.Debug("xApp UUID: %v", xAppUuid) + xAppEp := getEndpointByUuid(xAppUuid) + if xAppEp != nil { + //Subscription Manager -> xApp + r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "") + r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "") + r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "") + r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "") + //E2 Termination -> xApp + r.addRoute("RIC_INDICATION", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "") + r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "") + r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "") + } else { + xapp.Logger.Error("generateSubscriptionRoutes xAppEp is nil, xApp UUID: %v", xAppUuid) + } +} + +func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { xapp.Logger.Debug("rpe.generatePlatformRoutes invoked") //Platform Routes --- Subscription Routes //Subscription Manager -> E2 Termination @@ -274,8 +295,8 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr. sendEp = subManEp case "E2MAN": sendEp = e2ManEp - case "RSM": - sendEp = rsmEp + //case "RSM":, + // sendEp = rsmEp case "A1MEDIATOR": sendEp = a1mediatorEp } @@ -286,8 +307,8 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr. Ep = e2ManEp //case "UEMAN": // Ep = ueManEp - case "RSM": - Ep = rsmEp + //case "RSM": + // Ep = rsmEp case "A1MEDIATOR": Ep = a1mediatorEp } @@ -303,6 +324,54 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr. } } +func (r *Rpe) generatePartialRouteTable(endPointList rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *rtmgr.RouteTable { + xapp.Logger.Debug("rpe.generatePartialRouteTable invoked") + xapp.Logger.Debug("Endpoint List: %v", endPointList) + routeTable := &rtmgr.RouteTable{} + subManEp := getEndpointByName(&endPointList, "SUBMAN") + if subManEp == nil { + xapp.Logger.Error("Platform component not found: %v", "Subscription Manager") + xapp.Logger.Debug("Endpoints: %v", endPointList) + } + /*e2TermListEp := getEndpointListByName(&endPointList, "E2TERMINST") + if len(e2TermListEp) == 0 { + xapp.Logger.Error("Platform component not found: %v", "E2 Termination List") + xapp.Logger.Debug("Endpoints: %v", endPointList) + } + e2ManEp := getEndpointByName(&endPointList, "E2MAN") + if e2ManEp == nil { + xapp.Logger.Error("Platform component not found: %v", "E2 Manager") + xapp.Logger.Debug("Endpoints: %v", endPointList) + }*/ + + if xappSubData != nil && updatetype == rtmgr.SubsType { + xapp.Logger.Info("Updating partial subscription routes") + r.generatePartialSubscriptionTable(xappSubData, subManEp, routeTable) + } + /*if updatetype == rtmgr.XappType { + xapp.Logger.Info("Updating partial xapp routes") + for _, endPoint := range endPointList { + xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType) + if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) { + r.generateXappRoutes(endPoint, subManEp, routeTable) + r.generateXappToXappRoutes(endPoint, endPointList, routeTable) + } + } + } + if updatetype == rtmgr.E2Type { + xapp.Logger.Info("Updating partial E2 routes") + if len(e2TermListEp) > 0 { + r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermListEp, routeTable, -1, "") + r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermListEp, routeTable, -1, "") + r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermListEp, routeTable, -1, "") + r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermListEp, routeTable, -1, "") + } + }*/ + + return routeTable + +} + func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable { xapp.Logger.Debug("rpe.generateRouteTable invoked") xapp.Logger.Debug("Endpoint List: %v", endPointList) @@ -322,11 +391,11 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable xapp.Logger.Error("Platform component not found: %v", "E2 Manager") xapp.Logger.Debug("Endpoints: %v", endPointList) } - rsmEp := getEndpointByName(&endPointList, "RSM") + /*rsmEp := getEndpointByName(&endPointList, "RSM") if rsmEp == nil { xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager") xapp.Logger.Debug("Endpoints: %v", endPointList) - } + }*/ A1MediatorEp := getEndpointByName(&endPointList, "A1MEDIATOR") if A1MediatorEp == nil { xapp.Logger.Error("Platform component not found: %v", "A1Mediator") @@ -338,7 +407,7 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable xapp.Logger.Error("Platform component not found: %v", "E2 Termination List") xapp.Logger.Debug("Endpoints: %v", endPointList) } - r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, rsmEp, A1MediatorEp, routeTable) + r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, A1MediatorEp, routeTable) for _, endPoint := range endPointList { xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType) diff --git a/pkg/rpe/types.go b/pkg/rpe/types.go index 4a5b9fc..2d466d3 100644 --- a/pkg/rpe/types.go +++ b/pkg/rpe/types.go @@ -29,7 +29,10 @@ package rpe -import "routing-manager/pkg/rtmgr" +import ( + "routing-manager/pkg/models" + "routing-manager/pkg/rtmgr" +) //type generatePolicies func(rtmgr.Endpoints) *[]string //type generateRouteTable func(rtmgr.Endpoints) *rtmgr.RouteTable @@ -45,4 +48,5 @@ type EngineConfig struct { type Engine interface { GeneratePolicies(rtmgr.Endpoints, *rtmgr.RicComponents) *[]string GenerateRouteTable(rtmgr.Endpoints) *rtmgr.RouteTable + GeneratePartialPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *[]string } diff --git a/pkg/rtmgr/types.go b/pkg/rtmgr/types.go index 41e8cf0..b9b9604 100644 --- a/pkg/rtmgr/types.go +++ b/pkg/rtmgr/types.go @@ -28,6 +28,14 @@ package rtmgr +type RMRUpdateType int + +const ( + XappType = iota + SubsType + E2Type +) + type XApps struct { XAppList []XApp } diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go index 16545cb..ec4d25a 100644 --- a/pkg/sbi/nngpush.go +++ b/pkg/sbi/nngpush.go @@ -87,7 +87,8 @@ func (c *RmrPush) Terminate() error { } func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error { - count := addendpointct + 1 + addendpointct = addendpointct + 1 + count := addendpointct xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count) endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber) ep.Whid = int(xapp.Rmr.Openwh(endpoint)) @@ -159,7 +160,6 @@ func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, call_id int) xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid) ret := c.send_data(ep, policies, call_id) - xapp.Logger.Debug("return value is %v", ret) conn.Lock() rtmgr.RMRConnStatus[ep.Uuid] = ret conn.Unlock() diff --git a/pkg/sbi/types.go b/pkg/sbi/types.go index b52dcfa..094a8ad 100644 --- a/pkg/sbi/types.go +++ b/pkg/sbi/types.go @@ -45,9 +45,9 @@ type Engine interface { AddEndpoint(*rtmgr.Endpoint) error DeleteEndpoint(*rtmgr.Endpoint) error UpdateEndpoints(*rtmgr.RicComponents) - CreateEndpoint(string)(*string,int) - CheckEndpoint(string)*rtmgr.Endpoint - DistributeToEp(*[]string, string, int ) error + CheckEndpoint(string) *rtmgr.Endpoint + CreateEndpoint(string) (*string, int) + DistributeToEp(*[]string, string, int) error } /*type NngSocket interface {