Reverting the code after releasing the image 65/3065/1
authorwahidw <abdulwahid.w@nokia.com>
Tue, 31 Mar 2020 03:09:45 +0000 (03:09 +0000)
committerwahidw <abdulwahid.w@nokia.com>
Tue, 31 Mar 2020 03:11:40 +0000 (03:11 +0000)
Change-Id: Ie8c567f805ccb3dc404d512b0a891f38c11d772d
Signed-off-by: wahidw <abdulwahid.w@nokia.com>
16 files changed:
Dockerfile
RELNOTES
cmd/rtmgr.go
container-tag.yaml
go.mod
manifests/rtmgr/rtmgr-cfg.yaml
pkg/nbi/control.go [new file with mode: 0644]
pkg/nbi/httprestful.go
pkg/nbi/httprestful_test.go
pkg/rpe/rmr.go
pkg/rpe/rpe.go
pkg/rtmgr/types.go
pkg/sbi/control.go [deleted file]
pkg/sbi/nngpush.go
pkg/sbi/sbi.go
pkg/sbi/types.go

index 90e4404..7db5111 100644 (file)
 FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:4-u18.04-nng as rtmgrbuild
 
 # Install RMr shared library
-RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_1.13.1_amd64.deb/download.deb && dpkg -i rmr_1.13.1_amd64.deb && rm -rf rmr_1.13.1_amd64.deb
+ARG RMRVERSION=3.6.0
+RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb
 # Install RMr development header files
-RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_1.13.1_amd64.deb/download.deb && dpkg -i rmr-dev_1.13.1_amd64.deb && rm -rf rmr-dev_1.13.1_amd64.deb
+RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb
 
 ENV GOLANG_VERSION 1.12.1
 RUN wget --quiet https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz \
@@ -62,17 +63,20 @@ COPY go.mod go.mod
 COPY pkg pkg
 COPY cmd cmd
 COPY run_rtmgr.sh /run_rtmgr.sh
+RUN mkdir manifests
+COPY manifests/ /go/src/routing-manager/manifests
 #RUN go mod download 
 #RUN /usr/local/go/bin/go mod tidy
 ENV GOPATH /go
 
 ENV GOBIN /go/bin
 RUN go install ./cmd/rtmgr.go
+CMD /bin/bash
 
 # UT intermediate container
-FROM rtmgrbuild as rtmgrut
-RUN ldconfig
-RUN go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -f "./manifests/rtmgr/rtmgr-cfg.yaml" -cover -race
+#FROM rtmgrbuild as rtmgrut
+#RUN ldconfig
+#RUN go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -f "/go/src/routing-manager/manifests/rtmgr/rtmgr-cfg.yaml" -cover -race
 
 # Final, executable container
 FROM ubuntu:16.04
index b497112..dc50594 100644 (file)
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,15 @@
+### v0.5.3
+* RMR updated to v3.6.0 with support for E2 Setup message types
+
+### v0.5.2
+* Switch to RMR Si95 mode 
+
+### v0.5.1
+* Removal of go mangoes and using RMR nng
+
+### v0.4.16
+* getAllSubscriptions API (RM -> SM) during restart of routing manager handled
+
 ### v0.4.15
 * Retained (E2M->E2T issue) - retrying when is_Ready flag in socket handle is false 
 
index 2bb7f8d..63b67b7 100644 (file)
@@ -64,6 +64,8 @@ func initRtmgr() (nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engi
        return nil, nil, nil, nil, err
 }
 
+
+
 func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
        for {
                if <-triggerSBI {
@@ -84,6 +86,23 @@ func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine
        }
 }
 
+func sendRoutesToAll(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine) {
+
+       data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
+       if err != nil || data == nil {
+               xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error())
+               return
+       }
+       sbiEngine.UpdateEndpoints(data)
+       policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
+       err = sbiEngine.DistributeAll(policies)
+       if err != nil {
+               xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
+               return
+       }
+}
+
+
 func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
 
        triggerSBI := make(chan bool)
@@ -116,8 +135,9 @@ func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpe
                        }
                }
 
-               triggerSBI <- true
+               sendRoutesToAll(sbiEngine, sdlEngine, rpeEngine)
 
+               rtmgr.Rtmgr_ready = true
                time.Sleep(INTERVAL * time.Second)
                xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.")
        }
@@ -134,19 +154,33 @@ func SetupCloseHandler() {
 }
 
 func main() {
+
        nbiEngine, sbiEngine, sdlEngine, rpeEngine, err := initRtmgr()
        if err != nil {
                xapp.Logger.Error(err.Error())
                os.Exit(1)
        }
+
        SetupCloseHandler()
+
        xapp.Logger.Info("Start " + SERVICENAME + " service")
        rtmgr.Eps = make(rtmgr.Endpoints)
+       rtmgr.Rtmgr_ready = false
 
        var m sync.Mutex
 
-       c := sbi.NewControl()
-       go c.Run()
+// RMR thread is starting port: 4560
+       c := nbi.NewControl()
+       go c.Run(sbiEngine, sdlEngine, rpeEngine, &m)
+
+// Waiting for RMR to be ready
+       time.Sleep(time.Duration(2) * time.Second)
+       for xapp.Rmr.IsReady() == false {
+               time.Sleep(time.Duration(2) * time.Second)
+       }
+
+       dummy_whid := int(xapp.Rmr.Openwh("localhost:4560"))
+       xapp.Logger.Info("created dummy Wormhole ID for routingmanager and dummy_whid :%d", dummy_whid)
 
        serve(nbiEngine, sbiEngine, sdlEngine, rpeEngine, &m)
        os.Exit(0)
index c754d3c..7b6e635 100644 (file)
@@ -2,4 +2,4 @@
 # By default this file is in the docker build directory,
 # but the location can configured in the JJB template.
 ---
-tag: 0.4.15
+tag: 0.5.4
diff --git a/go.mod b/go.mod
index e133c89..a6e285e 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -3,13 +3,23 @@ module routing-manager
 go 1.12.1
 
 require (
-       gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.0.24
+       gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.4.4
+       github.com/ghodss/yaml v1.0.0
+       github.com/go-openapi/errors v0.19.3
+       github.com/go-openapi/loads v0.19.4
+       github.com/go-openapi/runtime v0.19.4
+       github.com/go-openapi/spec v0.19.3
+       github.com/go-openapi/strfmt v0.19.4
+       github.com/go-openapi/swag v0.19.7
+       github.com/go-openapi/validate v0.19.6
+       github.com/jessevdk/go-flags v1.4.0
+       golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297
        nanomsg.org/go/mangos/v2 v2.0.5
 )
 
-replace gerrit.o-ran-sc.org/r/ric-plt/sdlgo => gerrit.o-ran-sc.org/r/ric-plt/sdlgo.git v0.2.0
+replace gerrit.o-ran-sc.org/r/ric-plt/sdlgo => gerrit.o-ran-sc.org/r/ric-plt/sdlgo.git v0.5.2
 
-replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.0.24
+replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.4.4
 
 replace gerrit.o-ran-sc.org/r/com/golog => gerrit.o-ran-sc.org/r/com/golog.git v0.0.1
 
index 5fbc273..55aab19 100644 (file)
@@ -65,3 +65,5 @@ data:
       "maxSize": 2072
       "numWorkers": 1
     }
+subscription:
+ host: "127.0.0.1:8089"
diff --git a/pkg/nbi/control.go b/pkg/nbi/control.go
new file mode 100644 (file)
index 0000000..8d9e6c4
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+
+==================================================================================
+*/
+package nbi
+
+import "C"
+
+import (
+       "errors"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+       "routing-manager/pkg/rpe"
+       "routing-manager/pkg/rtmgr"
+       "routing-manager/pkg/sbi"
+       "routing-manager/pkg/sdl"
+       "strconv"
+       "sync"
+)
+
+func NewControl() Control {
+
+       return Control{make(chan *xapp.RMRParams)}
+}
+
+type Control struct {
+       rcChan chan *xapp.RMRParams
+}
+
+func (c *Control) Run(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
+       go c.controlLoop(sbiEngine, sdlEngine, rpeEngine, m)
+       xapp.Run(c)
+}
+
+func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
+       c.rcChan <- rp
+       return
+}
+
+func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
+       for {
+               msg := <-c.rcChan
+               xapp_msg := sbi.RMRParams{msg}
+               switch msg.Mtype {
+               case xapp.RICMessageTypes["RMRRM_REQ_TABLE"]:
+                       if rtmgr.Rtmgr_ready == false {
+                               xapp.Logger.Info("Update Route Table Request(RMR to RM), message discarded as routing manager is not ready")
+                       } else {
+                               xapp.Logger.Info("Update Route Table Request(RMR to RM)")
+                               go c.handleUpdateToRoutingManagerRequest(msg, sbiEngine, sdlEngine, rpeEngine, m)
+                       }
+               case xapp.RICMessageTypes["RMRRM_TABLE_STATE"]:
+                       xapp.Logger.Info("state of table to route mgr %s,payload %s", xapp_msg.String(), msg.Payload)
+
+               default:
+                       err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
+                       xapp.Logger.Error("Unknown message type: %v", err)
+               }
+       }
+}
+
+func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
+
+       msg := sbi.RMRParams{params}
+
+       xapp.Logger.Info("Update Route Table Request, msg.String() : %s", msg.String())
+       xapp.Logger.Info("Update Route Table Request, params.Payload : %s", string(params.Payload))
+
+       m.Lock()
+       data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
+       m.Unlock()
+       if err != nil || data == nil {
+               xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error())
+               return
+       }
+
+       ep := sbiEngine.CreateEndpoint(string(params.Payload))
+       if ep == nil {
+               xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
+               return
+       }
+
+       policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
+       err = sbiEngine.DistributeToEp(policies, ep)
+       if err != nil {
+               xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
+               return
+       }
+}
index b07a8db..e764467 100644 (file)
@@ -36,6 +36,7 @@ import (
        "errors"
        "fmt"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+       xfmodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
        "github.com/go-openapi/loads"
        "github.com/go-openapi/runtime/middleware"
        "net"
@@ -579,25 +580,25 @@ func PopulateE2TMap(e2tDataList *[]rtmgr.E2tIdentity, e2ts map[string]rtmgr.E2TI
 
 func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, e2murl string, sdlEngine sdl.Engine) error {
        xapp.Logger.Info("Invoked retrieveStartupData ")
-        var readErr error
-        var maxRetries = 10
+    var readErr error
+   var maxRetries = 10
        var xappData *[]rtmgr.XApp
        xappData = new([]rtmgr.XApp)
        xapp.Logger.Info("Trying to fetch XApps data from XAPP manager")
-        for i := 1; i <= maxRetries; i++ {
-                time.Sleep(2 * time.Second)
+    for i := 1; i <= maxRetries; i++ {
+       time.Sleep(2 * time.Second)
 
-               readErr = nil
-                xappData, err := httpGetXApps(xmurl)
-                if xappData != nil && err == nil {
+          readErr = nil
+       xappData, err := httpGetXApps(xmurl)
+       if xappData != nil && err == nil {
                        break
-                } else if err == nil {
-                        readErr = errors.New("unexpected HTTP status code")
-                } else {
-                        xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
-                        readErr = err
-                }
-        }
+       } else if err == nil {
+               readErr = errors.New("unexpected HTTP status code")
+       } else {
+               xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
+               readErr = err
+       }
+    }
 
        if ( readErr != nil) {
                return readErr
@@ -607,7 +608,6 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile
        e2ts := make(map[string]rtmgr.E2TInstance)
        xapp.Logger.Info("Trying to fetch E2T data from E2manager")
         for i := 1; i <= maxRetries; i++ {
-                time.Sleep(2 * time.Second)
 
                readErr = nil
                e2tDataList, err := httpGetE2TList(e2murl)
@@ -620,6 +620,7 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile
                        xapp.Logger.Warn("cannot get E2T data from E2M due to: " + err.Error())
                        readErr = err
                }
+       time.Sleep(2 * time.Second)
        }
 
        if ( readErr != nil) {
@@ -627,22 +628,43 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile
        }
 
        pcData, confErr := rtmgr.GetPlatformComponents(configfile)
-        if confErr != nil {
-                xapp.Logger.Error(confErr.Error())
-                return confErr
-        }
-        xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.")
-        // Combine the xapps data and platform data before writing to the SDL
+    if confErr != nil {
+            xapp.Logger.Error(confErr.Error())
+            return confErr
+    }
+    xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.")
+    // Combine the xapps data and platform data before writing to the SDL
        ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: e2ts, MeidMap: meids}
-        writeErr := sdlEngine.WriteAll(fileName, ricData)
-        if writeErr != nil {
-                xapp.Logger.Error(writeErr.Error())
-        }
-        // post subscription req to appmgr
-        readErr = PostSubReq(xmurl, nbiif)
-        if readErr == nil {
-                return nil
+    writeErr := sdlEngine.WriteAll(fileName, ricData)
+    if writeErr != nil {
+            xapp.Logger.Error(writeErr.Error())
+    }
+
+       xapp.Logger.Info("Trying to fetch Subscriptions data from Subscription manager")
+/*    for i := 1; i <= maxRetries; i++ {
+               readErr = nil
+               sub_list, err := xapp.Subscription.QuerySubscriptions()
+
+               if sub_list != nil && err == nil {
+                       PopulateSubscription(sub_list)
+                       break
+               } else {
+                       readErr = err
+                       xapp.Logger.Warn("cannot get xapp data due to: " + readErr.Error())
         }
+               time.Sleep(2 * time.Second)
+       }
+
+       if (readErr != nil) {
+        return readErr
+       }
+*/
+    // post subscription req to appmgr
+    readErr = PostSubReq(xmurl, nbiif)
+    if readErr == nil {
+            return nil
+    }
+
        return readErr
 }
 
@@ -841,3 +863,21 @@ func updateSubscription(data *rtmgr.XappList) {
        }
 
 }
+
+func PopulateSubscription(sub_list xfmodel.SubscriptionList) {
+       for _, sub_row := range sub_list {
+               var subdata models.XappSubscriptionData
+               id := int32(sub_row.SubscriptionID)
+               subdata.SubscriptionID = &id
+               for _, ep := range sub_row.Endpoint {
+
+                       stringSlice := strings.Split(ep, ":")
+                       subdata.Address = &stringSlice[0]
+                       intportval, _ := strconv.Atoi( stringSlice[1])
+                       value := uint16(intportval)
+                       subdata.Port = &value
+                       xapp.Logger.Debug("Adding Subscription List has Address :%v, port :%v, SubscriptionID :%v ", subdata.Address, subdata.Address, subdata.SubscriptionID)
+                       addSubscription(&rtmgr.Subs, &subdata)
+               }
+       }
+}
index 3419dcd..f42a835 100644 (file)
@@ -68,6 +68,8 @@ var SubscriptionResp = []byte(`{"ID":"deadbeef1234567890", "Version":0, "EventTy
 
 var E2TListResp = []byte(`[{"e2tAddress":"127.0.0.1:0","ranNames":["RanM0","RanN0"]},{"e2tAddress":"127.0.0.1:1","ranNames":["RanM1","RanN1"]},{"e2tAddress":"127.0.0.1:2","ranNames":["RanM2","RanN2"]},{"e2tAddress":"127.0.0.1:3","ranNames":["RanM3","RanN3"]}]`)
 
+var SubscriptionList = []byte(`[{"SubscriptionId":11,"Meid":"Test-Gnb","Endpoint":["127.0.0.1:4056"]}]`)
+
 var InvalidSubResp = []byte(`{"Version":0, "EventType":all}`)
 
 func TestValidateXappCallbackData_1(t *testing.T) {
@@ -549,6 +551,25 @@ func createMockAppmgrWithData(url string, g []byte, p []byte, t []byte) *httptes
        return ts
 }
 
+func createMockSubmgrWithData(url string, t []byte) *httptest.Server {
+       l, err := net.Listen("tcp", url)
+       if err != nil {
+               fmt.Println("Failed to create listener: " + err.Error())
+       }
+       ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+
+               if r.Method == "GET" && r.URL.String() == "//ric/v1/subscriptions" {
+                       w.Header().Add("Content-Type", "application/json")
+                       w.WriteHeader(http.StatusOK)
+                       w.Write(t)
+               }
+
+       }))
+       ts.Listener.Close()
+       ts.Listener = l
+       return ts
+}
+
 func createMockPlatformComponents() {
        var filename = "config.json"
        file, _ := json.MarshalIndent(stub.ValidPlatformComponents, "", "")
@@ -682,6 +703,10 @@ func TestRetrieveStartupData(t *testing.T) {
        ts1.Start()
        defer ts1.Close()
 
+       ts2 := createMockSubmgrWithData("127.0.0.1:8089", SubscriptionList)
+       ts2.Start()
+       defer ts2.Close()
+
        sdlEngine, _ := sdl.GetSdl("file")
        var httpRestful, _ = GetNbi("httpRESTful")
        createMockPlatformComponents()
index 860c1eb..346644c 100644 (file)
@@ -88,20 +88,7 @@ func (r *Rmr) generateRMRPolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents,
        }
        rawrt = append(rawrt, key+"newrt|end\n")
         count := 0
-/*     meidrt := key +"meid_map|start\n"
-       for e2tkey, value := range rcs.E2Ts {
-               xapp.Logger.Debug("rmr.E2T Key: %v", e2tkey)
-               xapp.Logger.Debug("rmr.E2T Value: %v", value)
-               xapp.Logger.Debug("rmr.E2T RAN List: %v", rcs.E2Ts[e2tkey].Ranlist)
-               if ( len(rcs.E2Ts[e2tkey].Ranlist) != 0 ) {
-                       ranList := strings.Join(rcs.E2Ts[e2tkey].Ranlist, " ")
-                       meidrt += key + "mme_ar|" + e2tkey + "|" + ranList + "\n"
-                       count++
-               } else {
-               xapp.Logger.Debug("rmr.E2T Empty RAN LIST for FQDN: %v", e2tkey)
-               }
-       }
-        meidrt += key+"meid_map|end|" + strconv.Itoa(count) +"\n" */
+
        meidrt := key +"meid_map|start\n"
         for _, value := range rcs.MeidMap {
             meidrt += key + value + "\n"
index b5c6961..d26a704 100644 (file)
@@ -244,6 +244,8 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.
        if len(e2TermEp) > 0 {
                r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermEp, routeTable, -1, "")
                r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermEp, routeTable, -1, "")
+               r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermEp, routeTable, -1, "")
+               r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable, -1, "")
        }
 }
 
index 958ad6d..2846173 100644 (file)
@@ -53,6 +53,7 @@ type Endpoint struct {
        Socket     interface{}
        IsReady    bool
        Keepalive  bool
+       Whid       int
 }
 
 type RouteTableEntry struct {
@@ -135,3 +136,7 @@ type XappList struct {
         SubscriptionID  uint16
        FqdnList []FqDn
 }
+
+var (
+       Rtmgr_ready bool
+)
diff --git a/pkg/sbi/control.go b/pkg/sbi/control.go
deleted file mode 100644 (file)
index 67d8eca..0000000
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
-==================================================================================
-  Copyright (c) 2019 AT&T Intellectual Property.
-  Copyright (c) 2019 Nokia
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-
-   This source code is part of the near-RT RIC (RAN Intelligent Controller)
-   platform project (RICP).
-
-==================================================================================
-*/
-package sbi
-
-import "C"
-
-import (
-        "errors"
-        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
-        "strconv"
-)
-
-
-func NewControl() Control {
-
-        return Control{make(chan *xapp.RMRParams)}
-}
-
-
-type Control struct {
-        rcChan      chan *xapp.RMRParams
-}
-
-
-func (c *Control) Run() {
-        go c.controlLoop()
-        xapp.Run(c)
-}
-
-func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
-        c.rcChan <- rp
-        return
-}
-
-func (c *Control) controlLoop() {
-        for {
-                msg := <-c.rcChan
-                switch msg.Mtype {
-                case xapp.RICMessageTypes["RIC_SUB_REQ"]:
-                       xapp.Logger.Info("Message handling when RMR instance queries for Routes")
-                default:
-                        err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
-                        xapp.Logger.Error("Unknown message type: %v", err)
-                }
-        }
-}
-
index b270abf..4451299 100644 (file)
@@ -43,6 +43,8 @@ package sbi
 import "C"
 
 import (
+       "bytes"
+       "crypto/md5"
        "errors"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "nanomsg.org/go/mangos/v2"
@@ -51,6 +53,7 @@ import (
        "routing-manager/pkg/rtmgr"
        "strconv"
        "time"
+       "fmt"
 )
 
 type NngPush struct {
@@ -59,6 +62,18 @@ type NngPush struct {
        rcChan    chan *xapp.RMRParams
 }
 
+type RMRParams struct {
+        *xapp.RMRParams
+}
+
+
+func (params *RMRParams) String() string {
+        var b bytes.Buffer
+        sum := md5.Sum(params.Payload)
+        fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
+        return b.String()
+}
+
 func NewNngPush() *NngPush {
        instance := new(NngPush)
        instance.NewSocket = createNewPushSocket
@@ -102,28 +117,25 @@ func (c *NngPush) Terminate() error {
 }
 
 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
-       var err error
-       var socket NngSocket
+
        xapp.Logger.Debug("Invoked sbi.AddEndpoint")
        xapp.Logger.Debug("args: %v", *ep)
-       socket, err = c.NewSocket()
-       if err != nil {
-               return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
-       }
-       ep.Socket = socket
-       err = c.dial(ep)
-       if err != nil {
-               return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
+       endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
+       ep.Whid = int(xapp.Rmr.Openwh(endpoint))
+       if ep.Whid < 0   {
+               return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
+       }else {
+               xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint)
        }
+
        return nil
 }
 
 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
        xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
        xapp.Logger.Debug("args: %v", *ep)
-       if err := ep.Socket.(NngSocket).Close(); err != nil {
-               return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
-       }
+
+       xapp.Rmr.Closewh(ep.Whid)
        return nil
 }
 
@@ -148,28 +160,40 @@ func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
 func (c *NngPush) DistributeAll(policies *[]string) error {
        xapp.Logger.Debug("Invoked: sbi.DistributeAll")
        xapp.Logger.Debug("args: %v", *policies)
+
        for _, ep := range rtmgr.Eps {
-               i := 1
-               for i < 5 {
-                       if ep.IsReady {
-                               go c.send(ep, policies)
-                               break
-                       } else {
-                               xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i))
-                               time.Sleep(10 * time.Millisecond)
-                               i++
-                       }
-               }
+               go c.send(ep, policies)
        }
+
        return nil
 }
 
 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
        xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+
        for _, pe := range *policies {
-               if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
-                       xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
-               }
+               params := &RMRParams{&xapp.RMRParams{}}
+               params.Mtype = 20
+               params.PayloadLen = len([]byte(pe))
+               params.Payload =[]byte(pe)
+               params.Mbuf = nil
+               params.Whid = ep.Whid
+               time.Sleep(1 * time.Millisecond)
+               xapp.Rmr.SendMsg(params.RMRParams)
        }
        xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
 }
+
+func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){
+       return c.createEndpoint(payload, c)
+}
+
+func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
+       xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
+       xapp.Logger.Debug("args: %v", *policies)
+
+       go c.send(ep, policies)
+
+       return nil
+}
+
index 9d1380e..58fe7d8 100644 (file)
@@ -171,3 +171,18 @@ func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine)
                 }
         }
 }
+
+func (s *Sbi) createEndpoint(payload string, sbi Engine) (*rtmgr.Endpoint) {
+       xapp.Logger.Debug("CreateEndPoint %v", payload)
+       stringSlice := strings.Split(payload, " ")
+       uuid := stringSlice[0]
+       xapp.Logger.Debug(">>> uuid %v",  stringSlice[0])
+
+
+       if _, ok := rtmgr.Eps[uuid]; ok {
+               ep := rtmgr.Eps[uuid]
+               return ep
+       }
+
+       return nil
+}
index d024e94..c0ab373 100644 (file)
@@ -45,6 +45,8 @@ type Engine interface {
        AddEndpoint(*rtmgr.Endpoint) error
        DeleteEndpoint(*rtmgr.Endpoint) error
        UpdateEndpoints(*rtmgr.RicComponents)
+       CreateEndpoint(string) (*rtmgr.Endpoint)
+       DistributeToEp(*[]string, *rtmgr.Endpoint) error
 }
 
 type NngSocket interface {