Handling of synchronous RMR messages 56/3156/1
authorwahidw <abdulwahid.w@nokia.com>
Sat, 4 Apr 2020 16:48:14 +0000 (16:48 +0000)
committerwahidw <abdulwahid.w@nokia.com>
Mon, 6 Apr 2020 07:32:39 +0000 (07:32 +0000)
Change-Id: I792f146bdd0ead8097ab9460b5c5797915f72e5e
Signed-off-by: wahidw <abdulwahid.w@nokia.com>
Dockerfile
cmd/rtmgr.go
pkg/nbi/httprestful.go
pkg/rpe/rmr.go
pkg/rpe/rpe.go
pkg/rpe/types.go
pkg/sbi/nngpush.go
pkg/sbi/sbi.go
pkg/sbi/types.go
uta_rtg_ric.rt [new file with mode: 0644]

index 2919ebf..96e7a25 100644 (file)
@@ -26,7 +26,7 @@
 FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:4-u18.04-nng as rtmgrbuild
 
 # Install RMr shared library
-ARG RMRVERSION=3.6.0
+ARG RMRVERSION=3.6.2
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb
 # Install RMr development header files
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb
@@ -83,6 +83,7 @@ COPY --from=rtmgrbuild /go/bin/rtmgr /
 COPY --from=rtmgrbuild /run_rtmgr.sh /
 COPY --from=rtmgrbuild /usr/local/include /usr/local/include
 COPY --from=rtmgrbuild /usr/local/lib /usr/local/lib
+COPY "uta_rtg_ric.rt" /
 RUN ldconfig
 RUN apt-get update && apt-get install -y iputils-ping net-tools curl tcpdump
 RUN mkdir /db && touch /db/rt.json && chmod 777 /db/rt.json
index 63b67b7..f3cbc5e 100644 (file)
@@ -77,8 +77,8 @@ func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine
                                continue
                        }
                        sbiEngine.UpdateEndpoints(data)
-                       policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
-                       err = sbiEngine.DistributeAll(policies)
+                       route_table, meid_table := rpeEngine.GenerateRouteTables(rtmgr.Eps, data)
+                       err = sbiEngine.DistributeRouteTables(route_table, meid_table)
                        if err != nil {
                                xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
                        }
index e764467..80d55c4 100644 (file)
@@ -35,8 +35,8 @@ import (
        "encoding/json"
        "errors"
        "fmt"
-       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        xfmodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "github.com/go-openapi/loads"
        "github.com/go-openapi/runtime/middleware"
        "net"
@@ -574,40 +574,41 @@ func PopulateE2TMap(e2tDataList *[]rtmgr.E2tIdentity, e2ts map[string]rtmgr.E2TI
                }
 
                e2ts[e2tinst.Fqdn] = e2tinst
-               meids = append(meids,str)
+               meids = append(meids, str)
        }
 }
 
 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, e2murl string, sdlEngine sdl.Engine) error {
        xapp.Logger.Info("Invoked retrieveStartupData ")
-    var readErr error
-   var maxRetries = 10
+       var readErr error
+       var err error
+       var maxRetries = 10
        var xappData *[]rtmgr.XApp
        xappData = new([]rtmgr.XApp)
        xapp.Logger.Info("Trying to fetch XApps data from XAPP manager")
-    for i := 1; i <= maxRetries; i++ {
-       time.Sleep(2 * time.Second)
+       for i := 1; i <= maxRetries; i++ {
+               time.Sleep(2 * time.Second)
 
-          readErr = nil
-       xappData, err := httpGetXApps(xmurl)
-       if xappData != nil && err == nil {
+               readErr = nil
+               xappData, err := httpGetXApps(xmurl)
+               if xappData != nil && err == nil {
                        break
-       } else if err == nil {
-               readErr = errors.New("unexpected HTTP status code")
-       } else {
-               xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
-               readErr = err
-       }
-    }
-
-       if ( readErr != nil) {
-               return readErr
+               } else if err == nil {
+                       readErr = errors.New("unexpected HTTP status code")
+               } else {
+                       xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
+                       readErr = err
+               }
+       }
+
+       if readErr != nil {
+               return readErr
        }
 
        var meids []string
        e2ts := make(map[string]rtmgr.E2TInstance)
        xapp.Logger.Info("Trying to fetch E2T data from E2manager")
-        for i := 1; i <= maxRetries; i++ {
+       for i := 1; i <= maxRetries; i++ {
 
                readErr = nil
                e2tDataList, err := httpGetE2TList(e2murl)
@@ -620,50 +621,50 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile
                        xapp.Logger.Warn("cannot get E2T data from E2M due to: " + err.Error())
                        readErr = err
                }
-       time.Sleep(2 * time.Second)
+               time.Sleep(2 * time.Second)
        }
 
-       if ( readErr != nil) {
-               return readErr
+       if readErr != nil {
+               return readErr
        }
 
        pcData, confErr := rtmgr.GetPlatformComponents(configfile)
-    if confErr != nil {
-            xapp.Logger.Error(confErr.Error())
-            return confErr
-    }
-    xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.")
-    // Combine the xapps data and platform data before writing to the SDL
+       if confErr != nil {
+               xapp.Logger.Error(confErr.Error())
+               return confErr
+       }
+       xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.")
+       // Combine the xapps data and platform data before writing to the SDL
        ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: e2ts, MeidMap: meids}
-    writeErr := sdlEngine.WriteAll(fileName, ricData)
-    if writeErr != nil {
-            xapp.Logger.Error(writeErr.Error())
-    }
+       writeErr := sdlEngine.WriteAll(fileName, ricData)
+       if writeErr != nil {
+               xapp.Logger.Error(writeErr.Error())
+       }
 
        xapp.Logger.Info("Trying to fetch Subscriptions data from Subscription manager")
-/*    for i := 1; i <= maxRetries; i++ {
-               readErr = nil
-               sub_list, err := xapp.Subscription.QuerySubscriptions()
+       /*    for i := 1; i <= maxRetries; i++ {
+                       readErr = nil
+                       sub_list, err := xapp.Subscription.QuerySubscriptions()
 
-               if sub_list != nil && err == nil {
-                       PopulateSubscription(sub_list)
-                       break
-               } else {
-                       readErr = err
-                       xapp.Logger.Warn("cannot get xapp data due to: " + readErr.Error())
-        }
-               time.Sleep(2 * time.Second)
-       }
+                       if sub_list != nil && err == nil {
+                               PopulateSubscription(sub_list)
+                               break
+                       } else {
+                               readErr = err
+                               xapp.Logger.Warn("cannot get xapp data due to: " + readErr.Error())
+               }
+                       time.Sleep(2 * time.Second)
+               }
 
-       if (readErr != nil) {
-        return readErr
+               if (readErr != nil) {
+               return readErr
+               }
+       */
+       // post subscription req to appmgr
+       readErr = PostSubReq(xmurl, nbiif)
+       if readErr == nil {
+               return nil
        }
-*/
-    // post subscription req to appmgr
-    readErr = PostSubReq(xmurl, nbiif)
-    if readErr == nil {
-            return nil
-    }
 
        return readErr
 }
@@ -873,7 +874,7 @@ func PopulateSubscription(sub_list xfmodel.SubscriptionList) {
 
                        stringSlice := strings.Split(ep, ":")
                        subdata.Address = &stringSlice[0]
-                       intportval, _ := strconv.Atoi( stringSlice[1])
+                       intportval, _ := strconv.Atoi(stringSlice[1])
                        value := uint16(intportval)
                        subdata.Port = &value
                        xapp.Logger.Debug("Adding Subscription List has Address :%v, port :%v, SubscriptionID :%v ", subdata.Address, subdata.Address, subdata.SubscriptionID)
index 346644c..0043dc9 100644 (file)
@@ -102,11 +102,64 @@ func (r *Rmr) generateRMRPolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents,
        return &rawrt
 }
 
+/*
+Produces the raw route message consumable by RMR
+*/
+func (r *Rmr) generateRMRRouteTables(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents, key string) (*[]string, *[]string) {
+       rawrt := []string{key + "newrt|start\n"}
+       rt := r.generateRouteTable(eps)
+       for _, rte := range *rt {
+               rawrte := key + "mse|" + rte.MessageType
+               for _, tx := range rte.TxList {
+                       rawrte += "," + tx.Ip + ":" + strconv.Itoa(int(tx.Port))
+               }
+               rawrte += "|" + strconv.Itoa(int(rte.SubID)) + "|"
+               group := ""
+               for _, rxg := range rte.RxGroups {
+                       member := ""
+                       for _, rx := range rxg {
+                               if member == "" {
+                                       member += rx.Ip + ":" + strconv.Itoa(int(rx.Port))
+                               } else {
+                                       member += "," + rx.Ip + ":" + strconv.Itoa(int(rx.Port))
+                               }
+                       }
+                       if group == "" {
+                               group += member
+                       } else {
+                               group += ";" + member
+                       }
+               }
+               rawrte += group
+
+                if (rte.RouteType == "%meid") {
+                        rawrte += group + rte.RouteType
+                }
+
+               rawrt = append(rawrt, rawrte+"\n")
+       }
+       rawrt = append(rawrt, key+"newrt|end\n")
+        count := 0
+
+       meidrt := []string{key +"meid_map|start\n"}
+        for _, value := range rcs.MeidMap {
+            meidrt = append(meidrt, key + value + "\n")
+            count++
+        }
+        meidrt = append(meidrt, key+"meid_map|end|" + strconv.Itoa(count) +"\n")
+
+       xapp.Logger.Debug("rmr.generateRMRRouteTables returns: %v", rawrt)
+       xapp.Logger.Debug("rmr.generateRMRRouteTables returns: %v", meidrt)
+       xapp.Logger.Debug("rmr.generateRMRRouteTables returns: %v", rcs)
+       return &rawrt, &meidrt
+}
+
 func (r *RmrPush) GeneratePolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents) *[]string {
        xapp.Logger.Debug("Invoked rmr.GeneratePolicies, args: %v: ", eps)
        return r.generateRMRPolicies(eps, rcs, "")
 }
 
-func (r *RmrPush) GenerateRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
-       return r.generateRouteTable(eps)
+func (r *RmrPush) GenerateRouteTables(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents) (*[]string, *[]string) {
+       xapp.Logger.Debug("Invoked rmr.GenerateRouteTables, args: %v: ", eps)
+       return r.generateRMRRouteTables(eps, rcs, "")
 }
index d26a704..be84fb7 100644 (file)
@@ -182,6 +182,81 @@ func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoin
 
 }
 
+func (r *Rpe) generateXappToXappRoutes(RecvxAppEp *rtmgr.Endpoint, endPointList rtmgr.Endpoints, routeTable *rtmgr.RouteTable) {
+    xapp.Logger.Debug("rpe.generateXappToXappRoutes invoked")
+
+    for _, rxmsg := range RecvxAppEp.RxMessages {
+
+        var src_present bool
+        xapp.Logger.Debug("RecvxAppEp.RxMessages Endpoint: %v, xAppType: %v and rxmsg: %v ", RecvxAppEp.Name, RecvxAppEp.XAppType, rxmsg)
+        if (rxmsg != "RIC_SUB_RESP" && rxmsg != "RIC_SUB_FAILURE" && rxmsg != "RIC_SUB_DEL_RESP" && rxmsg != "RIC_SUB_DEL_FAILURE" && rxmsg !=               "RIC_INDICATION" && rxmsg != "RIC_CONTROL_ACK" && rxmsg != "RIC_CONTROL_FAILURE" && rxmsg != "A1_POLICY_REQ") {
+            for _, SrcxAppEp := range endPointList {
+                if SrcxAppEp.XAppType != sbi.PlatformType && (len(SrcxAppEp.TxMessages) > 0) && SrcxAppEp.Name != RecvxAppEp.Name {
+                    for _, txmsg := range SrcxAppEp.TxMessages {
+                            if (rxmsg == txmsg) {
+                                r.addRoute(rxmsg, SrcxAppEp, RecvxAppEp, routeTable, -1, "")
+                                src_present = true
+                                break
+                            }
+                    }
+                }
+            }
+            if src_present == false {
+                r.addRoute(rxmsg, nil, RecvxAppEp, routeTable, -1, "")
+            }
+        }
+
+    }
+}
+
+func (r *Rpe) generateXappToXappRoutes(RecvxAppEp *rtmgr.Endpoint, endPointList rtmgr.Endpoints, routeTable *rtmgr.RouteTable) {
+
+       xapp.Logger.Debug("rpe.generateXappToXappRoutes invoked")
+
+
+       for _, rxmsg := range RecvxAppEp.RxMessages {
+
+
+               var src_present bool
+
+               xapp.Logger.Debug("RecvxAppEp.RxMessages Endpoint: %v, xAppType: %v and rxmsg: %v ", RecvxAppEp.Name, RecvxAppEp.XAppType, rxmsg)
+
+               if (rxmsg != "RIC_SUB_RESP" && rxmsg != "RIC_SUB_FAILURE" && rxmsg != "RIC_SUB_DEL_RESP" && rxmsg != "RIC_SUB_DEL_FAILURE" && rxmsg != "RIC_INDICATION" && rxmsg != "RIC_CONTROL_ACK" && rxmsg != "RIC_CONTROL_FAILURE" && rxmsg != "A1_POLICY_REQ") {
+
+                       for _, SrcxAppEp := range endPointList {
+
+                               if SrcxAppEp.XAppType != sbi.PlatformType && (len(SrcxAppEp.TxMessages) > 0) && SrcxAppEp.Name != RecvxAppEp.Name {
+
+                                       for _, txmsg := rnge SrcxAppEp.TxMessages {
+       
+                                                       if (rxmsg == txmsg) {
+       
+                                                               r.addRoute(rxmsg, SrcxAppEp, RecvxAppEp, routeTable, -1, "")
+       
+                                                               src_present = true
+       
+                                                               break
+       
+                                                       }
+       
+                                       }
+       
+                               }
+       
+                       }
+       
+                       if src_present == false {
+       
+                               r.addRoute(rxmsg, nil, RecvxAppEp, routeTable, -1, "")
+       
+                       }
+       
+               }
+       
+       
+       }
+       
+}
 func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
        xapp.Logger.Debug("rpe.addSubscriptionRoutes invoked")
        subscriptionList := &rtmgr.Subs
@@ -296,6 +371,7 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable
                if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) {
                        r.generateXappRoutes(endPoint, subManEp, routeTable)
                        r.generateSubscriptionRoutes(endPoint, subManEp, routeTable)
+                       r.generateXappToXappRoutes(endPoint, endPointList, routeTable)
                }
        }
        return routeTable
index 4a5b9fc..9db9669 100644 (file)
@@ -44,5 +44,5 @@ type EngineConfig struct {
 
 type Engine interface {
        GeneratePolicies(rtmgr.Endpoints, *rtmgr.RicComponents) *[]string
-       GenerateRouteTable(rtmgr.Endpoints) *rtmgr.RouteTable
+       GenerateRouteTables(rtmgr.Endpoints, *rtmgr.RicComponents) (*[]string, *[]string)
 }
index 4451299..5ab4480 100644 (file)
@@ -46,6 +46,7 @@ import (
        "bytes"
        "crypto/md5"
        "errors"
+       "fmt"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "nanomsg.org/go/mangos/v2"
        "nanomsg.org/go/mangos/v2/protocol/push"
@@ -53,9 +54,13 @@ import (
        "routing-manager/pkg/rtmgr"
        "strconv"
        "time"
-       "fmt"
 )
 
+type EPStatus struct {
+       endpoint string
+       status   bool
+}
+
 type NngPush struct {
        Sbi
        NewSocket CreateNewNngSocketHandler
@@ -63,15 +68,14 @@ type NngPush struct {
 }
 
 type RMRParams struct {
-        *xapp.RMRParams
+       *xapp.RMRParams
 }
 
-
 func (params *RMRParams) String() string {
-        var b bytes.Buffer
-        sum := md5.Sum(params.Payload)
-        fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
-        return b.String()
+       var b bytes.Buffer
+       sum := md5.Sum(params.Payload)
+       fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
+       return b.String()
 }
 
 func NewNngPush() *NngPush {
@@ -122,10 +126,10 @@ func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
        xapp.Logger.Debug("args: %v", *ep)
        endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
        ep.Whid = int(xapp.Rmr.Openwh(endpoint))
-       if ep.Whid < 0   {
+       if ep.Whid < 0 {
                return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
-       }else {
-               xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint)
+       } else {
+               xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
        }
 
        return nil
@@ -175,7 +179,7 @@ func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
                params := &RMRParams{&xapp.RMRParams{}}
                params.Mtype = 20
                params.PayloadLen = len([]byte(pe))
-               params.Payload =[]byte(pe)
+               params.Payload = []byte(pe)
                params.Mbuf = nil
                params.Whid = ep.Whid
                time.Sleep(1 * time.Millisecond)
@@ -184,7 +188,7 @@ func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
        xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
 }
 
-func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){
+func (c *NngPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
        return c.createEndpoint(payload, c)
 }
 
@@ -197,3 +201,92 @@ func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
        return nil
 }
 
+func (c *NngPush) DistributeRouteTables(route_table *[]string, meid_table *[]string) error {
+       xapp.Logger.Debug("Invoked: sbi.DistributeRouteTables")
+       xapp.Logger.Debug("args route_table: %v", route_table)
+       xapp.Logger.Debug("args meid_table: %v", meid_table)
+
+       channel := make(chan EPStatus)
+
+       var i int = 2
+
+       for _, ep := range rtmgr.Eps {
+               go c.send_sync(ep, route_table, meid_table, channel, i)
+               i = i + 1
+       }
+
+       count := 0
+       result := make([]EPStatus, len(rtmgr.Eps))
+       for i, _ := range result {
+               result[i] = <-channel
+               if result[i].status == true {
+                       count++
+               } else {
+                       xapp.Logger.Error("RMR send is failed for endpoint %v", result[i].endpoint)
+               }
+       }
+
+       if count < len(rtmgr.Eps) {
+               return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
+       }
+
+       return nil
+}
+
+func (c *NngPush) send_sync(ep *rtmgr.Endpoint, route_table *[]string, meidtable *[]string, channel chan EPStatus, call_id int) {
+       xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+
+       ret := c.send_data(ep, route_table, call_id)
+
+       if ret == true {
+               ret = c.send_data(ep, meidtable, call_id)
+       }
+       channel <- EPStatus{ep.Uuid, ret}
+
+}
+
+/*
+
+       1. first n-1 records rmr_wh_send (async send)
+       2. last record rmr_wh_call (sync send)
+
+*/
+
+func (c *NngPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
+       xapp.Logger.Debug("sync send route data to endpoint: " + ep.Uuid + " call_id: " + string(call_id))
+       var state int
+       var retstr string
+
+       length := len(*policies)
+
+       for index, pe := range *policies {
+
+               params := &RMRParams{&xapp.RMRParams{}}
+               params.Mtype = 20
+               params.PayloadLen = len([]byte(pe))
+               params.Payload = []byte(pe)
+               params.Mbuf = nil
+               params.Whid = ep.Whid
+               if index == length-1 {
+                       params.Callid = call_id
+                       params.Timeout = 200
+                       state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
+                       if state != C.RMR_OK {
+                               xapp.Logger.Error("sync send route data to endpoint: " + ep.Uuid + " is failed,   call_id: " + string(call_id) + " for xapp.Rmr.SendCallMsg " + " return payload: " + retstr)
+                               return false
+                       } else {
+                               xapp.Logger.Info("sync send route data to endpoint: " + ep.Uuid + " is success,  call_id: " + string(call_id) + " return payload: " + retstr)
+                               return true
+                       }
+
+               } else {
+                       if xapp.Rmr.SendMsg(params.RMRParams) != true {
+                               xapp.Logger.Error("sync send route data to endpoint: " + ep.Uuid + " is failed, call_id: " + string(call_id) + " for xapp.Rmr.SendMsg")
+                               return false
+                       }
+               }
+       }
+
+       xapp.Logger.Error("sync send route data to endpoint: " + ep.Uuid + " is failed, call_id: " + string(call_id) + " xapp.Rmr.SendCallMsg is not called")
+       return false
+}
index 58fe7d8..be415f3 100644 (file)
@@ -31,7 +31,9 @@ package sbi
 
 import (
        "errors"
+       "fmt"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+       "net"
        "routing-manager/pkg/rtmgr"
        "strconv"
        "strings"
@@ -107,7 +109,7 @@ func (s *Sbi) updateEndpoints(rcs *rtmgr.RicComponents, sbi Engine) {
                }
        }
        s.updatePlatformEndpoints(&((*rcs).Pcs), sbi)
-        s.updateE2TEndpoints(&((*rcs).E2Ts), sbi)
+       s.updateE2TEndpoints(&((*rcs).E2Ts), sbi)
        s.pruneEndpointList(sbi)
 }
 
@@ -141,48 +143,59 @@ func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbi Engine)
 }
 
 func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) {
-        xapp.Logger.Debug("updateE2TEndpoints invoked. E2T: %v", *E2Ts)
-        for _, e2t := range *E2Ts {
-                uuid := e2t.Fqdn
-                stringSlice := strings.Split(e2t.Fqdn, ":")
-                ipaddress := stringSlice[0]
-                port, _ := strconv.Atoi(stringSlice[1])
-                if _, ok := rtmgr.Eps[uuid]; ok {
-                        rtmgr.Eps[uuid].Keepalive = true
-                } else {
-                        ep := &rtmgr.Endpoint{
-                                Uuid:       uuid,
-                                Name:       e2t.Name,
-                                XAppType:   PlatformType,
-                                Ip:         ipaddress,
-                                Port:       uint16(port),
-                                TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"],
-                                RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"],
-                                Socket:     nil,
-                                IsReady:    false,
-                                Keepalive:  true,
-                        }
-                        xapp.Logger.Debug("ep created: %v", ep)
-                        if err := sbi.AddEndpoint(ep); err != nil {
-                                xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
-                                continue
-                        }
-                        rtmgr.Eps[uuid] = ep
-                }
-        }
+       xapp.Logger.Debug("updateE2TEndpoints invoked. E2T: %v", *E2Ts)
+       for _, e2t := range *E2Ts {
+               uuid := e2t.Fqdn
+               stringSlice := strings.Split(e2t.Fqdn, ":")
+               ipaddress := stringSlice[0]
+               port, _ := strconv.Atoi(stringSlice[1])
+               if _, ok := rtmgr.Eps[uuid]; ok {
+                       rtmgr.Eps[uuid].Keepalive = true
+               } else {
+                       ep := &rtmgr.Endpoint{
+                               Uuid:       uuid,
+                               Name:       e2t.Name,
+                               XAppType:   PlatformType,
+                               Ip:         ipaddress,
+                               Port:       uint16(port),
+                               TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"],
+                               RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"],
+                               Socket:     nil,
+                               IsReady:    false,
+                               Keepalive:  true,
+                       }
+                       xapp.Logger.Debug("ep created: %v", ep)
+                       if err := sbi.AddEndpoint(ep); err != nil {
+                               xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
+                               continue
+                       }
+                       rtmgr.Eps[uuid] = ep
+               }
+       }
 }
 
-func (s *Sbi) createEndpoint(payload string, sbi Engine) (*rtmgr.Endpoint) {
+func (s *Sbi) createEndpoint(payload string, sbi Engine) *rtmgr.Endpoint {
        xapp.Logger.Debug("CreateEndPoint %v", payload)
        stringSlice := strings.Split(payload, " ")
        uuid := stringSlice[0]
-       xapp.Logger.Debug(">>> uuid %v",  stringSlice[0])
-
+       xapp.Logger.Debug(">>> uuid %v", stringSlice[0])
 
        if _, ok := rtmgr.Eps[uuid]; ok {
                ep := rtmgr.Eps[uuid]
                return ep
        }
 
+       /* incase the stored Endpoint list is in the form of IP:port*/
+       stringsubsplit := strings.Split(uuid, ":")
+       addr, err := net.LookupIP(stringsubsplit[0])
+       if err == nil {
+               convertedUuid := fmt.Sprintf("%s:%s", addr[0], stringsubsplit[1])
+               xapp.Logger.Info(" IP:Port received is %s", convertedUuid)
+               if _, ok := rtmgr.Eps[convertedUuid]; ok {
+                       ep := rtmgr.Eps[convertedUuid]
+                       return ep
+               }
+       }
+
        return nil
 }
index c0ab373..c5b6de6 100644 (file)
@@ -47,6 +47,7 @@ type Engine interface {
        UpdateEndpoints(*rtmgr.RicComponents)
        CreateEndpoint(string) (*rtmgr.Endpoint)
        DistributeToEp(*[]string, *rtmgr.Endpoint) error
+       DistributeRouteTables(route_table *[]string, meid_table *[]string) error
 }
 
 type NngSocket interface {
diff --git a/uta_rtg_ric.rt b/uta_rtg_ric.rt
new file mode 100644 (file)
index 0000000..70046b3
--- /dev/null
@@ -0,0 +1,2 @@
+newrt|start
+newrt|end