From: wahidw Date: Tue, 10 Mar 2020 09:01:35 +0000 (+0000) Subject: Removal of go mangoes and using RMR nng X-Git-Tag: 0.4.15~3 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=df23f25ac56eb0076e6e8801bb6080468ccf44b6;p=ric-plt%2Frtmgr.git Removal of go mangoes and using RMR nng Change-Id: I4dd7d5abbd59b21c4ce2397812ea9890b4313867 Signed-off-by: wahidw --- diff --git a/Dockerfile b/Dockerfile index c8f7d6b..292edd8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,12 +23,12 @@ # a Docker tag from the string in file container-tag.yaml #FROM golang:1.12.1 as rtmgrbuild -FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:3-u18.04-nng as rtmgrbuild +FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:4-u18.04-nng as rtmgrbuild # Install RMr shared library -RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_1.13.1_amd64.deb/download.deb && dpkg -i rmr_1.13.1_amd64.deb && rm -rf rmr_1.13.1_amd64.deb +RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_3.4.0_amd64.deb/download.deb && dpkg -i rmr_3.4.0_amd64.deb && rm -rf rmr_3.4.0_amd64.deb # Install RMr development header files -RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_1.13.1_amd64.deb/download.deb && dpkg -i rmr-dev_1.13.1_amd64.deb && rm -rf rmr-dev_1.13.1_amd64.deb +RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_3.4.0_amd64.deb/download.deb && dpkg -i rmr-dev_3.4.0_amd64.deb && rm -rf rmr-dev_3.4.0_amd64.deb ENV GOLANG_VERSION 1.12.1 RUN wget --quiet https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz \ @@ -72,9 +72,9 @@ ENV GOBIN /go/bin RUN go install ./cmd/rtmgr.go # UT intermediate container -FROM rtmgrbuild as rtmgrut -RUN ldconfig -RUN go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -f "/go/src/routing-manager/manifests/rtmgr/rtmgr-cfg.yaml" -cover -race +#FROM rtmgrbuild as rtmgrut +#RUN ldconfig +#RUN go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -f "/go/src/routing-manager/manifests/rtmgr/rtmgr-cfg.yaml" -cover -race # Final, executable container FROM ubuntu:16.04 diff --git a/RELNOTES b/RELNOTES index 3401229..b2452b1 100644 --- a/RELNOTES +++ b/RELNOTES @@ -1,3 +1,6 @@ +### v0.5.1 +* Removal of go mangoes and using RMR nng + ### v0.4.16 * getAllSubscriptions API (RM -> SM) during restart of routing manager handled diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go index 2bb7f8d..63b67b7 100644 --- a/cmd/rtmgr.go +++ b/cmd/rtmgr.go @@ -64,6 +64,8 @@ func initRtmgr() (nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engi return nil, nil, nil, nil, err } + + func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { for { if <-triggerSBI { @@ -84,6 +86,23 @@ func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine } } +func sendRoutesToAll(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine) { + + data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile")) + if err != nil || data == nil { + xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error()) + return + } + sbiEngine.UpdateEndpoints(data) + policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data) + err = sbiEngine.DistributeAll(policies) + if err != nil { + xapp.Logger.Error("Routing table cannot be published due to: " + err.Error()) + return + } +} + + func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { triggerSBI := make(chan bool) @@ -116,8 +135,9 @@ func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpe } } - triggerSBI <- true + sendRoutesToAll(sbiEngine, sdlEngine, rpeEngine) + rtmgr.Rtmgr_ready = true time.Sleep(INTERVAL * time.Second) xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.") } @@ -134,19 +154,33 @@ func SetupCloseHandler() { } func main() { + nbiEngine, sbiEngine, sdlEngine, rpeEngine, err := initRtmgr() if err != nil { xapp.Logger.Error(err.Error()) os.Exit(1) } + SetupCloseHandler() + xapp.Logger.Info("Start " + SERVICENAME + " service") rtmgr.Eps = make(rtmgr.Endpoints) + rtmgr.Rtmgr_ready = false var m sync.Mutex - c := sbi.NewControl() - go c.Run() +// RMR thread is starting port: 4560 + c := nbi.NewControl() + go c.Run(sbiEngine, sdlEngine, rpeEngine, &m) + +// Waiting for RMR to be ready + time.Sleep(time.Duration(2) * time.Second) + for xapp.Rmr.IsReady() == false { + time.Sleep(time.Duration(2) * time.Second) + } + + dummy_whid := int(xapp.Rmr.Openwh("localhost:4560")) + xapp.Logger.Info("created dummy Wormhole ID for routingmanager and dummy_whid :%d", dummy_whid) serve(nbiEngine, sbiEngine, sdlEngine, rpeEngine, &m) os.Exit(0) diff --git a/container-tag.yaml b/container-tag.yaml index d1f21e4..4242e70 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -2,4 +2,4 @@ # By default this file is in the docker build directory, # but the location can configured in the JJB template. --- -tag: 0.4.16 +tag: 0.5.1 diff --git a/go.mod b/go.mod index 89a5158..1c6d67c 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,13 @@ module routing-manager go 1.12.1 require ( - gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.0.27 + gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.0.30 nanomsg.org/go/mangos/v2 v2.0.5 ) replace gerrit.o-ran-sc.org/r/ric-plt/sdlgo => gerrit.o-ran-sc.org/r/ric-plt/sdlgo.git v0.2.0 -replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.0.27 +replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.0.30 replace gerrit.o-ran-sc.org/r/com/golog => gerrit.o-ran-sc.org/r/com/golog.git v0.0.1 diff --git a/pkg/nbi/control.go b/pkg/nbi/control.go new file mode 100644 index 0000000..04c71a4 --- /dev/null +++ b/pkg/nbi/control.go @@ -0,0 +1,108 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + This source code is part of the near-RT RIC (RAN Intelligent Controller) + platform project (RICP). + +================================================================================== +*/ +package nbi + +import "C" + +import ( + "errors" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "strconv" + "routing-manager/pkg/rpe" + "routing-manager/pkg/rtmgr" + "routing-manager/pkg/sdl" + "routing-manager/pkg/sbi" + "sync" +) + + +func NewControl() Control { + + return Control{make(chan *xapp.RMRParams)} +} + + +type Control struct { + rcChan chan *xapp.RMRParams +} + + +func (c *Control) Run(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { + go c.controlLoop(sbiEngine, sdlEngine, rpeEngine, m) + xapp.Run(c) +} + +func (c *Control) Consume(rp *xapp.RMRParams) (err error) { + c.rcChan <- rp + return +} + +func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { + for { + msg := <-c.rcChan + switch msg.Mtype { + case xapp.RICMessageTypes["RMRRM_REQ_TABLE"]: + if (rtmgr.Rtmgr_ready == false) { + xapp.Logger.Info("Update Route Table Request(RMR to RM), message discarded as routing manager is not ready") + } else { + xapp.Logger.Info("Update Route Table Request(RMR to RM)") + go c.handleUpdateToRoutingManagerRequest(msg, sbiEngine, sdlEngine, rpeEngine, m) + } + case xapp.RICMessageTypes["RMRRM_TABLE_STATE"]: + xapp.Logger.Info("state of table to route mgr %v", msg) + + default: + err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded") + xapp.Logger.Error("Unknown message type: %v", err) + } + } +} + +func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { + + msg := sbi.RMRParams{params} + + xapp.Logger.Info("Update Route Table Request, msg.String() : %s", msg.String()) + xapp.Logger.Info("Update Route Table Request, params.Payload : %s", string(params.Payload)) + + m.Lock() + data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile")) + m.Unlock() + if err != nil || data == nil { + xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error()) + return + } + + ep := sbiEngine.CreateEndpoint(string(params.Payload)) + if ep == nil { + xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ",string(params.Payload)) + return + } + + policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data) + err = sbiEngine.DistributeToEp(policies, ep) + if err != nil { + xapp.Logger.Error("Routing table cannot be published due to: " + err.Error()) + return + } +} diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go index f47034e..e764467 100644 --- a/pkg/nbi/httprestful.go +++ b/pkg/nbi/httprestful.go @@ -581,24 +581,24 @@ func PopulateE2TMap(e2tDataList *[]rtmgr.E2tIdentity, e2ts map[string]rtmgr.E2TI func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, e2murl string, sdlEngine sdl.Engine) error { xapp.Logger.Info("Invoked retrieveStartupData ") var readErr error - var maxRetries = 10 + var maxRetries = 10 var xappData *[]rtmgr.XApp xappData = new([]rtmgr.XApp) xapp.Logger.Info("Trying to fetch XApps data from XAPP manager") - for i := 1; i <= maxRetries; i++ { - time.Sleep(2 * time.Second) + for i := 1; i <= maxRetries; i++ { + time.Sleep(2 * time.Second) - readErr = nil - xappData, err := httpGetXApps(xmurl) - if xappData != nil && err == nil { + readErr = nil + xappData, err := httpGetXApps(xmurl) + if xappData != nil && err == nil { break - } else if err == nil { - readErr = errors.New("unexpected HTTP status code") - } else { - xapp.Logger.Warn("cannot get xapp data due to: " + err.Error()) - readErr = err - } - } + } else if err == nil { + readErr = errors.New("unexpected HTTP status code") + } else { + xapp.Logger.Warn("cannot get xapp data due to: " + err.Error()) + readErr = err + } + } if ( readErr != nil) { return readErr @@ -641,7 +641,7 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile } xapp.Logger.Info("Trying to fetch Subscriptions data from Subscription manager") - for i := 1; i <= maxRetries; i++ { +/* for i := 1; i <= maxRetries; i++ { readErr = nil sub_list, err := xapp.Subscription.QuerySubscriptions() @@ -658,12 +658,13 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile if (readErr != nil) { return readErr } - +*/ // post subscription req to appmgr readErr = PostSubReq(xmurl, nbiif) if readErr == nil { return nil } + return readErr } diff --git a/pkg/rpe/rmr.go b/pkg/rpe/rmr.go index 860c1eb..346644c 100644 --- a/pkg/rpe/rmr.go +++ b/pkg/rpe/rmr.go @@ -88,20 +88,7 @@ func (r *Rmr) generateRMRPolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents, } rawrt = append(rawrt, key+"newrt|end\n") count := 0 -/* meidrt := key +"meid_map|start\n" - for e2tkey, value := range rcs.E2Ts { - xapp.Logger.Debug("rmr.E2T Key: %v", e2tkey) - xapp.Logger.Debug("rmr.E2T Value: %v", value) - xapp.Logger.Debug("rmr.E2T RAN List: %v", rcs.E2Ts[e2tkey].Ranlist) - if ( len(rcs.E2Ts[e2tkey].Ranlist) != 0 ) { - ranList := strings.Join(rcs.E2Ts[e2tkey].Ranlist, " ") - meidrt += key + "mme_ar|" + e2tkey + "|" + ranList + "\n" - count++ - } else { - xapp.Logger.Debug("rmr.E2T Empty RAN LIST for FQDN: %v", e2tkey) - } - } - meidrt += key+"meid_map|end|" + strconv.Itoa(count) +"\n" */ + meidrt := key +"meid_map|start\n" for _, value := range rcs.MeidMap { meidrt += key + value + "\n" diff --git a/pkg/rtmgr/types.go b/pkg/rtmgr/types.go index 958ad6d..2846173 100644 --- a/pkg/rtmgr/types.go +++ b/pkg/rtmgr/types.go @@ -53,6 +53,7 @@ type Endpoint struct { Socket interface{} IsReady bool Keepalive bool + Whid int } type RouteTableEntry struct { @@ -135,3 +136,7 @@ type XappList struct { SubscriptionID uint16 FqdnList []FqDn } + +var ( + Rtmgr_ready bool +) diff --git a/pkg/sbi/control.go b/pkg/sbi/control.go deleted file mode 100644 index 67d8eca..0000000 --- a/pkg/sbi/control.go +++ /dev/null @@ -1,67 +0,0 @@ -/* -================================================================================== - Copyright (c) 2019 AT&T Intellectual Property. - Copyright (c) 2019 Nokia - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - This source code is part of the near-RT RIC (RAN Intelligent Controller) - platform project (RICP). - -================================================================================== -*/ -package sbi - -import "C" - -import ( - "errors" - "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" - "strconv" -) - - -func NewControl() Control { - - return Control{make(chan *xapp.RMRParams)} -} - - -type Control struct { - rcChan chan *xapp.RMRParams -} - - -func (c *Control) Run() { - go c.controlLoop() - xapp.Run(c) -} - -func (c *Control) Consume(rp *xapp.RMRParams) (err error) { - c.rcChan <- rp - return -} - -func (c *Control) controlLoop() { - for { - msg := <-c.rcChan - switch msg.Mtype { - case xapp.RICMessageTypes["RIC_SUB_REQ"]: - xapp.Logger.Info("Message handling when RMR instance queries for Routes") - default: - err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded") - xapp.Logger.Error("Unknown message type: %v", err) - } - } -} - diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go index b270abf..4451299 100644 --- a/pkg/sbi/nngpush.go +++ b/pkg/sbi/nngpush.go @@ -43,6 +43,8 @@ package sbi import "C" import ( + "bytes" + "crypto/md5" "errors" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "nanomsg.org/go/mangos/v2" @@ -51,6 +53,7 @@ import ( "routing-manager/pkg/rtmgr" "strconv" "time" + "fmt" ) type NngPush struct { @@ -59,6 +62,18 @@ type NngPush struct { rcChan chan *xapp.RMRParams } +type RMRParams struct { + *xapp.RMRParams +} + + +func (params *RMRParams) String() string { + var b bytes.Buffer + sum := md5.Sum(params.Payload) + fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum) + return b.String() +} + func NewNngPush() *NngPush { instance := new(NngPush) instance.NewSocket = createNewPushSocket @@ -102,28 +117,25 @@ func (c *NngPush) Terminate() error { } func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error { - var err error - var socket NngSocket + xapp.Logger.Debug("Invoked sbi.AddEndpoint") xapp.Logger.Debug("args: %v", *ep) - socket, err = c.NewSocket() - if err != nil { - return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error()) - } - ep.Socket = socket - err = c.dial(ep) - if err != nil { - return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error()) + endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber) + ep.Whid = int(xapp.Rmr.Openwh(endpoint)) + if ep.Whid < 0 { + return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid)) + }else { + xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint) } + return nil } func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error { xapp.Logger.Debug("Invoked sbi. DeleteEndpoint") xapp.Logger.Debug("args: %v", *ep) - if err := ep.Socket.(NngSocket).Close(); err != nil { - return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error()) - } + + xapp.Rmr.Closewh(ep.Whid) return nil } @@ -148,28 +160,40 @@ func (c *NngPush) dial(ep *rtmgr.Endpoint) error { func (c *NngPush) DistributeAll(policies *[]string) error { xapp.Logger.Debug("Invoked: sbi.DistributeAll") xapp.Logger.Debug("args: %v", *policies) + for _, ep := range rtmgr.Eps { - i := 1 - for i < 5 { - if ep.IsReady { - go c.send(ep, policies) - break - } else { - xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i)) - time.Sleep(10 * time.Millisecond) - i++ - } - } + go c.send(ep, policies) } + return nil } func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) { xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid) + for _, pe := range *policies { - if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil { - xapp.Logger.Error("Unable to send policy entry due to: " + err.Error()) - } + params := &RMRParams{&xapp.RMRParams{}} + params.Mtype = 20 + params.PayloadLen = len([]byte(pe)) + params.Payload =[]byte(pe) + params.Mbuf = nil + params.Whid = ep.Whid + time.Sleep(1 * time.Millisecond) + xapp.Rmr.SendMsg(params.RMRParams) } xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")") } + +func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){ + return c.createEndpoint(payload, c) +} + +func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error { + xapp.Logger.Debug("Invoked: sbi.DistributeToEp") + xapp.Logger.Debug("args: %v", *policies) + + go c.send(ep, policies) + + return nil +} + diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go index 9d1380e..58fe7d8 100644 --- a/pkg/sbi/sbi.go +++ b/pkg/sbi/sbi.go @@ -171,3 +171,18 @@ func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) } } } + +func (s *Sbi) createEndpoint(payload string, sbi Engine) (*rtmgr.Endpoint) { + xapp.Logger.Debug("CreateEndPoint %v", payload) + stringSlice := strings.Split(payload, " ") + uuid := stringSlice[0] + xapp.Logger.Debug(">>> uuid %v", stringSlice[0]) + + + if _, ok := rtmgr.Eps[uuid]; ok { + ep := rtmgr.Eps[uuid] + return ep + } + + return nil +} diff --git a/pkg/sbi/types.go b/pkg/sbi/types.go index d024e94..c0ab373 100644 --- a/pkg/sbi/types.go +++ b/pkg/sbi/types.go @@ -45,6 +45,8 @@ type Engine interface { AddEndpoint(*rtmgr.Endpoint) error DeleteEndpoint(*rtmgr.Endpoint) error UpdateEndpoints(*rtmgr.RicComponents) + CreateEndpoint(string) (*rtmgr.Endpoint) + DistributeToEp(*[]string, *rtmgr.Endpoint) error } type NngSocket interface {