Fixed integration and subscription routes related issues for R3 15/1915/1
authorprabhukaliswamy <prabhu.k@nokia.com>
Tue, 3 Dec 2019 15:06:30 +0000 (15:06 +0000)
committerprabhukaliswamy <prabhu.k@nokia.com>
Tue, 3 Dec 2019 15:06:48 +0000 (15:06 +0000)
Change-Id: I8f89ad2524b6dbe33f088f1685d9b3d5820b7e54
Signed-off-by: prabhukaliswamy <prabhu.k@nokia.com>
RELNOTES
cmd/rtmgr.go
container-tag.yaml
pkg/nbi/httprestful.go
pkg/rpe/rpe.go
pkg/sbi/nngpush.go
pkg/sbi/sbi.go

index 92fde07..2d57a92 100644 (file)
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,6 @@
+### v0.4.1
+* Fixed integration and subscription routes related issues for R3
+
 ### v0.3.9
 * Added RSM platform component routes for message types RAN_CONNECTED, RAN_RESTARTED, RAN_RECONFIGURED, RIC_RES_STATUS_REQ,RIC_RES_STATUS_RESP and RIC_RES_STATUS_FAILURE
 * xApp manager interface changes for Subscription Request API
index 101c743..568b7ad 100644 (file)
@@ -47,7 +47,7 @@ import (
 )
 
 const SERVICENAME = "rtmgr"
-const INTERVAL time.Duration = 2
+const INTERVAL time.Duration = 60
 
 var (
        args map[string]*string
@@ -123,7 +123,6 @@ func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpe
        go serveSBI(triggerSBI, sbiEngine, sdlEngine, rpeEngine)
 
        for {
-               time.Sleep(INTERVAL * time.Second)
                if *args["nbi"] == "httpGetter" {
                        data, err := nbiEngine.(*nbi.HttpGetter).FetchAllXApps(*args["xm-url"])
                        if err != nil {
@@ -134,6 +133,9 @@ func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpe
                }
 
                triggerSBI <- true
+
+               time.Sleep(INTERVAL * time.Second)
+               rtmgr.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.")
        }
 }
 
index dc6f324..3308c36 100644 (file)
@@ -2,4 +2,4 @@
 # By default this file is in the docker build directory,
 # but the location can configured in the JJB template.
 ---
-tag: 0.4.0
+tag: 0.4.1
index 9606288..8ceca1d 100644 (file)
@@ -35,6 +35,8 @@ import (
        "encoding/json"
        "errors"
        "fmt"
+       "github.com/go-openapi/loads"
+       "github.com/go-openapi/runtime/middleware"
        "net/url"
        "os"
        "routing-manager/pkg/models"
@@ -46,9 +48,6 @@ import (
        "routing-manager/pkg/sdl"
        "strconv"
        "time"
-
-       "github.com/go-openapi/loads"
-       "github.com/go-openapi/runtime/middleware"
 )
 
 //var myClient = &http.Client{Timeout: 1 * time.Second}
@@ -217,6 +216,8 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c
                        if err != nil {
                                return handle.NewProvideXappSubscriptionHandleBadRequest()
                        } else {
+                               //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
+                               time.Sleep(1 * time.Second)
                                return handle.NewGetHandlesOK()
                        }
                })
@@ -226,6 +227,8 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c
                        if err != nil {
                                return handle.NewDeleteXappSubscriptionHandleNoContent()
                        } else {
+                               //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
+                               time.Sleep(1 * time.Second)
                                return handle.NewGetHandlesOK()
                        }
                })
@@ -237,7 +240,7 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c
 }
 
 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
-       rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
+       rtmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
        r, err := myClient.Get(xmurl)
        if err != nil {
                return nil, err
@@ -252,10 +255,10 @@ func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
                        rtmgr.Logger.Warn("Json decode failed: " + err.Error())
                }
                rtmgr.Logger.Info("HTTP GET: OK")
-               rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
+               rtmgr.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
                return &xapps, err
        }
-       rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
+       rtmgr.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
        return nil, nil
 }
 
@@ -315,8 +318,12 @@ func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, co
                        if err != nil {
                                rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
                        } else if data != nil {
-                               sdlEngine.WriteXApps(fileName, data)
-                               triggerSBI <- true
+                               rtmgr.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
+                               alldata, err1 := httpGetXApps(xmurl)
+                               if alldata != nil && err1 == nil {
+                                       sdlEngine.WriteXApps(fileName, alldata)
+                                       triggerSBI <- true
+                               }
                        }
                }
        }()
index 9a36564..aa8e12f 100644 (file)
@@ -33,8 +33,8 @@ import (
        "errors"
        "routing-manager/pkg/rtmgr"
        "routing-manager/pkg/sbi"
-       "runtime"
        "strconv"
+       "runtime"
 )
 
 var (
@@ -90,26 +90,26 @@ func (r *Rpe) addRoute(messageType string, tx *rtmgr.Endpoint, rx *rtmgr.Endpoin
                rxList := []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}}
                messageId := rtmgr.MessageTypes[messageType]
                route := rtmgr.RouteTableEntry{
-                       MessageType: messageId,
-                       TxList:      txList,
-                       RxGroups:    rxList,
-                       SubID:       subId}
-               *routeTable = append(*routeTable, route)
-               rtmgr.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
-               rtmgr.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
-       } else {
-               pc, _, _, ok := runtime.Caller(1)
-               details := runtime.FuncForPC(pc)
-               if ok && details != nil {
-                       rtmgr.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name())
+                               MessageType: messageId,
+                               TxList:      txList,
+                               RxGroups:    rxList,
+                               SubID:       subId}
+                       *routeTable = append(*routeTable, route)
+                       rtmgr.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
+                       rtmgr.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
+               } else {
+                       pc,_,_,ok := runtime.Caller(1)
+                       details := runtime.FuncForPC(pc)
+                       if ok && details != nil {
+                               rtmgr.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name())
+                       }
                }
-       }
 }
 
 func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
        rtmgr.Logger.Debug("rpe.generateXappRoutes invoked")
        rtmgr.Logger.Debug("Endpoint: %v, xAppType: %v", xAppEp.Name, xAppEp.XAppType)
-       if xAppEp.XAppType != sbi.PlatformType && len(xAppEp.TxMessages) > 0 && len(xAppEp.RxMessages) > 0 {
+       if xAppEp.XAppType != sbi.PlatformType && ( len(xAppEp.TxMessages) > 0 || len(xAppEp.RxMessages) > 0 ) {
                //xApp -> Subscription Manager
                r.addRoute("RIC_SUB_REQ", xAppEp, subManEp, routeTable, -1)
                r.addRoute("RIC_SUB_DEL_REQ", xAppEp, subManEp, routeTable, -1)
@@ -121,7 +121,7 @@ func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoin
        }
 }
 
-func (r *Rpe) generateSubscriptionRoutes(e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
        rtmgr.Logger.Debug("rpe.addSubscriptionRoutes invoked")
        subscriptionList := &rtmgr.Subs
        for _, subscription := range *subscriptionList {
@@ -129,15 +129,18 @@ func (r *Rpe) generateSubscriptionRoutes(e2TermEp *rtmgr.Endpoint, subManEp *rtm
                xAppUuid := subscription.Fqdn + ":" + strconv.Itoa(int(subscription.Port))
                rtmgr.Logger.Debug("xApp UUID: %v", xAppUuid)
                xAppEp := getEndpointByUuid(xAppUuid)
-               //Subscription Manager -> xApp
-               r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
-               r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
-               r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
-               r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
-               //E2 Termination -> xApp
-               r.addRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID)
-               r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID)
-               r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID)
+               if xAppEp.Uuid == selectedxAppEp.Uuid { 
+                       rtmgr.Logger.Debug("xApp UUID is matched for selected xApp.UUID: %v and xApp.Name: %v", selectedxAppEp.Uuid, selectedxAppEp.Name)
+                       //Subscription Manager -> xApp
+                       r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
+                       r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
+                       r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
+                       r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
+                       //E2 Termination -> xApp
+                       r.addRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID)
+                       r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID)
+                       r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID)
+               }
        }
 }
 
@@ -227,9 +230,9 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable
 
        for _, endPoint := range endPointList {
                rtmgr.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
-               if endPoint.XAppType != sbi.PlatformType && len(endPoint.TxMessages) > 0 && len(endPoint.RxMessages) > 0 {
+               if endPoint.XAppType != sbi.PlatformType && ( len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0 ) {
                        r.generateXappRoutes(endPoint, e2TermEp, subManEp, routeTable)
-                       r.generateSubscriptionRoutes(e2TermEp, subManEp, routeTable)
+                       r.generateSubscriptionRoutes(endPoint, e2TermEp, subManEp, routeTable)
                }
        }
        return routeTable
index 6e3e225..4f56753 100644 (file)
@@ -30,12 +30,11 @@ package sbi
 
 import (
        "errors"
-       "routing-manager/pkg/rtmgr"
-       "strconv"
-
        "nanomsg.org/go/mangos/v2"
        "nanomsg.org/go/mangos/v2/protocol/push"
        _ "nanomsg.org/go/mangos/v2/transport/all"
+       "routing-manager/pkg/rtmgr"
+       "strconv"
 )
 
 type NngPush struct {
@@ -61,6 +60,7 @@ func createNewPushSocket() (NngSocket, error) {
 
 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
        rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
+       rtmgr.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
        for _, ep := range rtmgr.Eps {
                uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
                if uri == pipe.Address() {
index 7d07160..2737cc1 100644 (file)
@@ -64,6 +64,7 @@ type Sbi struct {
 }
 
 func (s *Sbi) pruneEndpointList(sbi Engine) {
+       rtmgr.Logger.Debug("pruneEndpointList invoked.")
        for _, ep := range rtmgr.Eps {
                if !ep.Keepalive {
                        rtmgr.Logger.Debug("deleting %v", ep)