FROM nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-ubuntu18-c-go:1.9.0 as rtmgrbuild
# Install RMr shared library
-ARG RMRVERSION=4.5.2
+ARG RMRVERSION=4.7.4
RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb
# Install RMr development header files
RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb
-ENV GOLANG_VERSION 1.12.1
+ENV GOLANG_VERSION 1.13.10
RUN wget --quiet https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz \
&& tar xvzf go$GOLANG_VERSION.linux-amd64.tar.gz -C /usr/local
ENV PATH="/usr/local/go/bin:${PATH}"
+### v0.7.6
+* Upgraded to RMR 4.7.4 and some improvements
+
### v0.7.5
* Open RMR connection in a a new thread
const SERVICENAME = "rtmgr"
+/*type RMRUpdateType int
+
+const (
+ XappType = iota
+ SubsType
+ E2Type
+)*/
+
func SetupCloseHandler() {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
# By default this file is in the docker build directory,
# but the location can configured in the JJB template.
---
-tag: 0.7.5
+tag: 0.7.6
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"net/http"
"os"
+ "routing-manager/pkg/models"
"routing-manager/pkg/rpe"
"routing-manager/pkg/rtmgr"
"routing-manager/pkg/sbi"
"routing-manager/pkg/sdl"
"strconv"
+ "strings"
"sync"
"time"
)
var m sync.Mutex
+var EndpointLock sync.Mutex
var nbiEngine Engine
var sbiEngine sbi.Engine
}
}
- ep := sbiEngine.CheckEndpoint(string(params.Payload))
- if ep == nil {
- xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
- return
+ /* hack with WA only for mcxapp in near future */
+ if strings.Contains(msg.String(), "ricxapp") {
+ ep := sbiEngine.CheckEndpoint(string(params.Payload))
+ if ep == nil {
+ xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
+ return
+ }
}
+
epstr, whid := sbiEngine.CreateEndpoint(msg.String())
if epstr == nil || whid < 0 {
xapp.Logger.Error("Wormhole Id creation failed %d for %s", whid, msg.String())
if err != nil {
return errors.New("Routing table cannot be published due to: " + err.Error())
}
+ EndpointLock.Lock()
sbiEngine.UpdateEndpoints(data)
+ EndpointLock.Unlock()
+
+ return nil
+}
+
+func sendPartialRoutesToAll(xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) (err error) {
+ policies := rpeEngine.GeneratePartialPolicies(rtmgr.Eps, xappSubData, updatetype)
+ err = sbiEngine.DistributeAll(policies)
+ if err != nil {
+ return errors.New("Routing table cannot be published due to: " + err.Error())
+ }
return nil
}
/* used for rtmgr restart case to connect to Endpoints */
go updateEp()
time.Sleep(5 * time.Second)
+ sendRoutesToAll()
+ /* Sometimes first message fails, retry after 5 sec */
+ time.Sleep(5 * time.Second)
+ sendRoutesToAll()
for {
- sendRoutesToAll()
+ xapp.Logger.Debug("Periodic Routes value = %s", xapp.Config.GetString("periodicroutes"))
+ if xapp.Config.GetString("periodicroutes") == "enable" {
+ go updateEp()
+ time.Sleep(5 * time.Second)
+ sendRoutesToAll()
+ }
rtmgr.Rtmgr_ready = true
time.Sleep(INTERVAL * time.Second)
m.Unlock()
updateEp()
return sendRoutesToAll()
+ //return sendPartialRoutesToAll(nil, rtmgr.XappType)
}
}
addSubscription(&rtmgr.Subs, data)
xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
updateEp()
- return sendRoutesToAll()
+ return sendPartialRoutesToAll(data, rtmgr.SubsType)
}
func subscriptionExists(data *models.XappSubscriptionData) bool {
err, IsDuplicate := validateE2tData(data)
if IsDuplicate == true {
updateEp()
+ //return sendPartialRoutesToAll(nil, rtmgr.E2Type)
return sendRoutesToAll()
}
sdlEngine.WriteNewE2TInstance(xapp.Config.GetString("rtfile"), e2data, meiddata)
m.Unlock()
updateEp()
+ //sendPartialRoutesToAll(nil, rtmgr.E2Type)
sendRoutesToAll()
time.Sleep(10 * time.Second)
for ep, value := range rtmgr.RMRConnStatus {
import (
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "routing-manager/pkg/models"
"routing-manager/pkg/rtmgr"
"strconv"
//"strings"
return &rawrt
}
+/*
+Produces the raw route message consumable by RMR
+*/
+func (r *Rmr) generatePartialRMRPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, key string, updatetype rtmgr.RMRUpdateType) *[]string {
+ rawrt := []string{key + "updatert|start\n"}
+ rt := r.generatePartialRouteTable(eps, xappSubData, updatetype)
+ for _, rte := range *rt {
+ rawrte := key + "mse|" + rte.MessageType
+ for _, tx := range rte.TxList {
+ rawrte += "," + tx.Ip + ":" + strconv.Itoa(int(tx.Port))
+ }
+ rawrte += "|" + strconv.Itoa(int(rte.SubID)) + "|"
+ group := ""
+ for _, rxg := range rte.RxGroups {
+ member := ""
+ for _, rx := range rxg {
+ if member == "" {
+ member += rx.Ip + ":" + strconv.Itoa(int(rx.Port))
+ } else {
+ member += "," + rx.Ip + ":" + strconv.Itoa(int(rx.Port))
+ }
+ }
+ if group == "" {
+ group += member
+ } else {
+ group += ";" + member
+ }
+ }
+ rawrte += group
+
+ if rte.RouteType == "%meid" {
+ rawrte += group + rte.RouteType
+ }
+
+ rawrt = append(rawrt, rawrte+"\n")
+ }
+
+ rawrt = append(rawrt, key+"updatert|end\n")
+ //count := 0
+
+ xapp.Logger.Debug("rmr.GeneratePolicies returns: %v", rawrt)
+ return &rawrt
+}
func (r *RmrPush) GeneratePolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents) *[]string {
xapp.Logger.Debug("Invoked rmr.GeneratePolicies, args: %v: ", eps)
return r.generateRMRPolicies(eps, rcs, "")
func (r *RmrPush) GenerateRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
return r.generateRouteTable(eps)
}
+
+func (r *RmrPush) GeneratePartialPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *[]string {
+ xapp.Logger.Debug("Invoked rmr.GeneratePartialRMR, args: %v: ", eps)
+ return r.generatePartialRMRPolicies(eps, xappSubData, "", updatetype)
+}
import (
"errors"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "routing-manager/pkg/models"
"routing-manager/pkg/rtmgr"
"routing-manager/pkg/sbi"
"runtime"
}
}
-func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) generatePartialSubscriptionTable(xappSubData *models.XappSubscriptionData, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+ xapp.Logger.Debug("rpe.addSingleSubscriptionRoutes invoked")
+ xAppUuid := *xappSubData.Address + ":" + strconv.Itoa(int(*xappSubData.Port))
+ xapp.Logger.Debug("xApp UUID: %v", xAppUuid)
+ xAppEp := getEndpointByUuid(xAppUuid)
+ if xAppEp != nil {
+ //Subscription Manager -> xApp
+ r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ //E2 Termination -> xApp
+ r.addRoute("RIC_INDICATION", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ } else {
+ xapp.Logger.Error("generateSubscriptionRoutes xAppEp is nil, xApp UUID: %v", xAppUuid)
+ }
+}
+
+func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
xapp.Logger.Debug("rpe.generatePlatformRoutes invoked")
//Platform Routes --- Subscription Routes
//Subscription Manager -> E2 Termination
sendEp = subManEp
case "E2MAN":
sendEp = e2ManEp
- case "RSM":
- sendEp = rsmEp
+ //case "RSM":,
+ // sendEp = rsmEp
case "A1MEDIATOR":
sendEp = a1mediatorEp
}
Ep = e2ManEp
//case "UEMAN":
// Ep = ueManEp
- case "RSM":
- Ep = rsmEp
+ //case "RSM":
+ // Ep = rsmEp
case "A1MEDIATOR":
Ep = a1mediatorEp
}
}
}
+func (r *Rpe) generatePartialRouteTable(endPointList rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *rtmgr.RouteTable {
+ xapp.Logger.Debug("rpe.generatePartialRouteTable invoked")
+ xapp.Logger.Debug("Endpoint List: %v", endPointList)
+ routeTable := &rtmgr.RouteTable{}
+ subManEp := getEndpointByName(&endPointList, "SUBMAN")
+ if subManEp == nil {
+ xapp.Logger.Error("Platform component not found: %v", "Subscription Manager")
+ xapp.Logger.Debug("Endpoints: %v", endPointList)
+ }
+ /*e2TermListEp := getEndpointListByName(&endPointList, "E2TERMINST")
+ if len(e2TermListEp) == 0 {
+ xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
+ xapp.Logger.Debug("Endpoints: %v", endPointList)
+ }
+ e2ManEp := getEndpointByName(&endPointList, "E2MAN")
+ if e2ManEp == nil {
+ xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
+ xapp.Logger.Debug("Endpoints: %v", endPointList)
+ }*/
+
+ if xappSubData != nil && updatetype == rtmgr.SubsType {
+ xapp.Logger.Info("Updating partial subscription routes")
+ r.generatePartialSubscriptionTable(xappSubData, subManEp, routeTable)
+ }
+ /*if updatetype == rtmgr.XappType {
+ xapp.Logger.Info("Updating partial xapp routes")
+ for _, endPoint := range endPointList {
+ xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
+ if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) {
+ r.generateXappRoutes(endPoint, subManEp, routeTable)
+ r.generateXappToXappRoutes(endPoint, endPointList, routeTable)
+ }
+ }
+ }
+ if updatetype == rtmgr.E2Type {
+ xapp.Logger.Info("Updating partial E2 routes")
+ if len(e2TermListEp) > 0 {
+ r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermListEp, routeTable, -1, "")
+ r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermListEp, routeTable, -1, "")
+ r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermListEp, routeTable, -1, "")
+ r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermListEp, routeTable, -1, "")
+ }
+ }*/
+
+ return routeTable
+
+}
+
func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable {
xapp.Logger.Debug("rpe.generateRouteTable invoked")
xapp.Logger.Debug("Endpoint List: %v", endPointList)
xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
xapp.Logger.Debug("Endpoints: %v", endPointList)
}
- rsmEp := getEndpointByName(&endPointList, "RSM")
+ /*rsmEp := getEndpointByName(&endPointList, "RSM")
if rsmEp == nil {
xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager")
xapp.Logger.Debug("Endpoints: %v", endPointList)
- }
+ }*/
A1MediatorEp := getEndpointByName(&endPointList, "A1MEDIATOR")
if A1MediatorEp == nil {
xapp.Logger.Error("Platform component not found: %v", "A1Mediator")
xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
xapp.Logger.Debug("Endpoints: %v", endPointList)
}
- r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, rsmEp, A1MediatorEp, routeTable)
+ r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, A1MediatorEp, routeTable)
for _, endPoint := range endPointList {
xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
package rpe
-import "routing-manager/pkg/rtmgr"
+import (
+ "routing-manager/pkg/models"
+ "routing-manager/pkg/rtmgr"
+)
//type generatePolicies func(rtmgr.Endpoints) *[]string
//type generateRouteTable func(rtmgr.Endpoints) *rtmgr.RouteTable
type Engine interface {
GeneratePolicies(rtmgr.Endpoints, *rtmgr.RicComponents) *[]string
GenerateRouteTable(rtmgr.Endpoints) *rtmgr.RouteTable
+ GeneratePartialPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *[]string
}
package rtmgr
+type RMRUpdateType int
+
+const (
+ XappType = iota
+ SubsType
+ E2Type
+)
+
type XApps struct {
XAppList []XApp
}
}
func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
- count := addendpointct + 1
+ addendpointct = addendpointct + 1
+ count := addendpointct
xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
ep.Whid = int(xapp.Rmr.Openwh(endpoint))
xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
ret := c.send_data(ep, policies, call_id)
- xapp.Logger.Debug("return value is %v", ret)
conn.Lock()
rtmgr.RMRConnStatus[ep.Uuid] = ret
conn.Unlock()
AddEndpoint(*rtmgr.Endpoint) error
DeleteEndpoint(*rtmgr.Endpoint) error
UpdateEndpoints(*rtmgr.RicComponents)
- CreateEndpoint(string)(*string,int)
- CheckEndpoint(string)*rtmgr.Endpoint
- DistributeToEp(*[]string, string, int ) error
+ CheckEndpoint(string) *rtmgr.Endpoint
+ CreateEndpoint(string) (*string, int)
+ DistributeToEp(*[]string, string, int) error
}
/*type NngSocket interface {