From 4984522994d50af32fe8f0d1b0fbc7339bdf077f Mon Sep 17 00:00:00 2001 From: wahidw Date: Sat, 22 Aug 2020 20:34:59 +0530 Subject: [PATCH] Publish Route only once to the endpoint that requests it. Periodic/Event based distribution will be done only for the process with RMR Control Port 4561 Change-Id: Ieae5db99d62e580a2bc8d9e5ae3a00963b0618a5 Signed-off-by: wahidw --- RELNOTES | 3 ++ container-tag.yaml | 2 +- pkg/nbi/control.go | 14 ++++++-- pkg/sbi/nngpush.go | 18 +++++----- pkg/sbi/nngpush_test.go | 37 +++++++++++++++++-- pkg/sbi/sbi.go | 95 ++++++++++++++++++++++++++----------------------- pkg/sbi/types.go | 3 +- pkg/stub/stub.go | 1 + 8 files changed, 112 insertions(+), 61 deletions(-) diff --git a/RELNOTES b/RELNOTES index c09f8c0..8a1ed82 100644 --- a/RELNOTES +++ b/RELNOTES @@ -1,3 +1,6 @@ +### v0.6.7 +* Publish Route only once to the endpoint that requests it. Periodic/Event based distribution will be done only for the process with RMR Control Port 4561. + ### v0.6.5 * Removed Book Keeping of RMR CTL ports. Route Distribution on demand. diff --git a/container-tag.yaml b/container-tag.yaml index 1bbe2d3..041cba7 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -2,4 +2,4 @@ # By default this file is in the docker build directory, # but the location can configured in the JJB template. --- -tag: 0.6.6 +tag: 0.6.7 diff --git a/pkg/nbi/control.go b/pkg/nbi/control.go index e31379d..db6c405 100644 --- a/pkg/nbi/control.go +++ b/pkg/nbi/control.go @@ -144,14 +144,22 @@ func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams) { return } - ep,whid := sbiEngine.CreateEndpoint(string(params.Payload),msg.String()) - if ep == nil || whid < 0 { + ep := sbiEngine.CheckEndpoint(string(params.Payload)) + if ep == nil { xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload)) return } + epstr,whid := sbiEngine.CreateEndpoint(msg.String()) + if epstr == nil || whid < 0 { + xapp.Logger.Error("Wormhole Id creation failed %d for %s",whid,msg.String() ) + return + } + /*This is to ensure the latest routes are sent. + Assumption is that in this time interval the routes are built for this endpoint */ + time.Sleep(100 * time.Millisecond) policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data) - err = sbiEngine.DistributeToEp(policies, *ep, whid) + err = sbiEngine.DistributeToEp(policies, *epstr, whid) if err != nil { xapp.Logger.Error("Routing table cannot be published due to: " + err.Error()) return diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go index 1b0bed0..4b7f871 100644 --- a/pkg/sbi/nngpush.go +++ b/pkg/sbi/nngpush.go @@ -177,20 +177,21 @@ func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) params.Timeout = 200 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams) routestatus := strings.Split(retstr," ") - if state != C.RMR_OK && routestatus[0] == "OK" { + if state != C.RMR_OK && routestatus[0] != "OK" { xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0]) return false } else { xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies))) return true } +} - xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called") - return false +func (c *RmrPush) CheckEndpoint(payload string)(ep *rtmgr.Endpoint) { + return c.checkEndpoint(payload) } -func (c *RmrPush) CreateEndpoint(payload string,rmrsrc string)(ep *string,whid int) { - return c.createEndpoint(payload,rmrsrc, c) +func (c *RmrPush) CreateEndpoint(rmrsrc string)(ep *string,whid int) { + return c.createEndpoint(rmrsrc) } func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error { @@ -230,15 +231,12 @@ func (c *RmrPush) sendDynamicRoutes(ep string,whid int, policies *[]string, call params.Timeout = 200 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams) routestatus := strings.Split(retstr," ") - if state != C.RMR_OK && routestatus[0] == "OK" { + if state != C.RMR_OK && routestatus[0] != "OK" { xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0]) return false } else { - xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whi_id: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies))) + xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies))) return true } - - xapp.Logger.Error("Route Update to endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " xapp.Rmr.SendCallMsg not called") - return false } diff --git a/pkg/sbi/nngpush_test.go b/pkg/sbi/nngpush_test.go index 79821eb..aae9121 100644 --- a/pkg/sbi/nngpush_test.go +++ b/pkg/sbi/nngpush_test.go @@ -27,6 +27,7 @@ */ package sbi + import ( //"errors" "routing-manager/pkg/rtmgr" @@ -117,6 +118,7 @@ func TestRmrPushDistributeAll(t *testing.T) { var rmrpush = RmrPush{} resetTestPushDataset(rmrpush, stub.ValidEndpoints) + rmrcallid = 200 err = rmrpush.DistributeAll(stub.ValidPolicies) if err != nil { t.Errorf("rmrpush.DistributeAll(policies) was incorrect, got: %v, want: %v.", err, "nil") @@ -131,6 +133,7 @@ func TestDistributeToEp(t *testing.T) { var rmrpush = RmrPush{} resetTestPushDataset(rmrpush, stub.ValidEndpoints) + rmrdynamiccallid = 255 err = rmrpush.DistributeToEp(stub.ValidPolicies,"localhost:4561",100) if err != nil { t.Errorf("rmrpush.DistributetoEp(policies) was incorrect, got: %v, want: %v.", err, "nil") @@ -148,11 +151,18 @@ func TestDeleteEndpoint(t *testing.T) { } } +func TestCheckEndpoint(t *testing.T) { + var rmrpush = RmrPush{} + resetTestPushDataset(rmrpush, stub.ValidEndpoints1) + rmrpush.CheckEndpoint("192.168.0.1:0") + rmrpush.CheckEndpoint("10.2.2.1:0") + rmrpush.CheckEndpoint("localhost:0") +} + func TestCreateEndpoint(t *testing.T) { var rmrpush = RmrPush{} resetTestPushDataset(rmrpush, stub.ValidEndpoints1) - rmrpush.CreateEndpoint("192.168.0.1:0","Src=192.168.0.1:4561") - rmrpush.CreateEndpoint("localhost:4560","Src=192.168.11.1:4444") + rmrpush.CreateEndpoint("Src=127.0.0.1:4561 hello") } /* Initialize and send policies @@ -163,3 +173,26 @@ func TestRmrPushInitializeandsendPolicies(t *testing.T) { policies := []string{"hello","welcome"} rmrpush.send_data(rtmgr.Eps["localhost"],&policies,1) } + +func TestString( t *testing.T) { + var params xapp.RMRParams + params.Payload = []byte("abcdefgh") + params.Meid = &xapp.RMRMeid{} + msg := RMRParams{¶ms} + msg.String() + +} + +func TestSenddata(t *testing.T) { + var rmrpush = RmrPush{} + ep := rtmgr.Endpoint{Whid:-1, Ip:"1.1.1.1"} + policies := []string{"mse|12345|-1|local.com"} + rmrpush.send_data(&ep, &policies,300) +} + +func TestSendDynamicdata(t *testing.T) { + var rmrpush = RmrPush{} + ep := "1.1.1.1" + policies := []string{"mse|12345|-1|local.com"} + rmrpush.sendDynamicRoutes(ep,1, &policies,300) +} diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go index 21ff78e..8833987 100644 --- a/pkg/sbi/sbi.go +++ b/pkg/sbi/sbi.go @@ -31,9 +31,9 @@ package sbi import ( "errors" - //"fmt" + "fmt" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" - //"net" + "net" "routing-manager/pkg/rtmgr" "strconv" "strings" @@ -123,16 +123,16 @@ func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbi Engine) rtmgr.Eps[uuid].Keepalive = true } else { ep := &rtmgr.Endpoint{ - Uuid: uuid, - Name: pc.Name, - XAppType: PlatformType, - Ip: pc.Fqdn, - Port: pc.Port, + Uuid: uuid, + Name: pc.Name, + XAppType: PlatformType, + Ip: pc.Fqdn, + Port: pc.Port, //TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"], //RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"], - Socket: nil, - IsReady: false, - Keepalive: true, + Socket: nil, + IsReady: false, + Keepalive: true, } xapp.Logger.Debug("ep created: %v", ep) if err := sbi.AddEndpoint(ep); err != nil { @@ -155,16 +155,16 @@ func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) rtmgr.Eps[uuid].Keepalive = true } else { ep := &rtmgr.Endpoint{ - Uuid: uuid, - Name: e2t.Name, - XAppType: PlatformType, - Ip: ipaddress, - Port: uint16(port), + Uuid: uuid, + Name: e2t.Name, + XAppType: PlatformType, + Ip: ipaddress, + Port: uint16(port), //TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"], //RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"], - Socket: nil, - IsReady: false, - Keepalive: true, + Socket: nil, + IsReady: false, + Keepalive: true, } xapp.Logger.Debug("ep created: %v", ep) if err := sbi.AddEndpoint(ep); err != nil { @@ -176,40 +176,47 @@ func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) } } -func (s *Sbi) createEndpoint(payload string,rmrsrc string, sbi Engine) (*string,int) { - xapp.Logger.Debug("CreateEndPoint %v", payload) -// stringSlice := strings.Split(payload, " ") -// uuid := stringSlice[0] -// xapp.Logger.Debug(">>> uuid %v", stringSlice[0]) - -/* if _, ok := rtmgr.Eps[uuid]; ok { - ep := rtmgr.Eps[uuid] - return ep - }*/ +func (s *Sbi) checkEndpoint(payload string) *rtmgr.Endpoint { + /* Payload contains endpoint in the form of IP:Port. + Port is data port of sender endpoint. + Eps contains the UUID in the form of IP:Port. + Port is the Application Port(http) */ + + xapp.Logger.Debug("Invoked checkEndPoint %v", payload) + stringSlice := strings.Split(payload, " ") + uuid := stringSlice[0] + stringsubsplit := strings.Split(uuid, ":") + xapp.Logger.Debug(">>> uuid %v", stringSlice[0]) + for _, ep := range rtmgr.Eps { + if strings.Contains(ep.Uuid, stringsubsplit[0]) == true { + endpoint := rtmgr.Eps[ep.Uuid] + return endpoint + } + } /* incase the stored Endpoint list is in the form of IP:port*/ -/* stringsubsplit := strings.Split(uuid, ":") addr, err := net.LookupIP(stringsubsplit[0]) if err == nil { convertedUuid := fmt.Sprintf("%s:%s", addr[0], stringsubsplit[1]) xapp.Logger.Info(" IP:Port received is %s", convertedUuid) - if _, ok := rtmgr.Eps[convertedUuid]; ok { - ep := rtmgr.Eps[convertedUuid] - return ep + IP := fmt.Sprintf("%s", addr[0]) + for _, ep := range rtmgr.Eps { + res := strings.Contains(ep.Uuid, IP) + if res == true { + endpoint := rtmgr.Eps[ep.Uuid] + return endpoint + } } - }*/ + } + return nil +} +func (s *Sbi) createEndpoint(rmrsrc string) (*string, int) { /* Create a new mapping, this case is assumed for multiple process sending RMR request from a container */ - srcString := strings.Split(rmrsrc," ") - srcStringSlice := strings.Split(srcString[0],"=") + srcString := strings.Split(rmrsrc, " ") + srcStringSlice := strings.Split(srcString[0], "=") Whid := int(xapp.Rmr.Openwh(srcStringSlice[1])) - xapp.Logger.Info("Wormhole Id created is %d for EndPoint %s",Whid,srcStringSlice[1]) - if Whid > 0 { -// rtmgr.RmrEp[srcStringSlice[1]] = Whid - xapp.Logger.Info("received %s and mapped to Whid = %d",srcStringSlice[1],Whid) - return &srcStringSlice[1],Whid - } - - return nil,Whid - } + xapp.Logger.Info("Wormhole Id created is %d for EndPoint %s", Whid, srcStringSlice[1]) + return &srcStringSlice[1], Whid +} diff --git a/pkg/sbi/types.go b/pkg/sbi/types.go index e2f14ed..b52dcfa 100644 --- a/pkg/sbi/types.go +++ b/pkg/sbi/types.go @@ -45,7 +45,8 @@ type Engine interface { AddEndpoint(*rtmgr.Endpoint) error DeleteEndpoint(*rtmgr.Endpoint) error UpdateEndpoints(*rtmgr.RicComponents) - CreateEndpoint(string,string)(*string,int) + CreateEndpoint(string)(*string,int) + CheckEndpoint(string)*rtmgr.Endpoint DistributeToEp(*[]string, string, int ) error } diff --git a/pkg/stub/stub.go b/pkg/stub/stub.go index b3e5f3d..f5f3b72 100644 --- a/pkg/stub/stub.go +++ b/pkg/stub/stub.go @@ -71,6 +71,7 @@ var ValidEndpoints1 = []rtmgr.Endpoint{ {Uuid: "192.168.0.1:0", Name: "SUBMAN", XAppType: "app2", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: false, Keepalive: false}, {Uuid: "10.1.1.1:0", Name: "E2MAN", XAppType: "app3", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: false}, {Uuid: "10.2.2.1:0", Name: "UEMAN", XAppType: "app4", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Policies: []int32{1, 2}, Socket: nil, IsReady: false, Keepalive: true}, + {Uuid: "127.0.0.1:0", Name: "UEMAN1", XAppType: "app4", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Policies: []int32{1, 2}, Socket: nil, IsReady: false, Keepalive: true}, } var ValidSubscriptions = &[]rtmgr.Subscription{ -- 2.16.6