Publish Route only once to the endpoint that requests it. Periodic/Event based distri... 91/4591/1
authorwahidw <abdulwahid.w@nokia.com>
Sat, 22 Aug 2020 15:04:59 +0000 (20:34 +0530)
committerwahidw <abdulwahid.w@nokia.com>
Sat, 22 Aug 2020 15:05:08 +0000 (20:35 +0530)
Change-Id: Ieae5db99d62e580a2bc8d9e5ae3a00963b0618a5
Signed-off-by: wahidw <abdulwahid.w@nokia.com>
RELNOTES
container-tag.yaml
pkg/nbi/control.go
pkg/sbi/nngpush.go
pkg/sbi/nngpush_test.go
pkg/sbi/sbi.go
pkg/sbi/types.go
pkg/stub/stub.go

index c09f8c0..8a1ed82 100644 (file)
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,6 @@
+### v0.6.7
+* Publish Route only once to the endpoint that requests it. Periodic/Event based distribution will be done only for the process with RMR Control Port 4561.
+
 ### v0.6.5
 * Removed Book Keeping of RMR CTL ports. Route Distribution on demand.
 
index 1bbe2d3..041cba7 100644 (file)
@@ -2,4 +2,4 @@
 # By default this file is in the docker build directory,
 # but the location can configured in the JJB template.
 ---
-tag: 0.6.6
+tag: 0.6.7
index e31379d..db6c405 100644 (file)
@@ -144,14 +144,22 @@ func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams) {
                return
        }
 
-        ep,whid := sbiEngine.CreateEndpoint(string(params.Payload),msg.String())
-       if ep == nil || whid < 0 {
+       ep := sbiEngine.CheckEndpoint(string(params.Payload))
+       if ep == nil {
                xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
                return
        }
+       epstr,whid := sbiEngine.CreateEndpoint(msg.String())
+       if epstr == nil || whid < 0 {
+               xapp.Logger.Error("Wormhole Id creation failed %d for %s",whid,msg.String() )
+               return
+       }
 
+       /*This is to ensure the latest routes are sent.
+       Assumption is that in this time interval the routes are built for this endpoint */
+        time.Sleep(100 * time.Millisecond)
        policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
-       err = sbiEngine.DistributeToEp(policies, *ep, whid)
+       err = sbiEngine.DistributeToEp(policies, *epstr, whid)
        if err != nil {
                xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
                return
index 1b0bed0..4b7f871 100644 (file)
@@ -177,20 +177,21 @@ func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int)
         params.Timeout = 200
         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
        routestatus := strings.Split(retstr," ")
-        if state != C.RMR_OK && routestatus[0] == "OK" {
+        if state != C.RMR_OK && routestatus[0] != "OK" {
               xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
               return false
         } else {
                xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
               return true
         }
+}
 
-        xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
-        return false
+func (c *RmrPush) CheckEndpoint(payload string)(ep *rtmgr.Endpoint)  {
+       return c.checkEndpoint(payload)
 }
 
-func (c *RmrPush) CreateEndpoint(payload string,rmrsrc string)(ep *string,whid int)  {
-       return c.createEndpoint(payload,rmrsrc, c)
+func (c *RmrPush) CreateEndpoint(rmrsrc string)(ep *string,whid int)  {
+       return c.createEndpoint(rmrsrc)
 }
 
 func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
@@ -230,15 +231,12 @@ func (c *RmrPush) sendDynamicRoutes(ep string,whid int, policies *[]string, call
         params.Timeout = 200
         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
         routestatus := strings.Split(retstr," ")
-        if state != C.RMR_OK && routestatus[0] == "OK" {
+        if state != C.RMR_OK && routestatus[0] != "OK" {
                 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
               return false
         } else {
-                xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whi_id: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
+                xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
       return true
         }
-
-        xapp.Logger.Error("Route Update to endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " xapp.Rmr.SendCallMsg not called")
-        return false
 }
 
index 79821eb..aae9121 100644 (file)
@@ -27,6 +27,7 @@
 */
 package sbi
 
+
 import (
        //"errors"
        "routing-manager/pkg/rtmgr"
@@ -117,6 +118,7 @@ func TestRmrPushDistributeAll(t *testing.T) {
        var rmrpush = RmrPush{}
        resetTestPushDataset(rmrpush, stub.ValidEndpoints)
 
+       rmrcallid = 200
        err = rmrpush.DistributeAll(stub.ValidPolicies)
        if err != nil {
                t.Errorf("rmrpush.DistributeAll(policies) was incorrect, got: %v, want: %v.", err, "nil")
@@ -131,6 +133,7 @@ func TestDistributeToEp(t *testing.T) {
        var rmrpush = RmrPush{}
        resetTestPushDataset(rmrpush, stub.ValidEndpoints)
 
+       rmrdynamiccallid = 255
        err = rmrpush.DistributeToEp(stub.ValidPolicies,"localhost:4561",100)
        if err != nil {
                t.Errorf("rmrpush.DistributetoEp(policies) was incorrect, got: %v, want: %v.", err, "nil")
@@ -148,11 +151,18 @@ func TestDeleteEndpoint(t *testing.T) {
        }
 }
 
+func TestCheckEndpoint(t *testing.T) {
+       var rmrpush = RmrPush{}
+       resetTestPushDataset(rmrpush, stub.ValidEndpoints1)
+       rmrpush.CheckEndpoint("192.168.0.1:0")
+       rmrpush.CheckEndpoint("10.2.2.1:0")
+       rmrpush.CheckEndpoint("localhost:0")
+}
+
 func TestCreateEndpoint(t *testing.T) {
        var rmrpush = RmrPush{}
        resetTestPushDataset(rmrpush, stub.ValidEndpoints1)
-       rmrpush.CreateEndpoint("192.168.0.1:0","Src=192.168.0.1:4561")
-       rmrpush.CreateEndpoint("localhost:4560","Src=192.168.11.1:4444")
+       rmrpush.CreateEndpoint("Src=127.0.0.1:4561 hello")
 }
 /*
 Initialize and send policies
@@ -163,3 +173,26 @@ func TestRmrPushInitializeandsendPolicies(t *testing.T) {
         policies := []string{"hello","welcome"}
        rmrpush.send_data(rtmgr.Eps["localhost"],&policies,1)
 }
+
+func TestString( t *testing.T) {
+       var params xapp.RMRParams
+       params.Payload = []byte("abcdefgh")
+       params.Meid = &xapp.RMRMeid{}
+       msg := RMRParams{&params}
+       msg.String()
+
+}
+
+func TestSenddata(t *testing.T) {
+       var rmrpush = RmrPush{}
+       ep := rtmgr.Endpoint{Whid:-1, Ip:"1.1.1.1"}
+       policies := []string{"mse|12345|-1|local.com"}
+       rmrpush.send_data(&ep, &policies,300)
+}
+
+func TestSendDynamicdata(t *testing.T) {
+       var rmrpush = RmrPush{}
+       ep := "1.1.1.1"
+       policies := []string{"mse|12345|-1|local.com"}
+       rmrpush.sendDynamicRoutes(ep,1, &policies,300)
+}
index 21ff78e..8833987 100644 (file)
@@ -31,9 +31,9 @@ package sbi
 
 import (
        "errors"
-       //"fmt"
+       "fmt"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
-       //"net"
+       "net"
        "routing-manager/pkg/rtmgr"
        "strconv"
        "strings"
@@ -123,16 +123,16 @@ func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbi Engine)
                        rtmgr.Eps[uuid].Keepalive = true
                } else {
                        ep := &rtmgr.Endpoint{
-                               Uuid:       uuid,
-                               Name:       pc.Name,
-                               XAppType:   PlatformType,
-                               Ip:         pc.Fqdn,
-                               Port:       pc.Port,
+                               Uuid:     uuid,
+                               Name:     pc.Name,
+                               XAppType: PlatformType,
+                               Ip:       pc.Fqdn,
+                               Port:     pc.Port,
                                //TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"],
                                //RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"],
-                               Socket:     nil,
-                               IsReady:    false,
-                               Keepalive:  true,
+                               Socket:    nil,
+                               IsReady:   false,
+                               Keepalive: true,
                        }
                        xapp.Logger.Debug("ep created: %v", ep)
                        if err := sbi.AddEndpoint(ep); err != nil {
@@ -155,16 +155,16 @@ func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine)
                        rtmgr.Eps[uuid].Keepalive = true
                } else {
                        ep := &rtmgr.Endpoint{
-                               Uuid:       uuid,
-                               Name:       e2t.Name,
-                               XAppType:   PlatformType,
-                               Ip:         ipaddress,
-                               Port:       uint16(port),
+                               Uuid:     uuid,
+                               Name:     e2t.Name,
+                               XAppType: PlatformType,
+                               Ip:       ipaddress,
+                               Port:     uint16(port),
                                //TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"],
                                //RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"],
-                               Socket:     nil,
-                               IsReady:    false,
-                               Keepalive:  true,
+                               Socket:    nil,
+                               IsReady:   false,
+                               Keepalive: true,
                        }
                        xapp.Logger.Debug("ep created: %v", ep)
                        if err := sbi.AddEndpoint(ep); err != nil {
@@ -176,40 +176,47 @@ func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine)
        }
 }
 
-func (s *Sbi) createEndpoint(payload string,rmrsrc string, sbi Engine) (*string,int) {
-       xapp.Logger.Debug("CreateEndPoint %v", payload)
-//     stringSlice := strings.Split(payload, " ")
-//     uuid := stringSlice[0]
-//     xapp.Logger.Debug(">>> uuid %v", stringSlice[0])
-
-/*     if _, ok := rtmgr.Eps[uuid]; ok {
-               ep := rtmgr.Eps[uuid]
-               return ep
-       }*/
+func (s *Sbi) checkEndpoint(payload string) *rtmgr.Endpoint {
+       /* Payload contains endpoint in the form of IP<domain name>:Port.
+       Port is data port of sender endpoint.
+       Eps contains the UUID in the form of IP<domain name>:Port.
+       Port is the Application Port(http) */
+
+       xapp.Logger.Debug("Invoked checkEndPoint %v", payload)
+       stringSlice := strings.Split(payload, " ")
+       uuid := stringSlice[0]
+       stringsubsplit := strings.Split(uuid, ":")
+       xapp.Logger.Debug(">>> uuid %v", stringSlice[0])
+       for _, ep := range rtmgr.Eps {
+               if strings.Contains(ep.Uuid, stringsubsplit[0]) == true {
+                       endpoint := rtmgr.Eps[ep.Uuid]
+                       return endpoint
+               }
+       }
 
        /* incase the stored Endpoint list is in the form of IP:port*/
-/*     stringsubsplit := strings.Split(uuid, ":")
        addr, err := net.LookupIP(stringsubsplit[0])
        if err == nil {
                convertedUuid := fmt.Sprintf("%s:%s", addr[0], stringsubsplit[1])
                xapp.Logger.Info(" IP:Port received is %s", convertedUuid)
-               if _, ok := rtmgr.Eps[convertedUuid]; ok {
-                       ep := rtmgr.Eps[convertedUuid]
-                       return ep
+               IP := fmt.Sprintf("%s", addr[0])
+               for _, ep := range rtmgr.Eps {
+                       res := strings.Contains(ep.Uuid, IP)
+                       if res == true {
+                               endpoint := rtmgr.Eps[ep.Uuid]
+                               return endpoint
+                       }
                }
-       }*/
+       }
+       return nil
+}
 
+func (s *Sbi) createEndpoint(rmrsrc string) (*string, int) {
        /* Create a new mapping, this case is assumed for multiple process sending RMR request from a container */
-       srcString := strings.Split(rmrsrc," ")
-       srcStringSlice := strings.Split(srcString[0],"=")
+       srcString := strings.Split(rmrsrc, " ")
+       srcStringSlice := strings.Split(srcString[0], "=")
        Whid := int(xapp.Rmr.Openwh(srcStringSlice[1]))
 
-       xapp.Logger.Info("Wormhole Id created is %d for EndPoint %s",Whid,srcStringSlice[1])
-       if Whid > 0 {
-//             rtmgr.RmrEp[srcStringSlice[1]] = Whid
-               xapp.Logger.Info("received %s and mapped to Whid = %d",srcStringSlice[1],Whid)
-               return &srcStringSlice[1],Whid
-       }
-
-       return nil,Whid
- }
+       xapp.Logger.Info("Wormhole Id created is %d for EndPoint %s", Whid, srcStringSlice[1])
+       return &srcStringSlice[1], Whid
+}
index e2f14ed..b52dcfa 100644 (file)
@@ -45,7 +45,8 @@ type Engine interface {
        AddEndpoint(*rtmgr.Endpoint) error
        DeleteEndpoint(*rtmgr.Endpoint) error
        UpdateEndpoints(*rtmgr.RicComponents)
-       CreateEndpoint(string,string)(*string,int)
+       CreateEndpoint(string)(*string,int)
+       CheckEndpoint(string)*rtmgr.Endpoint
        DistributeToEp(*[]string, string, int ) error
 }
 
index b3e5f3d..f5f3b72 100644 (file)
@@ -71,6 +71,7 @@ var ValidEndpoints1 = []rtmgr.Endpoint{
         {Uuid: "192.168.0.1:0", Name: "SUBMAN", XAppType: "app2", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: false, Keepalive: false},
         {Uuid: "10.1.1.1:0", Name: "E2MAN", XAppType: "app3", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: false},
         {Uuid: "10.2.2.1:0", Name: "UEMAN", XAppType: "app4", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Policies: []int32{1, 2}, Socket: nil, IsReady: false, Keepalive: true},
+        {Uuid: "127.0.0.1:0", Name: "UEMAN1", XAppType: "app4", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Policies: []int32{1, 2}, Socket: nil, IsReady: false, Keepalive: true},
 }
 
 var ValidSubscriptions = &[]rtmgr.Subscription{