From 45b86cc64dc6071f160b0f2c9ab8dfb57d1ce039 Mon Sep 17 00:00:00 2001 From: wahidw Date: Sat, 28 Mar 2020 13:54:53 +0000 Subject: [PATCH] Taking old commit for releasing image Change-Id: I71a75c5eb6d08abc1285a0e8d64c6aeb9c6fba84 Signed-off-by: wahidw --- Dockerfile | 13 ++--- RELNOTES | 12 ----- cmd/rtmgr.go | 40 ++-------------- container-tag.yaml | 2 +- go.mod | 4 +- manifests/rtmgr/rtmgr-cfg.yaml | 2 - pkg/nbi/control.go | 106 ----------------------------------------- pkg/nbi/httprestful.go | 98 +++++++++++-------------------------- pkg/nbi/httprestful_test.go | 25 ---------- pkg/rpe/rmr.go | 15 +++++- pkg/rpe/rpe.go | 2 - pkg/rtmgr/types.go | 5 -- pkg/sbi/control.go | 67 ++++++++++++++++++++++++++ pkg/sbi/nngpush.go | 78 +++++++++++------------------- pkg/sbi/sbi.go | 15 ------ pkg/sbi/types.go | 2 - 16 files changed, 148 insertions(+), 338 deletions(-) delete mode 100644 pkg/nbi/control.go create mode 100644 pkg/sbi/control.go diff --git a/Dockerfile b/Dockerfile index 2919ebf..90e4404 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,10 +26,9 @@ FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:4-u18.04-nng as rtmgrbuild # Install RMr shared library -ARG RMRVERSION=3.6.0 -RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb +RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_1.13.1_amd64.deb/download.deb && dpkg -i rmr_1.13.1_amd64.deb && rm -rf rmr_1.13.1_amd64.deb # Install RMr development header files -RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb +RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_1.13.1_amd64.deb/download.deb && dpkg -i rmr-dev_1.13.1_amd64.deb && rm -rf rmr-dev_1.13.1_amd64.deb ENV GOLANG_VERSION 1.12.1 RUN wget --quiet https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz \ @@ -63,8 +62,6 @@ COPY go.mod go.mod COPY pkg pkg COPY cmd cmd COPY run_rtmgr.sh /run_rtmgr.sh -RUN mkdir manifests -COPY manifests/ /go/src/routing-manager/manifests #RUN go mod download #RUN /usr/local/go/bin/go mod tidy ENV GOPATH /go @@ -73,9 +70,9 @@ ENV GOBIN /go/bin RUN go install ./cmd/rtmgr.go # UT intermediate container -#FROM rtmgrbuild as rtmgrut -#RUN ldconfig -#RUN go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -f "/go/src/routing-manager/manifests/rtmgr/rtmgr-cfg.yaml" -cover -race +FROM rtmgrbuild as rtmgrut +RUN ldconfig +RUN go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -f "./manifests/rtmgr/rtmgr-cfg.yaml" -cover -race # Final, executable container FROM ubuntu:16.04 diff --git a/RELNOTES b/RELNOTES index dc50594..b497112 100644 --- a/RELNOTES +++ b/RELNOTES @@ -1,15 +1,3 @@ -### v0.5.3 -* RMR updated to v3.6.0 with support for E2 Setup message types - -### v0.5.2 -* Switch to RMR Si95 mode - -### v0.5.1 -* Removal of go mangoes and using RMR nng - -### v0.4.16 -* getAllSubscriptions API (RM -> SM) during restart of routing manager handled - ### v0.4.15 * Retained (E2M->E2T issue) - retrying when is_Ready flag in socket handle is false diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go index 63b67b7..2bb7f8d 100644 --- a/cmd/rtmgr.go +++ b/cmd/rtmgr.go @@ -64,8 +64,6 @@ func initRtmgr() (nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engi return nil, nil, nil, nil, err } - - func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { for { if <-triggerSBI { @@ -86,23 +84,6 @@ func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine } } -func sendRoutesToAll(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine) { - - data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile")) - if err != nil || data == nil { - xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error()) - return - } - sbiEngine.UpdateEndpoints(data) - policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data) - err = sbiEngine.DistributeAll(policies) - if err != nil { - xapp.Logger.Error("Routing table cannot be published due to: " + err.Error()) - return - } -} - - func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { triggerSBI := make(chan bool) @@ -135,9 +116,8 @@ func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpe } } - sendRoutesToAll(sbiEngine, sdlEngine, rpeEngine) + triggerSBI <- true - rtmgr.Rtmgr_ready = true time.Sleep(INTERVAL * time.Second) xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.") } @@ -154,33 +134,19 @@ func SetupCloseHandler() { } func main() { - nbiEngine, sbiEngine, sdlEngine, rpeEngine, err := initRtmgr() if err != nil { xapp.Logger.Error(err.Error()) os.Exit(1) } - SetupCloseHandler() - xapp.Logger.Info("Start " + SERVICENAME + " service") rtmgr.Eps = make(rtmgr.Endpoints) - rtmgr.Rtmgr_ready = false var m sync.Mutex -// RMR thread is starting port: 4560 - c := nbi.NewControl() - go c.Run(sbiEngine, sdlEngine, rpeEngine, &m) - -// Waiting for RMR to be ready - time.Sleep(time.Duration(2) * time.Second) - for xapp.Rmr.IsReady() == false { - time.Sleep(time.Duration(2) * time.Second) - } - - dummy_whid := int(xapp.Rmr.Openwh("localhost:4560")) - xapp.Logger.Info("created dummy Wormhole ID for routingmanager and dummy_whid :%d", dummy_whid) + c := sbi.NewControl() + go c.Run() serve(nbiEngine, sbiEngine, sdlEngine, rpeEngine, &m) os.Exit(0) diff --git a/container-tag.yaml b/container-tag.yaml index 71f44c7..c754d3c 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -2,4 +2,4 @@ # By default this file is in the docker build directory, # but the location can configured in the JJB template. --- -tag: 0.5.3 +tag: 0.4.15 diff --git a/go.mod b/go.mod index db8af42..e133c89 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,13 @@ module routing-manager go 1.12.1 require ( - gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.4.4 + gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.0.24 nanomsg.org/go/mangos/v2 v2.0.5 ) replace gerrit.o-ran-sc.org/r/ric-plt/sdlgo => gerrit.o-ran-sc.org/r/ric-plt/sdlgo.git v0.2.0 -replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.4.4 +replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.0.24 replace gerrit.o-ran-sc.org/r/com/golog => gerrit.o-ran-sc.org/r/com/golog.git v0.0.1 diff --git a/manifests/rtmgr/rtmgr-cfg.yaml b/manifests/rtmgr/rtmgr-cfg.yaml index 55aab19..5fbc273 100644 --- a/manifests/rtmgr/rtmgr-cfg.yaml +++ b/manifests/rtmgr/rtmgr-cfg.yaml @@ -65,5 +65,3 @@ data: "maxSize": 2072 "numWorkers": 1 } -subscription: - host: "127.0.0.1:8089" diff --git a/pkg/nbi/control.go b/pkg/nbi/control.go deleted file mode 100644 index 8d9e6c4..0000000 --- a/pkg/nbi/control.go +++ /dev/null @@ -1,106 +0,0 @@ -/* -================================================================================== - Copyright (c) 2019 AT&T Intellectual Property. - Copyright (c) 2019 Nokia - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - This source code is part of the near-RT RIC (RAN Intelligent Controller) - platform project (RICP). - -================================================================================== -*/ -package nbi - -import "C" - -import ( - "errors" - "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" - "routing-manager/pkg/rpe" - "routing-manager/pkg/rtmgr" - "routing-manager/pkg/sbi" - "routing-manager/pkg/sdl" - "strconv" - "sync" -) - -func NewControl() Control { - - return Control{make(chan *xapp.RMRParams)} -} - -type Control struct { - rcChan chan *xapp.RMRParams -} - -func (c *Control) Run(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { - go c.controlLoop(sbiEngine, sdlEngine, rpeEngine, m) - xapp.Run(c) -} - -func (c *Control) Consume(rp *xapp.RMRParams) (err error) { - c.rcChan <- rp - return -} - -func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { - for { - msg := <-c.rcChan - xapp_msg := sbi.RMRParams{msg} - switch msg.Mtype { - case xapp.RICMessageTypes["RMRRM_REQ_TABLE"]: - if rtmgr.Rtmgr_ready == false { - xapp.Logger.Info("Update Route Table Request(RMR to RM), message discarded as routing manager is not ready") - } else { - xapp.Logger.Info("Update Route Table Request(RMR to RM)") - go c.handleUpdateToRoutingManagerRequest(msg, sbiEngine, sdlEngine, rpeEngine, m) - } - case xapp.RICMessageTypes["RMRRM_TABLE_STATE"]: - xapp.Logger.Info("state of table to route mgr %s,payload %s", xapp_msg.String(), msg.Payload) - - default: - err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded") - xapp.Logger.Error("Unknown message type: %v", err) - } - } -} - -func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { - - msg := sbi.RMRParams{params} - - xapp.Logger.Info("Update Route Table Request, msg.String() : %s", msg.String()) - xapp.Logger.Info("Update Route Table Request, params.Payload : %s", string(params.Payload)) - - m.Lock() - data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile")) - m.Unlock() - if err != nil || data == nil { - xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error()) - return - } - - ep := sbiEngine.CreateEndpoint(string(params.Payload)) - if ep == nil { - xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload)) - return - } - - policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data) - err = sbiEngine.DistributeToEp(policies, ep) - if err != nil { - xapp.Logger.Error("Routing table cannot be published due to: " + err.Error()) - return - } -} diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go index e764467..b07a8db 100644 --- a/pkg/nbi/httprestful.go +++ b/pkg/nbi/httprestful.go @@ -36,7 +36,6 @@ import ( "errors" "fmt" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" - xfmodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "github.com/go-openapi/loads" "github.com/go-openapi/runtime/middleware" "net" @@ -580,25 +579,25 @@ func PopulateE2TMap(e2tDataList *[]rtmgr.E2tIdentity, e2ts map[string]rtmgr.E2TI func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, e2murl string, sdlEngine sdl.Engine) error { xapp.Logger.Info("Invoked retrieveStartupData ") - var readErr error - var maxRetries = 10 + var readErr error + var maxRetries = 10 var xappData *[]rtmgr.XApp xappData = new([]rtmgr.XApp) xapp.Logger.Info("Trying to fetch XApps data from XAPP manager") - for i := 1; i <= maxRetries; i++ { - time.Sleep(2 * time.Second) + for i := 1; i <= maxRetries; i++ { + time.Sleep(2 * time.Second) - readErr = nil - xappData, err := httpGetXApps(xmurl) - if xappData != nil && err == nil { + readErr = nil + xappData, err := httpGetXApps(xmurl) + if xappData != nil && err == nil { break - } else if err == nil { - readErr = errors.New("unexpected HTTP status code") - } else { - xapp.Logger.Warn("cannot get xapp data due to: " + err.Error()) - readErr = err - } - } + } else if err == nil { + readErr = errors.New("unexpected HTTP status code") + } else { + xapp.Logger.Warn("cannot get xapp data due to: " + err.Error()) + readErr = err + } + } if ( readErr != nil) { return readErr @@ -608,6 +607,7 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile e2ts := make(map[string]rtmgr.E2TInstance) xapp.Logger.Info("Trying to fetch E2T data from E2manager") for i := 1; i <= maxRetries; i++ { + time.Sleep(2 * time.Second) readErr = nil e2tDataList, err := httpGetE2TList(e2murl) @@ -620,7 +620,6 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile xapp.Logger.Warn("cannot get E2T data from E2M due to: " + err.Error()) readErr = err } - time.Sleep(2 * time.Second) } if ( readErr != nil) { @@ -628,43 +627,22 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile } pcData, confErr := rtmgr.GetPlatformComponents(configfile) - if confErr != nil { - xapp.Logger.Error(confErr.Error()) - return confErr - } - xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.") - // Combine the xapps data and platform data before writing to the SDL + if confErr != nil { + xapp.Logger.Error(confErr.Error()) + return confErr + } + xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.") + // Combine the xapps data and platform data before writing to the SDL ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: e2ts, MeidMap: meids} - writeErr := sdlEngine.WriteAll(fileName, ricData) - if writeErr != nil { - xapp.Logger.Error(writeErr.Error()) - } - - xapp.Logger.Info("Trying to fetch Subscriptions data from Subscription manager") -/* for i := 1; i <= maxRetries; i++ { - readErr = nil - sub_list, err := xapp.Subscription.QuerySubscriptions() - - if sub_list != nil && err == nil { - PopulateSubscription(sub_list) - break - } else { - readErr = err - xapp.Logger.Warn("cannot get xapp data due to: " + readErr.Error()) + writeErr := sdlEngine.WriteAll(fileName, ricData) + if writeErr != nil { + xapp.Logger.Error(writeErr.Error()) + } + // post subscription req to appmgr + readErr = PostSubReq(xmurl, nbiif) + if readErr == nil { + return nil } - time.Sleep(2 * time.Second) - } - - if (readErr != nil) { - return readErr - } -*/ - // post subscription req to appmgr - readErr = PostSubReq(xmurl, nbiif) - if readErr == nil { - return nil - } - return readErr } @@ -863,21 +841,3 @@ func updateSubscription(data *rtmgr.XappList) { } } - -func PopulateSubscription(sub_list xfmodel.SubscriptionList) { - for _, sub_row := range sub_list { - var subdata models.XappSubscriptionData - id := int32(sub_row.SubscriptionID) - subdata.SubscriptionID = &id - for _, ep := range sub_row.Endpoint { - - stringSlice := strings.Split(ep, ":") - subdata.Address = &stringSlice[0] - intportval, _ := strconv.Atoi( stringSlice[1]) - value := uint16(intportval) - subdata.Port = &value - xapp.Logger.Debug("Adding Subscription List has Address :%v, port :%v, SubscriptionID :%v ", subdata.Address, subdata.Address, subdata.SubscriptionID) - addSubscription(&rtmgr.Subs, &subdata) - } - } -} diff --git a/pkg/nbi/httprestful_test.go b/pkg/nbi/httprestful_test.go index f42a835..3419dcd 100644 --- a/pkg/nbi/httprestful_test.go +++ b/pkg/nbi/httprestful_test.go @@ -68,8 +68,6 @@ var SubscriptionResp = []byte(`{"ID":"deadbeef1234567890", "Version":0, "EventTy var E2TListResp = []byte(`[{"e2tAddress":"127.0.0.1:0","ranNames":["RanM0","RanN0"]},{"e2tAddress":"127.0.0.1:1","ranNames":["RanM1","RanN1"]},{"e2tAddress":"127.0.0.1:2","ranNames":["RanM2","RanN2"]},{"e2tAddress":"127.0.0.1:3","ranNames":["RanM3","RanN3"]}]`) -var SubscriptionList = []byte(`[{"SubscriptionId":11,"Meid":"Test-Gnb","Endpoint":["127.0.0.1:4056"]}]`) - var InvalidSubResp = []byte(`{"Version":0, "EventType":all}`) func TestValidateXappCallbackData_1(t *testing.T) { @@ -551,25 +549,6 @@ func createMockAppmgrWithData(url string, g []byte, p []byte, t []byte) *httptes return ts } -func createMockSubmgrWithData(url string, t []byte) *httptest.Server { - l, err := net.Listen("tcp", url) - if err != nil { - fmt.Println("Failed to create listener: " + err.Error()) - } - ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - if r.Method == "GET" && r.URL.String() == "//ric/v1/subscriptions" { - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - w.Write(t) - } - - })) - ts.Listener.Close() - ts.Listener = l - return ts -} - func createMockPlatformComponents() { var filename = "config.json" file, _ := json.MarshalIndent(stub.ValidPlatformComponents, "", "") @@ -703,10 +682,6 @@ func TestRetrieveStartupData(t *testing.T) { ts1.Start() defer ts1.Close() - ts2 := createMockSubmgrWithData("127.0.0.1:8089", SubscriptionList) - ts2.Start() - defer ts2.Close() - sdlEngine, _ := sdl.GetSdl("file") var httpRestful, _ = GetNbi("httpRESTful") createMockPlatformComponents() diff --git a/pkg/rpe/rmr.go b/pkg/rpe/rmr.go index 346644c..860c1eb 100644 --- a/pkg/rpe/rmr.go +++ b/pkg/rpe/rmr.go @@ -88,7 +88,20 @@ func (r *Rmr) generateRMRPolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents, } rawrt = append(rawrt, key+"newrt|end\n") count := 0 - +/* meidrt := key +"meid_map|start\n" + for e2tkey, value := range rcs.E2Ts { + xapp.Logger.Debug("rmr.E2T Key: %v", e2tkey) + xapp.Logger.Debug("rmr.E2T Value: %v", value) + xapp.Logger.Debug("rmr.E2T RAN List: %v", rcs.E2Ts[e2tkey].Ranlist) + if ( len(rcs.E2Ts[e2tkey].Ranlist) != 0 ) { + ranList := strings.Join(rcs.E2Ts[e2tkey].Ranlist, " ") + meidrt += key + "mme_ar|" + e2tkey + "|" + ranList + "\n" + count++ + } else { + xapp.Logger.Debug("rmr.E2T Empty RAN LIST for FQDN: %v", e2tkey) + } + } + meidrt += key+"meid_map|end|" + strconv.Itoa(count) +"\n" */ meidrt := key +"meid_map|start\n" for _, value := range rcs.MeidMap { meidrt += key + value + "\n" diff --git a/pkg/rpe/rpe.go b/pkg/rpe/rpe.go index d26a704..b5c6961 100644 --- a/pkg/rpe/rpe.go +++ b/pkg/rpe/rpe.go @@ -244,8 +244,6 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr. if len(e2TermEp) > 0 { r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermEp, routeTable, -1, "") r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermEp, routeTable, -1, "") - r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermEp, routeTable, -1, "") - r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable, -1, "") } } diff --git a/pkg/rtmgr/types.go b/pkg/rtmgr/types.go index 2846173..958ad6d 100644 --- a/pkg/rtmgr/types.go +++ b/pkg/rtmgr/types.go @@ -53,7 +53,6 @@ type Endpoint struct { Socket interface{} IsReady bool Keepalive bool - Whid int } type RouteTableEntry struct { @@ -136,7 +135,3 @@ type XappList struct { SubscriptionID uint16 FqdnList []FqDn } - -var ( - Rtmgr_ready bool -) diff --git a/pkg/sbi/control.go b/pkg/sbi/control.go new file mode 100644 index 0000000..67d8eca --- /dev/null +++ b/pkg/sbi/control.go @@ -0,0 +1,67 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + This source code is part of the near-RT RIC (RAN Intelligent Controller) + platform project (RICP). + +================================================================================== +*/ +package sbi + +import "C" + +import ( + "errors" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "strconv" +) + + +func NewControl() Control { + + return Control{make(chan *xapp.RMRParams)} +} + + +type Control struct { + rcChan chan *xapp.RMRParams +} + + +func (c *Control) Run() { + go c.controlLoop() + xapp.Run(c) +} + +func (c *Control) Consume(rp *xapp.RMRParams) (err error) { + c.rcChan <- rp + return +} + +func (c *Control) controlLoop() { + for { + msg := <-c.rcChan + switch msg.Mtype { + case xapp.RICMessageTypes["RIC_SUB_REQ"]: + xapp.Logger.Info("Message handling when RMR instance queries for Routes") + default: + err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded") + xapp.Logger.Error("Unknown message type: %v", err) + } + } +} + diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go index 4451299..b270abf 100644 --- a/pkg/sbi/nngpush.go +++ b/pkg/sbi/nngpush.go @@ -43,8 +43,6 @@ package sbi import "C" import ( - "bytes" - "crypto/md5" "errors" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "nanomsg.org/go/mangos/v2" @@ -53,7 +51,6 @@ import ( "routing-manager/pkg/rtmgr" "strconv" "time" - "fmt" ) type NngPush struct { @@ -62,18 +59,6 @@ type NngPush struct { rcChan chan *xapp.RMRParams } -type RMRParams struct { - *xapp.RMRParams -} - - -func (params *RMRParams) String() string { - var b bytes.Buffer - sum := md5.Sum(params.Payload) - fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum) - return b.String() -} - func NewNngPush() *NngPush { instance := new(NngPush) instance.NewSocket = createNewPushSocket @@ -117,25 +102,28 @@ func (c *NngPush) Terminate() error { } func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error { - + var err error + var socket NngSocket xapp.Logger.Debug("Invoked sbi.AddEndpoint") xapp.Logger.Debug("args: %v", *ep) - endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber) - ep.Whid = int(xapp.Rmr.Openwh(endpoint)) - if ep.Whid < 0 { - return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid)) - }else { - xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint) + socket, err = c.NewSocket() + if err != nil { + return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error()) + } + ep.Socket = socket + err = c.dial(ep) + if err != nil { + return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error()) } - return nil } func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error { xapp.Logger.Debug("Invoked sbi. DeleteEndpoint") xapp.Logger.Debug("args: %v", *ep) - - xapp.Rmr.Closewh(ep.Whid) + if err := ep.Socket.(NngSocket).Close(); err != nil { + return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error()) + } return nil } @@ -160,40 +148,28 @@ func (c *NngPush) dial(ep *rtmgr.Endpoint) error { func (c *NngPush) DistributeAll(policies *[]string) error { xapp.Logger.Debug("Invoked: sbi.DistributeAll") xapp.Logger.Debug("args: %v", *policies) - for _, ep := range rtmgr.Eps { - go c.send(ep, policies) + i := 1 + for i < 5 { + if ep.IsReady { + go c.send(ep, policies) + break + } else { + xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i)) + time.Sleep(10 * time.Millisecond) + i++ + } + } } - return nil } func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) { xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid) - for _, pe := range *policies { - params := &RMRParams{&xapp.RMRParams{}} - params.Mtype = 20 - params.PayloadLen = len([]byte(pe)) - params.Payload =[]byte(pe) - params.Mbuf = nil - params.Whid = ep.Whid - time.Sleep(1 * time.Millisecond) - xapp.Rmr.SendMsg(params.RMRParams) + if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil { + xapp.Logger.Error("Unable to send policy entry due to: " + err.Error()) + } } xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")") } - -func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){ - return c.createEndpoint(payload, c) -} - -func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error { - xapp.Logger.Debug("Invoked: sbi.DistributeToEp") - xapp.Logger.Debug("args: %v", *policies) - - go c.send(ep, policies) - - return nil -} - diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go index 58fe7d8..9d1380e 100644 --- a/pkg/sbi/sbi.go +++ b/pkg/sbi/sbi.go @@ -171,18 +171,3 @@ func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) } } } - -func (s *Sbi) createEndpoint(payload string, sbi Engine) (*rtmgr.Endpoint) { - xapp.Logger.Debug("CreateEndPoint %v", payload) - stringSlice := strings.Split(payload, " ") - uuid := stringSlice[0] - xapp.Logger.Debug(">>> uuid %v", stringSlice[0]) - - - if _, ok := rtmgr.Eps[uuid]; ok { - ep := rtmgr.Eps[uuid] - return ep - } - - return nil -} diff --git a/pkg/sbi/types.go b/pkg/sbi/types.go index c0ab373..d024e94 100644 --- a/pkg/sbi/types.go +++ b/pkg/sbi/types.go @@ -45,8 +45,6 @@ type Engine interface { AddEndpoint(*rtmgr.Endpoint) error DeleteEndpoint(*rtmgr.Endpoint) error UpdateEndpoints(*rtmgr.RicComponents) - CreateEndpoint(string) (*rtmgr.Endpoint) - DistributeToEp(*[]string, *rtmgr.Endpoint) error } type NngSocket interface { -- 2.16.6