From 0626bf0cad62977d0052976a4a9f5b58a0042d5d Mon Sep 17 00:00:00 2001 From: wahidw Date: Sat, 4 Apr 2020 16:48:14 +0000 Subject: [PATCH] Handling of synchronous RMR messages Change-Id: I792f146bdd0ead8097ab9460b5c5797915f72e5e Signed-off-by: wahidw --- Dockerfile | 3 +- cmd/rtmgr.go | 4 +- pkg/nbi/httprestful.go | 109 ++++++++++++++++++++++----------------------- pkg/rpe/rmr.go | 57 +++++++++++++++++++++++- pkg/rpe/rpe.go | 76 ++++++++++++++++++++++++++++++++ pkg/rpe/types.go | 2 +- pkg/sbi/nngpush.go | 117 ++++++++++++++++++++++++++++++++++++++++++++----- pkg/sbi/sbi.go | 79 +++++++++++++++++++-------------- pkg/sbi/types.go | 1 + uta_rtg_ric.rt | 2 + 10 files changed, 345 insertions(+), 105 deletions(-) create mode 100644 uta_rtg_ric.rt diff --git a/Dockerfile b/Dockerfile index 2919ebf..96e7a25 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,7 +26,7 @@ FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:4-u18.04-nng as rtmgrbuild # Install RMr shared library -ARG RMRVERSION=3.6.0 +ARG RMRVERSION=3.6.2 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb # Install RMr development header files RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb @@ -83,6 +83,7 @@ COPY --from=rtmgrbuild /go/bin/rtmgr / COPY --from=rtmgrbuild /run_rtmgr.sh / COPY --from=rtmgrbuild /usr/local/include /usr/local/include COPY --from=rtmgrbuild /usr/local/lib /usr/local/lib +COPY "uta_rtg_ric.rt" / RUN ldconfig RUN apt-get update && apt-get install -y iputils-ping net-tools curl tcpdump RUN mkdir /db && touch /db/rt.json && chmod 777 /db/rt.json diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go index 63b67b7..f3cbc5e 100644 --- a/cmd/rtmgr.go +++ b/cmd/rtmgr.go @@ -77,8 +77,8 @@ func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine continue } sbiEngine.UpdateEndpoints(data) - policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data) - err = sbiEngine.DistributeAll(policies) + route_table, meid_table := rpeEngine.GenerateRouteTables(rtmgr.Eps, data) + err = sbiEngine.DistributeRouteTables(route_table, meid_table) if err != nil { xapp.Logger.Error("Routing table cannot be published due to: " + err.Error()) } diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go index e764467..80d55c4 100644 --- a/pkg/nbi/httprestful.go +++ b/pkg/nbi/httprestful.go @@ -35,8 +35,8 @@ import ( "encoding/json" "errors" "fmt" - "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" xfmodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "github.com/go-openapi/loads" "github.com/go-openapi/runtime/middleware" "net" @@ -574,40 +574,41 @@ func PopulateE2TMap(e2tDataList *[]rtmgr.E2tIdentity, e2ts map[string]rtmgr.E2TI } e2ts[e2tinst.Fqdn] = e2tinst - meids = append(meids,str) + meids = append(meids, str) } } func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, e2murl string, sdlEngine sdl.Engine) error { xapp.Logger.Info("Invoked retrieveStartupData ") - var readErr error - var maxRetries = 10 + var readErr error + var err error + var maxRetries = 10 var xappData *[]rtmgr.XApp xappData = new([]rtmgr.XApp) xapp.Logger.Info("Trying to fetch XApps data from XAPP manager") - for i := 1; i <= maxRetries; i++ { - time.Sleep(2 * time.Second) + for i := 1; i <= maxRetries; i++ { + time.Sleep(2 * time.Second) - readErr = nil - xappData, err := httpGetXApps(xmurl) - if xappData != nil && err == nil { + readErr = nil + xappData, err := httpGetXApps(xmurl) + if xappData != nil && err == nil { break - } else if err == nil { - readErr = errors.New("unexpected HTTP status code") - } else { - xapp.Logger.Warn("cannot get xapp data due to: " + err.Error()) - readErr = err - } - } - - if ( readErr != nil) { - return readErr + } else if err == nil { + readErr = errors.New("unexpected HTTP status code") + } else { + xapp.Logger.Warn("cannot get xapp data due to: " + err.Error()) + readErr = err + } + } + + if readErr != nil { + return readErr } var meids []string e2ts := make(map[string]rtmgr.E2TInstance) xapp.Logger.Info("Trying to fetch E2T data from E2manager") - for i := 1; i <= maxRetries; i++ { + for i := 1; i <= maxRetries; i++ { readErr = nil e2tDataList, err := httpGetE2TList(e2murl) @@ -620,50 +621,50 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile xapp.Logger.Warn("cannot get E2T data from E2M due to: " + err.Error()) readErr = err } - time.Sleep(2 * time.Second) + time.Sleep(2 * time.Second) } - if ( readErr != nil) { - return readErr + if readErr != nil { + return readErr } pcData, confErr := rtmgr.GetPlatformComponents(configfile) - if confErr != nil { - xapp.Logger.Error(confErr.Error()) - return confErr - } - xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.") - // Combine the xapps data and platform data before writing to the SDL + if confErr != nil { + xapp.Logger.Error(confErr.Error()) + return confErr + } + xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.") + // Combine the xapps data and platform data before writing to the SDL ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: e2ts, MeidMap: meids} - writeErr := sdlEngine.WriteAll(fileName, ricData) - if writeErr != nil { - xapp.Logger.Error(writeErr.Error()) - } + writeErr := sdlEngine.WriteAll(fileName, ricData) + if writeErr != nil { + xapp.Logger.Error(writeErr.Error()) + } xapp.Logger.Info("Trying to fetch Subscriptions data from Subscription manager") -/* for i := 1; i <= maxRetries; i++ { - readErr = nil - sub_list, err := xapp.Subscription.QuerySubscriptions() + /* for i := 1; i <= maxRetries; i++ { + readErr = nil + sub_list, err := xapp.Subscription.QuerySubscriptions() - if sub_list != nil && err == nil { - PopulateSubscription(sub_list) - break - } else { - readErr = err - xapp.Logger.Warn("cannot get xapp data due to: " + readErr.Error()) - } - time.Sleep(2 * time.Second) - } + if sub_list != nil && err == nil { + PopulateSubscription(sub_list) + break + } else { + readErr = err + xapp.Logger.Warn("cannot get xapp data due to: " + readErr.Error()) + } + time.Sleep(2 * time.Second) + } - if (readErr != nil) { - return readErr + if (readErr != nil) { + return readErr + } + */ + // post subscription req to appmgr + readErr = PostSubReq(xmurl, nbiif) + if readErr == nil { + return nil } -*/ - // post subscription req to appmgr - readErr = PostSubReq(xmurl, nbiif) - if readErr == nil { - return nil - } return readErr } @@ -873,7 +874,7 @@ func PopulateSubscription(sub_list xfmodel.SubscriptionList) { stringSlice := strings.Split(ep, ":") subdata.Address = &stringSlice[0] - intportval, _ := strconv.Atoi( stringSlice[1]) + intportval, _ := strconv.Atoi(stringSlice[1]) value := uint16(intportval) subdata.Port = &value xapp.Logger.Debug("Adding Subscription List has Address :%v, port :%v, SubscriptionID :%v ", subdata.Address, subdata.Address, subdata.SubscriptionID) diff --git a/pkg/rpe/rmr.go b/pkg/rpe/rmr.go index 346644c..0043dc9 100644 --- a/pkg/rpe/rmr.go +++ b/pkg/rpe/rmr.go @@ -102,11 +102,64 @@ func (r *Rmr) generateRMRPolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents, return &rawrt } +/* +Produces the raw route message consumable by RMR +*/ +func (r *Rmr) generateRMRRouteTables(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents, key string) (*[]string, *[]string) { + rawrt := []string{key + "newrt|start\n"} + rt := r.generateRouteTable(eps) + for _, rte := range *rt { + rawrte := key + "mse|" + rte.MessageType + for _, tx := range rte.TxList { + rawrte += "," + tx.Ip + ":" + strconv.Itoa(int(tx.Port)) + } + rawrte += "|" + strconv.Itoa(int(rte.SubID)) + "|" + group := "" + for _, rxg := range rte.RxGroups { + member := "" + for _, rx := range rxg { + if member == "" { + member += rx.Ip + ":" + strconv.Itoa(int(rx.Port)) + } else { + member += "," + rx.Ip + ":" + strconv.Itoa(int(rx.Port)) + } + } + if group == "" { + group += member + } else { + group += ";" + member + } + } + rawrte += group + + if (rte.RouteType == "%meid") { + rawrte += group + rte.RouteType + } + + rawrt = append(rawrt, rawrte+"\n") + } + rawrt = append(rawrt, key+"newrt|end\n") + count := 0 + + meidrt := []string{key +"meid_map|start\n"} + for _, value := range rcs.MeidMap { + meidrt = append(meidrt, key + value + "\n") + count++ + } + meidrt = append(meidrt, key+"meid_map|end|" + strconv.Itoa(count) +"\n") + + xapp.Logger.Debug("rmr.generateRMRRouteTables returns: %v", rawrt) + xapp.Logger.Debug("rmr.generateRMRRouteTables returns: %v", meidrt) + xapp.Logger.Debug("rmr.generateRMRRouteTables returns: %v", rcs) + return &rawrt, &meidrt +} + func (r *RmrPush) GeneratePolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents) *[]string { xapp.Logger.Debug("Invoked rmr.GeneratePolicies, args: %v: ", eps) return r.generateRMRPolicies(eps, rcs, "") } -func (r *RmrPush) GenerateRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable { - return r.generateRouteTable(eps) +func (r *RmrPush) GenerateRouteTables(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents) (*[]string, *[]string) { + xapp.Logger.Debug("Invoked rmr.GenerateRouteTables, args: %v: ", eps) + return r.generateRMRRouteTables(eps, rcs, "") } diff --git a/pkg/rpe/rpe.go b/pkg/rpe/rpe.go index d26a704..be84fb7 100644 --- a/pkg/rpe/rpe.go +++ b/pkg/rpe/rpe.go @@ -182,6 +182,81 @@ func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoin } +func (r *Rpe) generateXappToXappRoutes(RecvxAppEp *rtmgr.Endpoint, endPointList rtmgr.Endpoints, routeTable *rtmgr.RouteTable) { + xapp.Logger.Debug("rpe.generateXappToXappRoutes invoked") + + for _, rxmsg := range RecvxAppEp.RxMessages { + + var src_present bool + xapp.Logger.Debug("RecvxAppEp.RxMessages Endpoint: %v, xAppType: %v and rxmsg: %v ", RecvxAppEp.Name, RecvxAppEp.XAppType, rxmsg) + if (rxmsg != "RIC_SUB_RESP" && rxmsg != "RIC_SUB_FAILURE" && rxmsg != "RIC_SUB_DEL_RESP" && rxmsg != "RIC_SUB_DEL_FAILURE" && rxmsg != "RIC_INDICATION" && rxmsg != "RIC_CONTROL_ACK" && rxmsg != "RIC_CONTROL_FAILURE" && rxmsg != "A1_POLICY_REQ") { + for _, SrcxAppEp := range endPointList { + if SrcxAppEp.XAppType != sbi.PlatformType && (len(SrcxAppEp.TxMessages) > 0) && SrcxAppEp.Name != RecvxAppEp.Name { + for _, txmsg := range SrcxAppEp.TxMessages { + if (rxmsg == txmsg) { + r.addRoute(rxmsg, SrcxAppEp, RecvxAppEp, routeTable, -1, "") + src_present = true + break + } + } + } + } + if src_present == false { + r.addRoute(rxmsg, nil, RecvxAppEp, routeTable, -1, "") + } + } + + } +} + +func (r *Rpe) generateXappToXappRoutes(RecvxAppEp *rtmgr.Endpoint, endPointList rtmgr.Endpoints, routeTable *rtmgr.RouteTable) { + + xapp.Logger.Debug("rpe.generateXappToXappRoutes invoked") + + + for _, rxmsg := range RecvxAppEp.RxMessages { + + + var src_present bool + + xapp.Logger.Debug("RecvxAppEp.RxMessages Endpoint: %v, xAppType: %v and rxmsg: %v ", RecvxAppEp.Name, RecvxAppEp.XAppType, rxmsg) + + if (rxmsg != "RIC_SUB_RESP" && rxmsg != "RIC_SUB_FAILURE" && rxmsg != "RIC_SUB_DEL_RESP" && rxmsg != "RIC_SUB_DEL_FAILURE" && rxmsg != "RIC_INDICATION" && rxmsg != "RIC_CONTROL_ACK" && rxmsg != "RIC_CONTROL_FAILURE" && rxmsg != "A1_POLICY_REQ") { + + for _, SrcxAppEp := range endPointList { + + if SrcxAppEp.XAppType != sbi.PlatformType && (len(SrcxAppEp.TxMessages) > 0) && SrcxAppEp.Name != RecvxAppEp.Name { + + for _, txmsg := rnge SrcxAppEp.TxMessages { + + if (rxmsg == txmsg) { + + r.addRoute(rxmsg, SrcxAppEp, RecvxAppEp, routeTable, -1, "") + + src_present = true + + break + + } + + } + + } + + } + + if src_present == false { + + r.addRoute(rxmsg, nil, RecvxAppEp, routeTable, -1, "") + + } + + } + + + } + +} func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) { xapp.Logger.Debug("rpe.addSubscriptionRoutes invoked") subscriptionList := &rtmgr.Subs @@ -296,6 +371,7 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) { r.generateXappRoutes(endPoint, subManEp, routeTable) r.generateSubscriptionRoutes(endPoint, subManEp, routeTable) + r.generateXappToXappRoutes(endPoint, endPointList, routeTable) } } return routeTable diff --git a/pkg/rpe/types.go b/pkg/rpe/types.go index 4a5b9fc..9db9669 100644 --- a/pkg/rpe/types.go +++ b/pkg/rpe/types.go @@ -44,5 +44,5 @@ type EngineConfig struct { type Engine interface { GeneratePolicies(rtmgr.Endpoints, *rtmgr.RicComponents) *[]string - GenerateRouteTable(rtmgr.Endpoints) *rtmgr.RouteTable + GenerateRouteTables(rtmgr.Endpoints, *rtmgr.RicComponents) (*[]string, *[]string) } diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go index 4451299..5ab4480 100644 --- a/pkg/sbi/nngpush.go +++ b/pkg/sbi/nngpush.go @@ -46,6 +46,7 @@ import ( "bytes" "crypto/md5" "errors" + "fmt" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "nanomsg.org/go/mangos/v2" "nanomsg.org/go/mangos/v2/protocol/push" @@ -53,9 +54,13 @@ import ( "routing-manager/pkg/rtmgr" "strconv" "time" - "fmt" ) +type EPStatus struct { + endpoint string + status bool +} + type NngPush struct { Sbi NewSocket CreateNewNngSocketHandler @@ -63,15 +68,14 @@ type NngPush struct { } type RMRParams struct { - *xapp.RMRParams + *xapp.RMRParams } - func (params *RMRParams) String() string { - var b bytes.Buffer - sum := md5.Sum(params.Payload) - fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum) - return b.String() + var b bytes.Buffer + sum := md5.Sum(params.Payload) + fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum) + return b.String() } func NewNngPush() *NngPush { @@ -122,10 +126,10 @@ func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error { xapp.Logger.Debug("args: %v", *ep) endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber) ep.Whid = int(xapp.Rmr.Openwh(endpoint)) - if ep.Whid < 0 { + if ep.Whid < 0 { return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid)) - }else { - xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint) + } else { + xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint) } return nil @@ -175,7 +179,7 @@ func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) { params := &RMRParams{&xapp.RMRParams{}} params.Mtype = 20 params.PayloadLen = len([]byte(pe)) - params.Payload =[]byte(pe) + params.Payload = []byte(pe) params.Mbuf = nil params.Whid = ep.Whid time.Sleep(1 * time.Millisecond) @@ -184,7 +188,7 @@ func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) { xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")") } -func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){ +func (c *NngPush) CreateEndpoint(payload string) *rtmgr.Endpoint { return c.createEndpoint(payload, c) } @@ -197,3 +201,92 @@ func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error { return nil } +func (c *NngPush) DistributeRouteTables(route_table *[]string, meid_table *[]string) error { + xapp.Logger.Debug("Invoked: sbi.DistributeRouteTables") + xapp.Logger.Debug("args route_table: %v", route_table) + xapp.Logger.Debug("args meid_table: %v", meid_table) + + channel := make(chan EPStatus) + + var i int = 2 + + for _, ep := range rtmgr.Eps { + go c.send_sync(ep, route_table, meid_table, channel, i) + i = i + 1 + } + + count := 0 + result := make([]EPStatus, len(rtmgr.Eps)) + for i, _ := range result { + result[i] = <-channel + if result[i].status == true { + count++ + } else { + xapp.Logger.Error("RMR send is failed for endpoint %v", result[i].endpoint) + } + } + + if count < len(rtmgr.Eps) { + return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps))) + } + + return nil +} + +func (c *NngPush) send_sync(ep *rtmgr.Endpoint, route_table *[]string, meidtable *[]string, channel chan EPStatus, call_id int) { + xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid) + + ret := c.send_data(ep, route_table, call_id) + + if ret == true { + ret = c.send_data(ep, meidtable, call_id) + } + channel <- EPStatus{ep.Uuid, ret} + +} + +/* + + 1. first n-1 records rmr_wh_send (async send) + 2. last record rmr_wh_call (sync send) + +*/ + +func (c *NngPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool { + xapp.Logger.Debug("sync send route data to endpoint: " + ep.Uuid + " call_id: " + string(call_id)) + var state int + var retstr string + + length := len(*policies) + + for index, pe := range *policies { + + params := &RMRParams{&xapp.RMRParams{}} + params.Mtype = 20 + params.PayloadLen = len([]byte(pe)) + params.Payload = []byte(pe) + params.Mbuf = nil + params.Whid = ep.Whid + if index == length-1 { + params.Callid = call_id + params.Timeout = 200 + state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams) + if state != C.RMR_OK { + xapp.Logger.Error("sync send route data to endpoint: " + ep.Uuid + " is failed, call_id: " + string(call_id) + " for xapp.Rmr.SendCallMsg " + " return payload: " + retstr) + return false + } else { + xapp.Logger.Info("sync send route data to endpoint: " + ep.Uuid + " is success, call_id: " + string(call_id) + " return payload: " + retstr) + return true + } + + } else { + if xapp.Rmr.SendMsg(params.RMRParams) != true { + xapp.Logger.Error("sync send route data to endpoint: " + ep.Uuid + " is failed, call_id: " + string(call_id) + " for xapp.Rmr.SendMsg") + return false + } + } + } + + xapp.Logger.Error("sync send route data to endpoint: " + ep.Uuid + " is failed, call_id: " + string(call_id) + " xapp.Rmr.SendCallMsg is not called") + return false +} diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go index 58fe7d8..be415f3 100644 --- a/pkg/sbi/sbi.go +++ b/pkg/sbi/sbi.go @@ -31,7 +31,9 @@ package sbi import ( "errors" + "fmt" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "net" "routing-manager/pkg/rtmgr" "strconv" "strings" @@ -107,7 +109,7 @@ func (s *Sbi) updateEndpoints(rcs *rtmgr.RicComponents, sbi Engine) { } } s.updatePlatformEndpoints(&((*rcs).Pcs), sbi) - s.updateE2TEndpoints(&((*rcs).E2Ts), sbi) + s.updateE2TEndpoints(&((*rcs).E2Ts), sbi) s.pruneEndpointList(sbi) } @@ -141,48 +143,59 @@ func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbi Engine) } func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) { - xapp.Logger.Debug("updateE2TEndpoints invoked. E2T: %v", *E2Ts) - for _, e2t := range *E2Ts { - uuid := e2t.Fqdn - stringSlice := strings.Split(e2t.Fqdn, ":") - ipaddress := stringSlice[0] - port, _ := strconv.Atoi(stringSlice[1]) - if _, ok := rtmgr.Eps[uuid]; ok { - rtmgr.Eps[uuid].Keepalive = true - } else { - ep := &rtmgr.Endpoint{ - Uuid: uuid, - Name: e2t.Name, - XAppType: PlatformType, - Ip: ipaddress, - Port: uint16(port), - TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"], - RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"], - Socket: nil, - IsReady: false, - Keepalive: true, - } - xapp.Logger.Debug("ep created: %v", ep) - if err := sbi.AddEndpoint(ep); err != nil { - xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error()) - continue - } - rtmgr.Eps[uuid] = ep - } - } + xapp.Logger.Debug("updateE2TEndpoints invoked. E2T: %v", *E2Ts) + for _, e2t := range *E2Ts { + uuid := e2t.Fqdn + stringSlice := strings.Split(e2t.Fqdn, ":") + ipaddress := stringSlice[0] + port, _ := strconv.Atoi(stringSlice[1]) + if _, ok := rtmgr.Eps[uuid]; ok { + rtmgr.Eps[uuid].Keepalive = true + } else { + ep := &rtmgr.Endpoint{ + Uuid: uuid, + Name: e2t.Name, + XAppType: PlatformType, + Ip: ipaddress, + Port: uint16(port), + TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"], + RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"], + Socket: nil, + IsReady: false, + Keepalive: true, + } + xapp.Logger.Debug("ep created: %v", ep) + if err := sbi.AddEndpoint(ep); err != nil { + xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error()) + continue + } + rtmgr.Eps[uuid] = ep + } + } } -func (s *Sbi) createEndpoint(payload string, sbi Engine) (*rtmgr.Endpoint) { +func (s *Sbi) createEndpoint(payload string, sbi Engine) *rtmgr.Endpoint { xapp.Logger.Debug("CreateEndPoint %v", payload) stringSlice := strings.Split(payload, " ") uuid := stringSlice[0] - xapp.Logger.Debug(">>> uuid %v", stringSlice[0]) - + xapp.Logger.Debug(">>> uuid %v", stringSlice[0]) if _, ok := rtmgr.Eps[uuid]; ok { ep := rtmgr.Eps[uuid] return ep } + /* incase the stored Endpoint list is in the form of IP:port*/ + stringsubsplit := strings.Split(uuid, ":") + addr, err := net.LookupIP(stringsubsplit[0]) + if err == nil { + convertedUuid := fmt.Sprintf("%s:%s", addr[0], stringsubsplit[1]) + xapp.Logger.Info(" IP:Port received is %s", convertedUuid) + if _, ok := rtmgr.Eps[convertedUuid]; ok { + ep := rtmgr.Eps[convertedUuid] + return ep + } + } + return nil } diff --git a/pkg/sbi/types.go b/pkg/sbi/types.go index c0ab373..c5b6de6 100644 --- a/pkg/sbi/types.go +++ b/pkg/sbi/types.go @@ -47,6 +47,7 @@ type Engine interface { UpdateEndpoints(*rtmgr.RicComponents) CreateEndpoint(string) (*rtmgr.Endpoint) DistributeToEp(*[]string, *rtmgr.Endpoint) error + DistributeRouteTables(route_table *[]string, meid_table *[]string) error } type NngSocket interface { diff --git a/uta_rtg_ric.rt b/uta_rtg_ric.rt new file mode 100644 index 0000000..70046b3 --- /dev/null +++ b/uta_rtg_ric.rt @@ -0,0 +1,2 @@ +newrt|start +newrt|end -- 2.16.6