From dd6b05676beaff9d6252c3486cf8fb77748d37f8 Mon Sep 17 00:00:00 2001 From: wahidw Date: Tue, 31 Mar 2020 03:09:45 +0000 Subject: [PATCH] Reverting the code after releasing the image Change-Id: Ie8c567f805ccb3dc404d512b0a891f38c11d772d Signed-off-by: wahidw --- Dockerfile | 14 ++++-- RELNOTES | 12 +++++ cmd/rtmgr.go | 40 ++++++++++++++-- container-tag.yaml | 2 +- go.mod | 16 +++++-- manifests/rtmgr/rtmgr-cfg.yaml | 2 + pkg/nbi/control.go | 106 +++++++++++++++++++++++++++++++++++++++++ pkg/nbi/httprestful.go | 98 ++++++++++++++++++++++++++----------- pkg/nbi/httprestful_test.go | 25 ++++++++++ pkg/rpe/rmr.go | 15 +----- pkg/rpe/rpe.go | 2 + pkg/rtmgr/types.go | 5 ++ pkg/sbi/control.go | 67 -------------------------- pkg/sbi/nngpush.go | 78 +++++++++++++++++++----------- pkg/sbi/sbi.go | 15 ++++++ pkg/sbi/types.go | 2 + 16 files changed, 350 insertions(+), 149 deletions(-) create mode 100644 pkg/nbi/control.go delete mode 100644 pkg/sbi/control.go diff --git a/Dockerfile b/Dockerfile index 90e4404..7db5111 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,9 +26,10 @@ FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:4-u18.04-nng as rtmgrbuild # Install RMr shared library -RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_1.13.1_amd64.deb/download.deb && dpkg -i rmr_1.13.1_amd64.deb && rm -rf rmr_1.13.1_amd64.deb +ARG RMRVERSION=3.6.0 +RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb # Install RMr development header files -RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_1.13.1_amd64.deb/download.deb && dpkg -i rmr-dev_1.13.1_amd64.deb && rm -rf rmr-dev_1.13.1_amd64.deb +RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb ENV GOLANG_VERSION 1.12.1 RUN wget --quiet https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz \ @@ -62,17 +63,20 @@ COPY go.mod go.mod COPY pkg pkg COPY cmd cmd COPY run_rtmgr.sh /run_rtmgr.sh +RUN mkdir manifests +COPY manifests/ /go/src/routing-manager/manifests #RUN go mod download #RUN /usr/local/go/bin/go mod tidy ENV GOPATH /go ENV GOBIN /go/bin RUN go install ./cmd/rtmgr.go +CMD /bin/bash # UT intermediate container -FROM rtmgrbuild as rtmgrut -RUN ldconfig -RUN go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -f "./manifests/rtmgr/rtmgr-cfg.yaml" -cover -race +#FROM rtmgrbuild as rtmgrut +#RUN ldconfig +#RUN go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -f "/go/src/routing-manager/manifests/rtmgr/rtmgr-cfg.yaml" -cover -race # Final, executable container FROM ubuntu:16.04 diff --git a/RELNOTES b/RELNOTES index b497112..dc50594 100644 --- a/RELNOTES +++ b/RELNOTES @@ -1,3 +1,15 @@ +### v0.5.3 +* RMR updated to v3.6.0 with support for E2 Setup message types + +### v0.5.2 +* Switch to RMR Si95 mode + +### v0.5.1 +* Removal of go mangoes and using RMR nng + +### v0.4.16 +* getAllSubscriptions API (RM -> SM) during restart of routing manager handled + ### v0.4.15 * Retained (E2M->E2T issue) - retrying when is_Ready flag in socket handle is false diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go index 2bb7f8d..63b67b7 100644 --- a/cmd/rtmgr.go +++ b/cmd/rtmgr.go @@ -64,6 +64,8 @@ func initRtmgr() (nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engi return nil, nil, nil, nil, err } + + func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { for { if <-triggerSBI { @@ -84,6 +86,23 @@ func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine } } +func sendRoutesToAll(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine) { + + data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile")) + if err != nil || data == nil { + xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error()) + return + } + sbiEngine.UpdateEndpoints(data) + policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data) + err = sbiEngine.DistributeAll(policies) + if err != nil { + xapp.Logger.Error("Routing table cannot be published due to: " + err.Error()) + return + } +} + + func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { triggerSBI := make(chan bool) @@ -116,8 +135,9 @@ func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpe } } - triggerSBI <- true + sendRoutesToAll(sbiEngine, sdlEngine, rpeEngine) + rtmgr.Rtmgr_ready = true time.Sleep(INTERVAL * time.Second) xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.") } @@ -134,19 +154,33 @@ func SetupCloseHandler() { } func main() { + nbiEngine, sbiEngine, sdlEngine, rpeEngine, err := initRtmgr() if err != nil { xapp.Logger.Error(err.Error()) os.Exit(1) } + SetupCloseHandler() + xapp.Logger.Info("Start " + SERVICENAME + " service") rtmgr.Eps = make(rtmgr.Endpoints) + rtmgr.Rtmgr_ready = false var m sync.Mutex - c := sbi.NewControl() - go c.Run() +// RMR thread is starting port: 4560 + c := nbi.NewControl() + go c.Run(sbiEngine, sdlEngine, rpeEngine, &m) + +// Waiting for RMR to be ready + time.Sleep(time.Duration(2) * time.Second) + for xapp.Rmr.IsReady() == false { + time.Sleep(time.Duration(2) * time.Second) + } + + dummy_whid := int(xapp.Rmr.Openwh("localhost:4560")) + xapp.Logger.Info("created dummy Wormhole ID for routingmanager and dummy_whid :%d", dummy_whid) serve(nbiEngine, sbiEngine, sdlEngine, rpeEngine, &m) os.Exit(0) diff --git a/container-tag.yaml b/container-tag.yaml index c754d3c..7b6e635 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -2,4 +2,4 @@ # By default this file is in the docker build directory, # but the location can configured in the JJB template. --- -tag: 0.4.15 +tag: 0.5.4 diff --git a/go.mod b/go.mod index e133c89..a6e285e 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,23 @@ module routing-manager go 1.12.1 require ( - gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.0.24 + gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.4.4 + github.com/ghodss/yaml v1.0.0 + github.com/go-openapi/errors v0.19.3 + github.com/go-openapi/loads v0.19.4 + github.com/go-openapi/runtime v0.19.4 + github.com/go-openapi/spec v0.19.3 + github.com/go-openapi/strfmt v0.19.4 + github.com/go-openapi/swag v0.19.7 + github.com/go-openapi/validate v0.19.6 + github.com/jessevdk/go-flags v1.4.0 + golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 nanomsg.org/go/mangos/v2 v2.0.5 ) -replace gerrit.o-ran-sc.org/r/ric-plt/sdlgo => gerrit.o-ran-sc.org/r/ric-plt/sdlgo.git v0.2.0 +replace gerrit.o-ran-sc.org/r/ric-plt/sdlgo => gerrit.o-ran-sc.org/r/ric-plt/sdlgo.git v0.5.2 -replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.0.24 +replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.4.4 replace gerrit.o-ran-sc.org/r/com/golog => gerrit.o-ran-sc.org/r/com/golog.git v0.0.1 diff --git a/manifests/rtmgr/rtmgr-cfg.yaml b/manifests/rtmgr/rtmgr-cfg.yaml index 5fbc273..55aab19 100644 --- a/manifests/rtmgr/rtmgr-cfg.yaml +++ b/manifests/rtmgr/rtmgr-cfg.yaml @@ -65,3 +65,5 @@ data: "maxSize": 2072 "numWorkers": 1 } +subscription: + host: "127.0.0.1:8089" diff --git a/pkg/nbi/control.go b/pkg/nbi/control.go new file mode 100644 index 0000000..8d9e6c4 --- /dev/null +++ b/pkg/nbi/control.go @@ -0,0 +1,106 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + This source code is part of the near-RT RIC (RAN Intelligent Controller) + platform project (RICP). + +================================================================================== +*/ +package nbi + +import "C" + +import ( + "errors" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "routing-manager/pkg/rpe" + "routing-manager/pkg/rtmgr" + "routing-manager/pkg/sbi" + "routing-manager/pkg/sdl" + "strconv" + "sync" +) + +func NewControl() Control { + + return Control{make(chan *xapp.RMRParams)} +} + +type Control struct { + rcChan chan *xapp.RMRParams +} + +func (c *Control) Run(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { + go c.controlLoop(sbiEngine, sdlEngine, rpeEngine, m) + xapp.Run(c) +} + +func (c *Control) Consume(rp *xapp.RMRParams) (err error) { + c.rcChan <- rp + return +} + +func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { + for { + msg := <-c.rcChan + xapp_msg := sbi.RMRParams{msg} + switch msg.Mtype { + case xapp.RICMessageTypes["RMRRM_REQ_TABLE"]: + if rtmgr.Rtmgr_ready == false { + xapp.Logger.Info("Update Route Table Request(RMR to RM), message discarded as routing manager is not ready") + } else { + xapp.Logger.Info("Update Route Table Request(RMR to RM)") + go c.handleUpdateToRoutingManagerRequest(msg, sbiEngine, sdlEngine, rpeEngine, m) + } + case xapp.RICMessageTypes["RMRRM_TABLE_STATE"]: + xapp.Logger.Info("state of table to route mgr %s,payload %s", xapp_msg.String(), msg.Payload) + + default: + err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded") + xapp.Logger.Error("Unknown message type: %v", err) + } + } +} + +func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) { + + msg := sbi.RMRParams{params} + + xapp.Logger.Info("Update Route Table Request, msg.String() : %s", msg.String()) + xapp.Logger.Info("Update Route Table Request, params.Payload : %s", string(params.Payload)) + + m.Lock() + data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile")) + m.Unlock() + if err != nil || data == nil { + xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error()) + return + } + + ep := sbiEngine.CreateEndpoint(string(params.Payload)) + if ep == nil { + xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload)) + return + } + + policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data) + err = sbiEngine.DistributeToEp(policies, ep) + if err != nil { + xapp.Logger.Error("Routing table cannot be published due to: " + err.Error()) + return + } +} diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go index b07a8db..e764467 100644 --- a/pkg/nbi/httprestful.go +++ b/pkg/nbi/httprestful.go @@ -36,6 +36,7 @@ import ( "errors" "fmt" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + xfmodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "github.com/go-openapi/loads" "github.com/go-openapi/runtime/middleware" "net" @@ -579,25 +580,25 @@ func PopulateE2TMap(e2tDataList *[]rtmgr.E2tIdentity, e2ts map[string]rtmgr.E2TI func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, e2murl string, sdlEngine sdl.Engine) error { xapp.Logger.Info("Invoked retrieveStartupData ") - var readErr error - var maxRetries = 10 + var readErr error + var maxRetries = 10 var xappData *[]rtmgr.XApp xappData = new([]rtmgr.XApp) xapp.Logger.Info("Trying to fetch XApps data from XAPP manager") - for i := 1; i <= maxRetries; i++ { - time.Sleep(2 * time.Second) + for i := 1; i <= maxRetries; i++ { + time.Sleep(2 * time.Second) - readErr = nil - xappData, err := httpGetXApps(xmurl) - if xappData != nil && err == nil { + readErr = nil + xappData, err := httpGetXApps(xmurl) + if xappData != nil && err == nil { break - } else if err == nil { - readErr = errors.New("unexpected HTTP status code") - } else { - xapp.Logger.Warn("cannot get xapp data due to: " + err.Error()) - readErr = err - } - } + } else if err == nil { + readErr = errors.New("unexpected HTTP status code") + } else { + xapp.Logger.Warn("cannot get xapp data due to: " + err.Error()) + readErr = err + } + } if ( readErr != nil) { return readErr @@ -607,7 +608,6 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile e2ts := make(map[string]rtmgr.E2TInstance) xapp.Logger.Info("Trying to fetch E2T data from E2manager") for i := 1; i <= maxRetries; i++ { - time.Sleep(2 * time.Second) readErr = nil e2tDataList, err := httpGetE2TList(e2murl) @@ -620,6 +620,7 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile xapp.Logger.Warn("cannot get E2T data from E2M due to: " + err.Error()) readErr = err } + time.Sleep(2 * time.Second) } if ( readErr != nil) { @@ -627,22 +628,43 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile } pcData, confErr := rtmgr.GetPlatformComponents(configfile) - if confErr != nil { - xapp.Logger.Error(confErr.Error()) - return confErr - } - xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.") - // Combine the xapps data and platform data before writing to the SDL + if confErr != nil { + xapp.Logger.Error(confErr.Error()) + return confErr + } + xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.") + // Combine the xapps data and platform data before writing to the SDL ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: e2ts, MeidMap: meids} - writeErr := sdlEngine.WriteAll(fileName, ricData) - if writeErr != nil { - xapp.Logger.Error(writeErr.Error()) - } - // post subscription req to appmgr - readErr = PostSubReq(xmurl, nbiif) - if readErr == nil { - return nil + writeErr := sdlEngine.WriteAll(fileName, ricData) + if writeErr != nil { + xapp.Logger.Error(writeErr.Error()) + } + + xapp.Logger.Info("Trying to fetch Subscriptions data from Subscription manager") +/* for i := 1; i <= maxRetries; i++ { + readErr = nil + sub_list, err := xapp.Subscription.QuerySubscriptions() + + if sub_list != nil && err == nil { + PopulateSubscription(sub_list) + break + } else { + readErr = err + xapp.Logger.Warn("cannot get xapp data due to: " + readErr.Error()) } + time.Sleep(2 * time.Second) + } + + if (readErr != nil) { + return readErr + } +*/ + // post subscription req to appmgr + readErr = PostSubReq(xmurl, nbiif) + if readErr == nil { + return nil + } + return readErr } @@ -841,3 +863,21 @@ func updateSubscription(data *rtmgr.XappList) { } } + +func PopulateSubscription(sub_list xfmodel.SubscriptionList) { + for _, sub_row := range sub_list { + var subdata models.XappSubscriptionData + id := int32(sub_row.SubscriptionID) + subdata.SubscriptionID = &id + for _, ep := range sub_row.Endpoint { + + stringSlice := strings.Split(ep, ":") + subdata.Address = &stringSlice[0] + intportval, _ := strconv.Atoi( stringSlice[1]) + value := uint16(intportval) + subdata.Port = &value + xapp.Logger.Debug("Adding Subscription List has Address :%v, port :%v, SubscriptionID :%v ", subdata.Address, subdata.Address, subdata.SubscriptionID) + addSubscription(&rtmgr.Subs, &subdata) + } + } +} diff --git a/pkg/nbi/httprestful_test.go b/pkg/nbi/httprestful_test.go index 3419dcd..f42a835 100644 --- a/pkg/nbi/httprestful_test.go +++ b/pkg/nbi/httprestful_test.go @@ -68,6 +68,8 @@ var SubscriptionResp = []byte(`{"ID":"deadbeef1234567890", "Version":0, "EventTy var E2TListResp = []byte(`[{"e2tAddress":"127.0.0.1:0","ranNames":["RanM0","RanN0"]},{"e2tAddress":"127.0.0.1:1","ranNames":["RanM1","RanN1"]},{"e2tAddress":"127.0.0.1:2","ranNames":["RanM2","RanN2"]},{"e2tAddress":"127.0.0.1:3","ranNames":["RanM3","RanN3"]}]`) +var SubscriptionList = []byte(`[{"SubscriptionId":11,"Meid":"Test-Gnb","Endpoint":["127.0.0.1:4056"]}]`) + var InvalidSubResp = []byte(`{"Version":0, "EventType":all}`) func TestValidateXappCallbackData_1(t *testing.T) { @@ -549,6 +551,25 @@ func createMockAppmgrWithData(url string, g []byte, p []byte, t []byte) *httptes return ts } +func createMockSubmgrWithData(url string, t []byte) *httptest.Server { + l, err := net.Listen("tcp", url) + if err != nil { + fmt.Println("Failed to create listener: " + err.Error()) + } + ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + if r.Method == "GET" && r.URL.String() == "//ric/v1/subscriptions" { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(t) + } + + })) + ts.Listener.Close() + ts.Listener = l + return ts +} + func createMockPlatformComponents() { var filename = "config.json" file, _ := json.MarshalIndent(stub.ValidPlatformComponents, "", "") @@ -682,6 +703,10 @@ func TestRetrieveStartupData(t *testing.T) { ts1.Start() defer ts1.Close() + ts2 := createMockSubmgrWithData("127.0.0.1:8089", SubscriptionList) + ts2.Start() + defer ts2.Close() + sdlEngine, _ := sdl.GetSdl("file") var httpRestful, _ = GetNbi("httpRESTful") createMockPlatformComponents() diff --git a/pkg/rpe/rmr.go b/pkg/rpe/rmr.go index 860c1eb..346644c 100644 --- a/pkg/rpe/rmr.go +++ b/pkg/rpe/rmr.go @@ -88,20 +88,7 @@ func (r *Rmr) generateRMRPolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents, } rawrt = append(rawrt, key+"newrt|end\n") count := 0 -/* meidrt := key +"meid_map|start\n" - for e2tkey, value := range rcs.E2Ts { - xapp.Logger.Debug("rmr.E2T Key: %v", e2tkey) - xapp.Logger.Debug("rmr.E2T Value: %v", value) - xapp.Logger.Debug("rmr.E2T RAN List: %v", rcs.E2Ts[e2tkey].Ranlist) - if ( len(rcs.E2Ts[e2tkey].Ranlist) != 0 ) { - ranList := strings.Join(rcs.E2Ts[e2tkey].Ranlist, " ") - meidrt += key + "mme_ar|" + e2tkey + "|" + ranList + "\n" - count++ - } else { - xapp.Logger.Debug("rmr.E2T Empty RAN LIST for FQDN: %v", e2tkey) - } - } - meidrt += key+"meid_map|end|" + strconv.Itoa(count) +"\n" */ + meidrt := key +"meid_map|start\n" for _, value := range rcs.MeidMap { meidrt += key + value + "\n" diff --git a/pkg/rpe/rpe.go b/pkg/rpe/rpe.go index b5c6961..d26a704 100644 --- a/pkg/rpe/rpe.go +++ b/pkg/rpe/rpe.go @@ -244,6 +244,8 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr. if len(e2TermEp) > 0 { r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermEp, routeTable, -1, "") r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermEp, routeTable, -1, "") + r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermEp, routeTable, -1, "") + r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable, -1, "") } } diff --git a/pkg/rtmgr/types.go b/pkg/rtmgr/types.go index 958ad6d..2846173 100644 --- a/pkg/rtmgr/types.go +++ b/pkg/rtmgr/types.go @@ -53,6 +53,7 @@ type Endpoint struct { Socket interface{} IsReady bool Keepalive bool + Whid int } type RouteTableEntry struct { @@ -135,3 +136,7 @@ type XappList struct { SubscriptionID uint16 FqdnList []FqDn } + +var ( + Rtmgr_ready bool +) diff --git a/pkg/sbi/control.go b/pkg/sbi/control.go deleted file mode 100644 index 67d8eca..0000000 --- a/pkg/sbi/control.go +++ /dev/null @@ -1,67 +0,0 @@ -/* -================================================================================== - Copyright (c) 2019 AT&T Intellectual Property. - Copyright (c) 2019 Nokia - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - This source code is part of the near-RT RIC (RAN Intelligent Controller) - platform project (RICP). - -================================================================================== -*/ -package sbi - -import "C" - -import ( - "errors" - "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" - "strconv" -) - - -func NewControl() Control { - - return Control{make(chan *xapp.RMRParams)} -} - - -type Control struct { - rcChan chan *xapp.RMRParams -} - - -func (c *Control) Run() { - go c.controlLoop() - xapp.Run(c) -} - -func (c *Control) Consume(rp *xapp.RMRParams) (err error) { - c.rcChan <- rp - return -} - -func (c *Control) controlLoop() { - for { - msg := <-c.rcChan - switch msg.Mtype { - case xapp.RICMessageTypes["RIC_SUB_REQ"]: - xapp.Logger.Info("Message handling when RMR instance queries for Routes") - default: - err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded") - xapp.Logger.Error("Unknown message type: %v", err) - } - } -} - diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go index b270abf..4451299 100644 --- a/pkg/sbi/nngpush.go +++ b/pkg/sbi/nngpush.go @@ -43,6 +43,8 @@ package sbi import "C" import ( + "bytes" + "crypto/md5" "errors" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "nanomsg.org/go/mangos/v2" @@ -51,6 +53,7 @@ import ( "routing-manager/pkg/rtmgr" "strconv" "time" + "fmt" ) type NngPush struct { @@ -59,6 +62,18 @@ type NngPush struct { rcChan chan *xapp.RMRParams } +type RMRParams struct { + *xapp.RMRParams +} + + +func (params *RMRParams) String() string { + var b bytes.Buffer + sum := md5.Sum(params.Payload) + fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum) + return b.String() +} + func NewNngPush() *NngPush { instance := new(NngPush) instance.NewSocket = createNewPushSocket @@ -102,28 +117,25 @@ func (c *NngPush) Terminate() error { } func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error { - var err error - var socket NngSocket + xapp.Logger.Debug("Invoked sbi.AddEndpoint") xapp.Logger.Debug("args: %v", *ep) - socket, err = c.NewSocket() - if err != nil { - return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error()) - } - ep.Socket = socket - err = c.dial(ep) - if err != nil { - return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error()) + endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber) + ep.Whid = int(xapp.Rmr.Openwh(endpoint)) + if ep.Whid < 0 { + return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid)) + }else { + xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint) } + return nil } func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error { xapp.Logger.Debug("Invoked sbi. DeleteEndpoint") xapp.Logger.Debug("args: %v", *ep) - if err := ep.Socket.(NngSocket).Close(); err != nil { - return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error()) - } + + xapp.Rmr.Closewh(ep.Whid) return nil } @@ -148,28 +160,40 @@ func (c *NngPush) dial(ep *rtmgr.Endpoint) error { func (c *NngPush) DistributeAll(policies *[]string) error { xapp.Logger.Debug("Invoked: sbi.DistributeAll") xapp.Logger.Debug("args: %v", *policies) + for _, ep := range rtmgr.Eps { - i := 1 - for i < 5 { - if ep.IsReady { - go c.send(ep, policies) - break - } else { - xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i)) - time.Sleep(10 * time.Millisecond) - i++ - } - } + go c.send(ep, policies) } + return nil } func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) { xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid) + for _, pe := range *policies { - if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil { - xapp.Logger.Error("Unable to send policy entry due to: " + err.Error()) - } + params := &RMRParams{&xapp.RMRParams{}} + params.Mtype = 20 + params.PayloadLen = len([]byte(pe)) + params.Payload =[]byte(pe) + params.Mbuf = nil + params.Whid = ep.Whid + time.Sleep(1 * time.Millisecond) + xapp.Rmr.SendMsg(params.RMRParams) } xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")") } + +func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){ + return c.createEndpoint(payload, c) +} + +func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error { + xapp.Logger.Debug("Invoked: sbi.DistributeToEp") + xapp.Logger.Debug("args: %v", *policies) + + go c.send(ep, policies) + + return nil +} + diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go index 9d1380e..58fe7d8 100644 --- a/pkg/sbi/sbi.go +++ b/pkg/sbi/sbi.go @@ -171,3 +171,18 @@ func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) } } } + +func (s *Sbi) createEndpoint(payload string, sbi Engine) (*rtmgr.Endpoint) { + xapp.Logger.Debug("CreateEndPoint %v", payload) + stringSlice := strings.Split(payload, " ") + uuid := stringSlice[0] + xapp.Logger.Debug(">>> uuid %v", stringSlice[0]) + + + if _, ok := rtmgr.Eps[uuid]; ok { + ep := rtmgr.Eps[uuid] + return ep + } + + return nil +} diff --git a/pkg/sbi/types.go b/pkg/sbi/types.go index d024e94..c0ab373 100644 --- a/pkg/sbi/types.go +++ b/pkg/sbi/types.go @@ -45,6 +45,8 @@ type Engine interface { AddEndpoint(*rtmgr.Endpoint) error DeleteEndpoint(*rtmgr.Endpoint) error UpdateEndpoints(*rtmgr.RicComponents) + CreateEndpoint(string) (*rtmgr.Endpoint) + DistributeToEp(*[]string, *rtmgr.Endpoint) error } type NngSocket interface { -- 2.16.6