Upgraded to RMR 4.7.4 and some improvements 58/6058/1
authorwahidw <abdulwahid.w@nokia.com>
Tue, 11 May 2021 10:53:43 +0000 (10:53 +0000)
committerwahidw <abdulwahid.w@nokia.com>
Tue, 11 May 2021 10:53:43 +0000 (10:53 +0000)
Signed-off-by: wahidw <abdulwahid.w@nokia.com>
Change-Id: Icbc120c35db37f68e6524bd998b127593b0e094a

12 files changed:
Dockerfile
RELNOTES
cmd/rtmgr.go
container-tag.yaml
pkg/nbi/control.go
pkg/nbi/httprestful.go
pkg/rpe/rmr.go
pkg/rpe/rpe.go
pkg/rpe/types.go
pkg/rtmgr/types.go
pkg/sbi/nngpush.go
pkg/sbi/types.go

index adae21d..0d561f0 100644 (file)
 FROM nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-ubuntu18-c-go:1.9.0 as rtmgrbuild
 
 # Install RMr shared library
-ARG RMRVERSION=4.5.2
+ARG RMRVERSION=4.7.4
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb
 # Install RMr development header files
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb
 
-ENV GOLANG_VERSION 1.12.1
+ENV GOLANG_VERSION 1.13.10
 RUN wget --quiet https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz \
      && tar xvzf go$GOLANG_VERSION.linux-amd64.tar.gz -C /usr/local 
 ENV PATH="/usr/local/go/bin:${PATH}"
index dc34000..bfc5a93 100644 (file)
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,6 @@
+### v0.7.6
+* Upgraded to RMR 4.7.4 and some improvements 
+
 ### v0.7.5
 * Open RMR connection in a a new thread
 
index c8ad5c6..2d3d81e 100644 (file)
@@ -49,6 +49,14 @@ import (
 
 const SERVICENAME = "rtmgr"
 
+/*type RMRUpdateType int
+
+const (
+       XappType = iota
+       SubsType
+       E2Type
+)*/
+
 func SetupCloseHandler() {
        c := make(chan os.Signal, 2)
        signal.Notify(c, os.Interrupt, syscall.SIGTERM)
index 4381920..547f709 100644 (file)
@@ -2,4 +2,4 @@
 # By default this file is in the docker build directory,
 # but the location can configured in the JJB template.
 ---
-tag: 0.7.5
+tag: 0.7.6
index d88f5b0..2ed63d8 100644 (file)
@@ -30,16 +30,19 @@ import (
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "net/http"
        "os"
+       "routing-manager/pkg/models"
        "routing-manager/pkg/rpe"
        "routing-manager/pkg/rtmgr"
        "routing-manager/pkg/sbi"
        "routing-manager/pkg/sdl"
        "strconv"
+       "strings"
        "sync"
        "time"
 )
 
 var m sync.Mutex
+var EndpointLock sync.Mutex
 
 var nbiEngine Engine
 var sbiEngine sbi.Engine
@@ -139,11 +142,15 @@ func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams) {
                }
        }
 
-       ep := sbiEngine.CheckEndpoint(string(params.Payload))
-       if ep == nil {
-               xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
-               return
+       /* hack with WA only for mcxapp in near future */
+       if strings.Contains(msg.String(), "ricxapp") {
+               ep := sbiEngine.CheckEndpoint(string(params.Payload))
+               if ep == nil {
+                       xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
+                       return
+               }
        }
+
        epstr, whid := sbiEngine.CreateEndpoint(msg.String())
        if epstr == nil || whid < 0 {
                xapp.Logger.Error("Wormhole Id creation failed %d for %s", whid, msg.String())
@@ -184,7 +191,19 @@ func updateEp() (err error) {
        if err != nil {
                return errors.New("Routing table cannot be published due to: " + err.Error())
        }
+       EndpointLock.Lock()
        sbiEngine.UpdateEndpoints(data)
+       EndpointLock.Unlock()
+
+       return nil
+}
+
+func sendPartialRoutesToAll(xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) (err error) {
+       policies := rpeEngine.GeneratePartialPolicies(rtmgr.Eps, xappSubData, updatetype)
+       err = sbiEngine.DistributeAll(policies)
+       if err != nil {
+               return errors.New("Routing table cannot be published due to: " + err.Error())
+       }
 
        return nil
 }
@@ -224,9 +243,18 @@ func Serve() {
        /* used for rtmgr restart case to connect to Endpoints */
        go updateEp()
        time.Sleep(5 * time.Second)
+       sendRoutesToAll()
+       /* Sometimes first message  fails, retry after 5 sec */
+       time.Sleep(5 * time.Second)
+       sendRoutesToAll()
 
        for {
-               sendRoutesToAll()
+               xapp.Logger.Debug("Periodic Routes value = %s", xapp.Config.GetString("periodicroutes"))
+               if xapp.Config.GetString("periodicroutes") == "enable" {
+                       go updateEp()
+                       time.Sleep(5 * time.Second)
+                       sendRoutesToAll()
+               }
 
                rtmgr.Rtmgr_ready = true
                time.Sleep(INTERVAL * time.Second)
index 5393dc9..60d65fb 100644 (file)
@@ -146,6 +146,7 @@ func provideXappHandleHandlerImpl(data *models.XappCallbackData) error {
                                m.Unlock()
                                updateEp()
                                return sendRoutesToAll()
+                               //return sendPartialRoutesToAll(nil, rtmgr.XappType)
                        }
                }
 
@@ -227,7 +228,7 @@ func provideXappSubscriptionHandleImpl(data *models.XappSubscriptionData) error
        addSubscription(&rtmgr.Subs, data)
        xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
        updateEp()
-       return sendRoutesToAll()
+       return sendPartialRoutesToAll(data, rtmgr.SubsType)
 }
 
 func subscriptionExists(data *models.XappSubscriptionData) bool {
@@ -295,6 +296,7 @@ func createNewE2tHandleHandlerImpl(data *models.E2tData) error {
        err, IsDuplicate := validateE2tData(data)
        if IsDuplicate == true {
                updateEp()
+               //return sendPartialRoutesToAll(nil, rtmgr.E2Type)
                return sendRoutesToAll()
        }
 
@@ -309,6 +311,7 @@ func createNewE2tHandleHandlerImpl(data *models.E2tData) error {
        sdlEngine.WriteNewE2TInstance(xapp.Config.GetString("rtfile"), e2data, meiddata)
        m.Unlock()
        updateEp()
+       //sendPartialRoutesToAll(nil, rtmgr.E2Type)
        sendRoutesToAll()
        time.Sleep(10 * time.Second)
        for ep, value := range rtmgr.RMRConnStatus {
index eb7d7ed..17c977b 100644 (file)
@@ -32,6 +32,7 @@ package rpe
 
 import (
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+       "routing-manager/pkg/models"
        "routing-manager/pkg/rtmgr"
        "strconv"
        //"strings"
@@ -114,6 +115,49 @@ func (r *Rmr) generateRMRPolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents,
        return &rawrt
 }
 
+/*
+Produces the raw route message consumable by RMR
+*/
+func (r *Rmr) generatePartialRMRPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, key string, updatetype rtmgr.RMRUpdateType) *[]string {
+       rawrt := []string{key + "updatert|start\n"}
+       rt := r.generatePartialRouteTable(eps, xappSubData, updatetype)
+       for _, rte := range *rt {
+               rawrte := key + "mse|" + rte.MessageType
+               for _, tx := range rte.TxList {
+                       rawrte += "," + tx.Ip + ":" + strconv.Itoa(int(tx.Port))
+               }
+               rawrte += "|" + strconv.Itoa(int(rte.SubID)) + "|"
+               group := ""
+               for _, rxg := range rte.RxGroups {
+                       member := ""
+                       for _, rx := range rxg {
+                               if member == "" {
+                                       member += rx.Ip + ":" + strconv.Itoa(int(rx.Port))
+                               } else {
+                                       member += "," + rx.Ip + ":" + strconv.Itoa(int(rx.Port))
+                               }
+                       }
+                       if group == "" {
+                               group += member
+                       } else {
+                               group += ";" + member
+                       }
+               }
+               rawrte += group
+
+               if rte.RouteType == "%meid" {
+                       rawrte += group + rte.RouteType
+               }
+
+               rawrt = append(rawrt, rawrte+"\n")
+       }
+
+       rawrt = append(rawrt, key+"updatert|end\n")
+       //count := 0
+
+       xapp.Logger.Debug("rmr.GeneratePolicies returns: %v", rawrt)
+       return &rawrt
+}
 func (r *RmrPush) GeneratePolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents) *[]string {
        xapp.Logger.Debug("Invoked rmr.GeneratePolicies, args: %v: ", eps)
        return r.generateRMRPolicies(eps, rcs, "")
@@ -122,3 +166,8 @@ func (r *RmrPush) GeneratePolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents
 func (r *RmrPush) GenerateRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
        return r.generateRouteTable(eps)
 }
+
+func (r *RmrPush) GeneratePartialPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *[]string {
+       xapp.Logger.Debug("Invoked rmr.GeneratePartialRMR, args: %v: ", eps)
+       return r.generatePartialRMRPolicies(eps, xappSubData, "", updatetype)
+}
index 2c0423e..a11baf8 100644 (file)
@@ -32,6 +32,7 @@ package rpe
 import (
        "errors"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+       "routing-manager/pkg/models"
        "routing-manager/pkg/rtmgr"
        "routing-manager/pkg/sbi"
        "runtime"
@@ -258,7 +259,27 @@ func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, subManE
        }
 }
 
-func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) generatePartialSubscriptionTable(xappSubData *models.XappSubscriptionData, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+       xapp.Logger.Debug("rpe.addSingleSubscriptionRoutes invoked")
+       xAppUuid := *xappSubData.Address + ":" + strconv.Itoa(int(*xappSubData.Port))
+       xapp.Logger.Debug("xApp UUID: %v", xAppUuid)
+       xAppEp := getEndpointByUuid(xAppUuid)
+       if xAppEp != nil {
+               //Subscription Manager -> xApp
+               r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+               r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+               r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+               r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+               //E2 Termination -> xApp
+               r.addRoute("RIC_INDICATION", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+               r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+               r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+       } else {
+               xapp.Logger.Error("generateSubscriptionRoutes xAppEp is nil, xApp UUID: %v", xAppUuid)
+       }
+}
+
+func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
        xapp.Logger.Debug("rpe.generatePlatformRoutes invoked")
        //Platform Routes --- Subscription Routes
        //Subscription Manager -> E2 Termination
@@ -274,8 +295,8 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.
                        sendEp = subManEp
                case "E2MAN":
                        sendEp = e2ManEp
-               case "RSM":
-                       sendEp = rsmEp
+               //case "RSM":,
+               //      sendEp = rsmEp
                case "A1MEDIATOR":
                        sendEp = a1mediatorEp
                }
@@ -286,8 +307,8 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.
                        Ep = e2ManEp
                //case "UEMAN":
                //      Ep = ueManEp
-               case "RSM":
-                       Ep = rsmEp
+               //case "RSM":
+               //      Ep = rsmEp
                case "A1MEDIATOR":
                        Ep = a1mediatorEp
                }
@@ -303,6 +324,54 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.
        }
 }
 
+func (r *Rpe) generatePartialRouteTable(endPointList rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *rtmgr.RouteTable {
+       xapp.Logger.Debug("rpe.generatePartialRouteTable invoked")
+       xapp.Logger.Debug("Endpoint List:  %v", endPointList)
+       routeTable := &rtmgr.RouteTable{}
+       subManEp := getEndpointByName(&endPointList, "SUBMAN")
+       if subManEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "Subscription Manager")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }
+       /*e2TermListEp := getEndpointListByName(&endPointList, "E2TERMINST")
+       if len(e2TermListEp) == 0 {
+               xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }
+       e2ManEp := getEndpointByName(&endPointList, "E2MAN")
+       if e2ManEp == nil {
+               xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
+               xapp.Logger.Debug("Endpoints: %v", endPointList)
+       }*/
+
+       if xappSubData != nil && updatetype == rtmgr.SubsType {
+               xapp.Logger.Info("Updating partial subscription routes")
+               r.generatePartialSubscriptionTable(xappSubData, subManEp, routeTable)
+       }
+       /*if updatetype == rtmgr.XappType {
+               xapp.Logger.Info("Updating partial xapp routes")
+               for _, endPoint := range endPointList {
+                       xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
+                       if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) {
+                               r.generateXappRoutes(endPoint, subManEp, routeTable)
+                               r.generateXappToXappRoutes(endPoint, endPointList, routeTable)
+                       }
+               }
+       }
+       if updatetype == rtmgr.E2Type {
+               xapp.Logger.Info("Updating partial E2 routes")
+               if len(e2TermListEp) > 0 {
+                       r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermListEp, routeTable, -1, "")
+                       r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermListEp, routeTable, -1, "")
+                       r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermListEp, routeTable, -1, "")
+                       r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermListEp, routeTable, -1, "")
+               }
+       }*/
+
+       return routeTable
+
+}
+
 func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable {
        xapp.Logger.Debug("rpe.generateRouteTable invoked")
        xapp.Logger.Debug("Endpoint List:  %v", endPointList)
@@ -322,11 +391,11 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable
                xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
                xapp.Logger.Debug("Endpoints: %v", endPointList)
        }
-       rsmEp := getEndpointByName(&endPointList, "RSM")
+       /*rsmEp := getEndpointByName(&endPointList, "RSM")
        if rsmEp == nil {
                xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager")
                xapp.Logger.Debug("Endpoints: %v", endPointList)
-       }
+       }*/
        A1MediatorEp := getEndpointByName(&endPointList, "A1MEDIATOR")
        if A1MediatorEp == nil {
                xapp.Logger.Error("Platform component not found: %v", "A1Mediator")
@@ -338,7 +407,7 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable
                xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
                xapp.Logger.Debug("Endpoints: %v", endPointList)
        }
-       r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, rsmEp, A1MediatorEp, routeTable)
+       r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, A1MediatorEp, routeTable)
 
        for _, endPoint := range endPointList {
                xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
index 4a5b9fc..2d466d3 100644 (file)
 
 package rpe
 
-import "routing-manager/pkg/rtmgr"
+import (
+       "routing-manager/pkg/models"
+       "routing-manager/pkg/rtmgr"
+)
 
 //type generatePolicies func(rtmgr.Endpoints) *[]string
 //type generateRouteTable func(rtmgr.Endpoints) *rtmgr.RouteTable
@@ -45,4 +48,5 @@ type EngineConfig struct {
 type Engine interface {
        GeneratePolicies(rtmgr.Endpoints, *rtmgr.RicComponents) *[]string
        GenerateRouteTable(rtmgr.Endpoints) *rtmgr.RouteTable
+       GeneratePartialPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *[]string
 }
index 41e8cf0..b9b9604 100644 (file)
 
 package rtmgr
 
+type RMRUpdateType int
+
+const (
+       XappType = iota
+       SubsType
+       E2Type
+)
+
 type XApps struct {
        XAppList []XApp
 }
index 16545cb..ec4d25a 100644 (file)
@@ -87,7 +87,8 @@ func (c *RmrPush) Terminate() error {
 }
 
 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
-       count := addendpointct + 1
+       addendpointct = addendpointct + 1
+       count := addendpointct
        xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
        endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
        ep.Whid = int(xapp.Rmr.Openwh(endpoint))
@@ -159,7 +160,6 @@ func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, call_id int)
        xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
 
        ret := c.send_data(ep, policies, call_id)
-       xapp.Logger.Debug("return value is %v", ret)
        conn.Lock()
        rtmgr.RMRConnStatus[ep.Uuid] = ret
        conn.Unlock()
index b52dcfa..094a8ad 100644 (file)
@@ -45,9 +45,9 @@ type Engine interface {
        AddEndpoint(*rtmgr.Endpoint) error
        DeleteEndpoint(*rtmgr.Endpoint) error
        UpdateEndpoints(*rtmgr.RicComponents)
-       CreateEndpoint(string)(*string,int)
-       CheckEndpoint(string)*rtmgr.Endpoint
-       DistributeToEp(*[]string, string, int ) error
+       CheckEndpoint(string) *rtmgr.Endpoint
+       CreateEndpoint(string) (*string, int)
+       DistributeToEp(*[]string, string, int) error
 }
 
 /*type NngSocket interface {