Add version v0.1.0 82/82/1
authorPeter Szilagyi <peter.3.szilagyi@nokia.com>
Wed, 24 Apr 2019 14:51:02 +0000 (14:51 +0000)
committerPeter Szilagyi <peter.3.szilagyi@nokia.com>
Wed, 24 Apr 2019 14:54:48 +0000 (14:54 +0000)
* Introduces NNGPush SBI module
* Bugfixes in argument handling
* different RMR policy generator for Push and Pub communication

Change-Id: I2907c0649619d1a02da6d15b41bc2ad668c6245b
Signed-off-by: Peter Szilagyi <peter.3.szilagyi@nokia.com>
17 files changed:
README.md
RELNOTES
build/container/run_rtmgr.sh
cmd/rtmgr.go
manifests/rtmgr/rtmgr-dep.yaml
pkg/glide.lock
pkg/nbi/nbi.go
pkg/rpe/rmr.go
pkg/rpe/rpe.go
pkg/rpe/types.go
pkg/rtmgr/rtmgr.go
pkg/rtmgr/types.go
pkg/sbi/nngpub.go
pkg/sbi/nngpush.go [new file with mode: 0644]
pkg/sbi/sbi.go
pkg/sbi/types.go
pkg/sdl/sdl.go

index 41a767d..2cc2000 100644 (file)
--- a/README.md
+++ b/README.md
-# Routing Manager\r
-\r
-## Table of contents\r
-* [Introduction](#introduction)\r
-* [Release notes](#release-notes)\r
-* [Prerequisites](#prerequisites)\r
-* [Project folders structure](#project-folders-structure)\r
-* [Installation guide](#installation-guide)\r
-  * [Compiling code](#compiling-code)\r
-  * [Building docker container](#building-docker-container)\r
-  * [Installing Routing Manager](#installing-routing-manager)\r
-  * [Testing and Troubleshoting](#testing-and-troubleshoting)\r
-* [Upcoming changes](#upcoming-changes)\r
-* [License](#license)\r
-\r
-## Introduction\r
-__Routing Manager__ is a basic platform serive of RIC. It is responsible for distributing routing policies among the other platform components and xApps.\r
-\r
-The implemented logic periodically queries the xApp Manager component for xApps' list. Stores the data then processes it to create routing policies and distributes them to all xApps.\r
-The architecture consists of the following five well defined functions:\r
-* NorthBound Interface (__NBI__): Maintains the communication channels towards RIC manager components \r
-* Routing Policy Engine (__RPE__): Provides the logic to calculate routing policies\r
-* Shared Data Layer (__SDL__): Provides access to different kind persistent data stores\r
-* SouthBound Interface (__SBI__): Maintains the communication channels towards RIC tenants and control components\r
-* Controll Logic (__RTMGR__): Controls the operatin of above functions\r
-\r
-Current implementation provides support for the followings:\r
-* NBI:\r
-  * __httpGet__: simple HTTP GET interface. Expects an URL where it gets the xApps' list in JSON format\r
-  * (WIP) __httRESTful__: provides REST API endpoints towards RIC manager components \r
-* RPE:\r
-  * __rmr__: creates routing policies formatted for RIC RMR\r
-* SDL:\r
-  * __file__: stores xApp data in container's local filesystem (or in a mountpoint)\r
-  * (backlog) __sdl__: Shared Data Library to Redis database\r
-* SBI:\r
-  * __nngpub__: distributes RPE created policies via NNG Pub channel\r
-  * (WIP) __nngpipe__: distributes RPE created policies via NNG Pipeline channel\r
-\r
-## Release notes\r
-Check the separated `RELNOTES` file.\r
-\r
-## Prerequisites\r
-* Healthy kubernetes cluster\r
-* Access to the common docker registry\r
-\r
-## Project folder structure\r
-* /api: contains swagger source files\r
-* /build: contains build tools (scripts, Dockerfiles, etc.)\r
-* /manifest: contains deployment files (kubernetes manifests, helm chart)\r
-* /cmd: contains go project's main file\r
-* /pkg: contains go project's internal packages\r
-* /test: contains CI/CD testing files (scripts, mocks, manifests)\r
-\r
-## Installation guide\r
-\r
-### Compiling code\r
-Enter the project root and execute `./build.sh` script.\r
-The build script has two main phases. First is the code compilation, where it creates a temporary container for downloading all dependencies then compiles the code. In the second phase it builds the production ready container and taggs it to `rtmgr:builder`\r
-**NOTE:** The script puts a copy of the binary into the `./bin` folder for further use cases\r
-\r
-### Installing Routing Manager\r
-#### Preparing environment\r
-Re-Tag the `rtmgr` container according to the project release and push it to a registry accessible from all minions of the kubernetes cluster.\r
-Edit the container image section of `rtmgr-dep.yaml` file according to the `rtmgr` image tag\r
-\r
-#### Deploying Routing Manager \r
-Issue the `kubectl create -f {manifest.yaml}` command in the following order\r
-  1. `manifests/namespaces.yaml`: creates the `example` namespace for routing-manager resources\r
-  2. `manifests/rtmgr/rtmgr-dep.yaml`: instantiates the `rtmgr` deployment in the `example` namespace\r
-  3. `manifests/rtmgr/rtmgr-svc.yaml`: creates the `rtmgr` service in `example` namespace\r
-\r
-### Testing and Troubleshoting\r
-Routing Manager's behaviour can be tested using the mocked xApp Manager, traffic generator xApp and receiver xApp.\r
-\r
-  1. Checkout and compile both xApp receiver and xApp Tx generator of RIC admission control project\r
-  2. Copy the `adm-ctrl-xapp` binary to `./test/docker/xapp.build` folder. Enter the folder and issue `docker build .`. Tag the recently created docker image and push it to the common registry.\r
-  3. Copy the `test-tx` binary to `./test/docker/xapp-tx.build` folder. Enter the folder and issue `docker build .`.  Tag the recently created docker image and push it to the common registry.\r
-  4. Enter the `./test/docker/xmgr.build` folder and issue `docker build .`.  Tag the recently created docker image and push it to the common registry.\r
-  5. Modify the the docker image version in each kuberbetes manifest files under `./test/kubernetes/` folder accordingly then issue the `kubectl create -f {manifest.yaml}` on each file.\r
-  6. [Compile](#compiling-code) and [Install routing manager](#installing-routing-manager)\r
-\r
-#### Command line arguments\r
-Routing manager binary can be called with `-h` flag when it displays the available command line arguments and it's default value.\r
-\r
-Example:\r
-\r
-```bash\r
-root@a3684ff4cdb0:/# ./rtmgr -h\r
-Usage of ./rtmgr:\r
-  -loglevel string\r
-        INFO | WARN | ERROR | DEBUG (default "INFO")\r
-  -nbi-httpget string\r
-        xApp Manager URL (default "http://localhost:3000/xapps")\r
-  -rpe string\r
-        Policy Engine Module name (default "rmr")\r
-  -sbi-nngsub string\r
-        NNG Subsciption Socket URI (default "tcp://0.0.0.0:4560")\r
-  -sdl-file string\r
-        Local file store location (default "/db/rt.json")\r
-```\r
-\r
-For troubleshooting purpose the default logging level can be increased to `DEBUG`.\r
-\r
-## Upcoming changes\r
-[] Add RESTful NBI based on swagger api definition\r
-\r
-[] Support RMR Pipeline\r
-\r
-[] Add unit tests\r
-\r
-## License\r
-This project is licensed under the Apache License, Version 2.0 - see the [LICENSE](LICENSE)\r
-\r
+# Routing Manager
+
+## Table of contents
+* [Introduction](#introduction)
+* [Release notes](#release-notes)
+* [Prerequisites](#prerequisites)
+* [Project folders structure](#project-folders-structure)
+* [Installation guide](#installation-guide)
+  * [Compiling code](#compiling-code)
+  * [Building docker container](#building-docker-container)
+  * [Installing Routing Manager](#installing-routing-manager)
+  * [Testing and Troubleshoting](#testing-and-troubleshoting)
+* [Upcoming changes](#upcoming-changes)
+* [License](#license)
+
+## Introduction
+__Routing Manager__ is a basic platform service of RIC. It is responsible for distributing routing policies among the other platform components and xApps.
+
+The implemented logic periodically queries the xApp Manager component for xApps' list. Stores the data then processes it to create routing policies and distributes them to all xApps.
+The architecture consists of the following five well defined functions:
+* NorthBound Interface (__NBI__): Maintains the communication channels towards RIC manager components 
+* Routing Policy Engine (__RPE__): Provides the logic to calculate routing policies
+* Shared Data Layer (__SDL__): Provides access to different kind persistent data stores
+* SouthBound Interface (__SBI__): Maintains the communication channels towards RIC tenants and control components
+* Control Logic (__RTMGR__): Controls the operatin of above functions
+
+Current implementation provides support for the followings:
+* NBI:
+  * __httpGet__: simple HTTP GET interface. Expects an URL where it gets the xApps' list in JSON format
+  * (WIP) __httRESTful__: provides REST API endpoints towards RIC manager components 
+* RPE:
+  * __rmr__: creates routing policies formatted for RIC RMR
+* SDL:
+  * __file__: stores xApp data in container's local filesystem (or in a mountpoint)
+  * (backlog) __sdl__: Shared Data Library to Redis database
+* SBI:
+  * __nngpub__: distributes RPE created policies via NNG Pub channel
+  * (WIP) __nngpipe__: distributes RPE created policies via NNG Pipeline channel
+
+## Release notes
+Check the separated `RELNOTES` file.
+
+## Prerequisites
+* Healthy kubernetes cluster
+* Access to the common docker registry
+
+## Project folder structure
+* /api: contains Swagger source files
+* /build: contains build tools (scripts, Dockerfiles, etc.)
+* /manifest: contains deployment files (Kubernetes manifests, Helm chart)
+* /cmd: contains go project's main file
+* /pkg: contains go project's internal packages
+* /test: contains CI/CD testing files (scripts, mocks, manifests)
+
+## Installation guide
+
+### Compiling code
+Enter the project root and execute `./build.sh` script.
+The build script has two main phases. First is the code compilation, where it creates a temporary container for downloading all dependencies then compiles the code. In the second phase it builds the production ready container and taggs it to `rtmgr:builder`
+
+**NOTE:** The script puts a copy of the binary into the `./bin` folder for further use cases
+
+### Installing Routing Manager
+#### Preparing environment
+Tag the `rtmgr` container according to the project release and push it to a registry accessible from all minions of the Kubernetes cluster.
+Edit the container image section of `rtmgr-dep.yaml` file according to the `rtmgr` image tag.
+
+#### Deploying Routing Manager 
+Issue the `kubectl create -f {manifest.yaml}` command in the following order
+  1. `manifests/namespace.yaml`: creates the `example` namespace for routing-manager resources
+  2. `manifests/rtmgr/rtmgr-dep.yaml`: instantiates the `rtmgr` deployment in the `example` namespace
+  3. `manifests/rtmgr/rtmgr-svc.yaml`: creates the `rtmgr` service in `example` namespace
+
+### Testing and Troubleshoting
+Routing Manager's behaviour can be tested using the mocked xApp Manager, traffic generator xApp and receiver xApp.
+
+  1. Checkout and compile both xApp receiver and xApp Tx generator of RIC admission control project
+  2. Copy the `adm-ctrl-xapp` binary to `./test/docker/xapp.build` folder furthermore copy all RMR related dinamycally linked library under `./test/docker/xapp.build/usr` folder. Issue `docker build ./test/docker/xapp.build` command. Tag the recently created docker image and push it to the common registry.
+  3. Copy the `test-tx` binary to `./test/docker/xapp-tx.build` folder furthermore copy all RMR related dinamycally linked library under `./test/docker/xapp.build/usr` folder. Issue `docker build ./test/docker/xapp-tx.build` command.  Tag the recently created docker image and push it to the common registry.
+  4. Enter the `./test/docker/xmgr.build` folder and issue `docker build .`.  Tag the recently created docker image and push it to the common registry.
+  5. Modify the docker image version in each kuberbetes manifest files under `./test/kubernetes/` folder accordingly then issue the `kubectl create -f {manifest.yaml}` on each file.
+  6. [Compile](#compiling-code) and [Install routing manager](#installing-routing-manager)
+
+#### Command line arguments
+Routing manager binary can be called with `-h` flag when it displays the available command line arguments and it's default value.
+
+Example:
+
+```bash
+Usage of ./rtmgr:
+  -filename string
+        Absolute path of file where the route information to be stored (default "/db/rt.json")
+  -loglevel string
+        INFO | WARN | ERROR | DEBUG (default "INFO")
+  -nbi string
+        Northbound interface module to be used. Valid values are: 'httpGetter' (default "httpGetter")
+  -rpe string
+        Route Policy Engine to be used. Valid values are: 'rmrpush | rmrpub' (default "rmrpush")
+  -sbi string
+        Southbound interface module to be used. Valid values are: 'nngpush | nngpub' (default "nngpush")
+  -sbi-if string
+        IPv4 address of interface where Southbound socket to be opened (default "0.0.0.0")
+  -sdl string
+        Datastore enginge to be used. Valid values are: 'file' (default "file")
+  -xm-url string
+        HTTP URL where xApp Manager exposes the entire xApp List (default "http://localhost:3000/xapps")
+```
+
+For troubleshooting purpose the default logging level can be increased to `DEBUG`.
+
+## Upcoming changes
+[] Add RESTful NBI based on swagger api definition
+
+[] Add unit tests
+
+## License
+This project is licensed under the Apache License, Version 2.0 - see the [LICENSE](LICENSE)
+
index 51808fb..7d797df 100644 (file)
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,8 @@
+### v0.1.0
+* Introduces NNGPush SBI module
+* Bugfixes in argument handling
+* different RMR policy generator for Push and Pub communication
+
 ### v0.0.3
 * RMR Policy Engine has been refactored to handle receiving groups properly
 * Debugging feature has been added
@@ -12,4 +17,4 @@
 ### v0.0.1
 * Initial version of Routing Manager for CI/CD pipeline 
 * Dummy solution: still has no real routing capability
-* Container is based on the _**tx**_ version of [admin xApp](http://gerrit.ranco-dev-tools.eastus.cloudapp.azure.com/ric-app/admin.git) which already implements [RMR library](        http://gerrit.ranco-dev-tools.eastus.cloudapp.azure.com/ric-plt/lib/rmr.git)
\ No newline at end of file
+* Container is based on the _**tx**_ version of [admin xApp] which already implements [RMR library]
\ No newline at end of file
index d248edd..f098eb5 100755 (executable)
@@ -22,4 +22,4 @@
 #      Abstract:       Runs the rtmgr executable with proper arguments
 #      Date:           19 March 2019
 #
-exec ./rtmgr -nbi-httpget=$XMURL -sbi-nngsub=$SOCKET -sdl-file=$RTFILE -rpe=$RPE -loglevel=INFO
+exec ./rtmgr -xm-url=$XMURL -sbi=$SBI -sbi-if=$IP -filename=$RTFILE -rpe=$RPE -loglevel=INFO
index e4f76d8..093da71 100644 (file)
@@ -41,41 +41,34 @@ const SERVICENAME = "rtmgr"
 const INTERVAL time.Duration = 2
 
 var (
-       args *map[string]string
+       args map[string]*string
 )
 
 func parseArgs() {
-       a := make(map[string]string)
-       xmgeturl := flag.String("nbi-httpget", "http://localhost:3000/xapps", "xApp Manager URL")
-       nngpubsock := flag.String("sbi-nngsub", "tcp://0.0.0.0:4560", "NNG Subsciption Socket URI")
-       file := flag.String("sdl-file", "/db/rt.json", "Local file store location")
-       rpename := flag.String("rpe", "rmr", "Policy Engine Module name")
-       loglevel := flag.String("loglevel", "INFO", "INFO | WARN | ERROR | DEBUG")
+       // TODO: arguments should be validated (filename; xm-url; sbi-if)
+       args = make(map[string]*string)
+       args["nbi"] = flag.String("nbi", "httpGetter", "Northbound interface module to be used. Valid values are: 'httpGetter'")
+       args["sbi"] = flag.String("sbi", "nngpush", "Southbound interface module to be used. Valid values are: 'nngpush | nngpub'")
+       args["rpe"] = flag.String("rpe", "rmrpush", "Route Policy Engine to be used. Valid values are: 'rmrpush | rmrpub'")
+       args["sdl"] = flag.String("sdl", "file", "Datastore enginge to be used. Valid values are: 'file'")
+       args["xm-url"] = flag.String("xm-url", "http://localhost:3000/xapps", "HTTP URL where xApp Manager exposes the entire xApp List")
+       args["sbi-if"] = flag.String("sbi-if", "0.0.0.0", "IPv4 address of interface where Southbound socket to be opened")
+       args["filename"] = flag.String("filename", "/db/rt.json", "Absolute path of file where the route information to be stored")
+       args["loglevel"] = flag.String("loglevel", "INFO", "INFO | WARN | ERROR | DEBUG")
        flag.Parse()
-       if (*xmgeturl) != "" {
-               a["xmurl"] = (*xmgeturl)
-               a["nbiname"] = "httpGetter"
-       }
-       if (*nngpubsock) != "" {
-               a["socketuri"] = (*nngpubsock)
-               a["sbiname"] = "nngpub"
-       }
-       if (*file) != "" {
-               a["file"] = (*file)
-               a["sdlname"] = "file"
-       }
-       a["rpename"] = (*rpename)
-       a["loglevel"] = (*loglevel)
-       args = &a
 }
 
 func initRtmgr() (*nbi.NbiEngineConfig, *sbi.SbiEngineConfig, *sdl.SdlEngineConfig, *rpe.RpeEngineConfig, error) {
        var err error
-       if nbi, err := nbi.GetNbi((*args)["nbiname"]); err == nil && nbi != nil {
-               if sbi, err := sbi.GetSbi((*args)["sbiname"]); err == nil && sbi != nil {
-                       if sdl, err := sdl.GetSdl((*args)["sdlname"]); err == nil && sdl != nil {
-                               if rpe, err := rpe.GetRpe((*args)["rpename"]); err == nil && rpe != nil {
-                                       return nbi, sbi, sdl, rpe, nil
+       var nbii *nbi.NbiEngineConfig
+       var sbii *sbi.SbiEngineConfig
+       var sdli *sdl.SdlEngineConfig
+       var rpei *rpe.RpeEngineConfig
+       if nbii, err = nbi.GetNbi(*args["nbi"]); err == nil && nbii != nil {
+               if sbii, err = sbi.GetSbi(*args["sbi"]); err == nil && sbii != nil {
+                       if sdli, err = sdl.GetSdl(*args["sdl"]); err == nil && sdli != nil {
+                               if rpei, err = rpe.GetRpe(*args["rpe"]); err == nil && rpei != nil {
+                                       return nbii, sbii, sdli, rpei, nil
                                }
                        }
                }
@@ -83,28 +76,29 @@ func initRtmgr() (*nbi.NbiEngineConfig, *sbi.SbiEngineConfig, *sdl.SdlEngineConf
        return nil, nil, nil, nil, err
 }
 
-func serve(nbi *nbi.NbiEngineConfig, sbi *sbi.SbiEngineConfig, sdl *sdl.SdlEngineConfig, rpe *rpe.RpeEngineConfig) {
-       err := sbi.OpenSocket((*args)["socketuri"])
+func serve(nbii *nbi.NbiEngineConfig, sbii *sbi.SbiEngineConfig, sdli *sdl.SdlEngineConfig, rpei *rpe.RpeEngineConfig) {
+       err := sbii.OpenSocket(*args["sbi-if"])
        if err != nil {
                rtmgr.Logger.Info("fail to open pub socket due to: " + err.Error())
                return
        }
-       defer sbi.CloseSocket()
+       defer sbii.CloseSocket()
        for {
                time.Sleep(INTERVAL * time.Second)
-               data, err := nbi.BatchFetch((*args)["xmurl"])
+               data, err := nbii.BatchFetch(*args["xm-url"])
                if err != nil {
-                       rtmgr.Logger.Error("cannot get data from " + nbi.Engine.Name + " interface dute to: " + err.Error())
+                       rtmgr.Logger.Error("cannot get data from " + nbii.Engine.Name + " interface dute to: " + err.Error())
                } else {
-                       sdl.WriteAll((*args)["file"], data)
+                       sdli.WriteAll(*args["filename"], data)
                }
-               data, err = sdl.ReadAll((*args)["file"])
+               data, err = sdli.ReadAll(*args["filename"])
                if err != nil || data == nil {
-                       rtmgr.Logger.Error("cannot get data from " + sdl.Engine.Name + " interface dute to: " + err.Error())
+                       rtmgr.Logger.Error("cannot get data from " + sdli.Engine.Name + " interface dute to: " + err.Error())
                        continue
                }
-               policies := rpe.GeneratePolicies(data)
-               err = sbi.DistributeAll(policies)
+               sbi.UpdateEndpointList(data, sbii)
+               policies := rpei.GeneratePolicies(rtmgr.Eps)
+               err = sbii.DistributeAll(policies)
                if err != nil {
                        rtmgr.Logger.Error("routing rable cannot be published due to: " + err.Error())
                }
@@ -113,13 +107,14 @@ func serve(nbi *nbi.NbiEngineConfig, sbi *sbi.SbiEngineConfig, sdl *sdl.SdlEngin
 
 func main() {
        parseArgs()
-       rtmgr.SetLogLevel((*args)["loglevel"])
-       nbi, sbi, sdl, rpe, err := initRtmgr()
+       rtmgr.SetLogLevel(*args["loglevel"])
+       nbii, sbii, sdli, rpei, err := initRtmgr()
        if err != nil {
                rtmgr.Logger.Error(err.Error())
                os.Exit(1)
        }
        rtmgr.Logger.Info("Start " + SERVICENAME + " service")
-       serve(nbi, sbi, sdl, rpe)
+       rtmgr.Eps = make(rtmgr.Endpoints)
+       serve(nbii, sbii, sdli, rpei)
        os.Exit(0)
 }
index 6916582..0a9982b 100644 (file)
@@ -37,16 +37,18 @@ spec:
     spec:
       containers:
       - name: rtmgr
-        image: cmaster:5000/rtmgr:0.0.3
+        image: cmaster:5000/rtmgr:0.1.0
         command: ["/run_rtmgr.sh"]
         env:
         - name: XMURL
           value: "http://xmgr:3000/xapps"
-        - name: SOCKET
-          value: "tcp://0.0.0.0:4560"
+        - name: IP
+          value: "0.0.0.0"
         - name: RTFILE
           value: "/db/rt.json"
         - name: RPE
-          value: "rmr"
+          value: "rmrpush"
+        - name: SBI
+          value: "nngpush"
         ports:
         - containerPort: 4560
index 1e8baeb..8c2f53c 100644 (file)
@@ -1,5 +1,5 @@
 hash: 8a251805c06cd6f4f20b276425eabe314968e2ad14fb32525fc93c53c2cfe845
-updated: 2019-03-26T04:13:15.650927387Z
+updated: 2019-04-04T20:26:54.485838082Z
 imports:
 - name: github.com/droundy/goopt
   version: 0b8effe182da161d81b011aba271507324ecb7ab
@@ -10,7 +10,7 @@ imports:
 - name: github.com/Microsoft/go-winio
   version: 1a8911d1ed007260465c3bfbbc785ac6915a0bb8
 - name: golang.org/x/sys
-  version: f49334f85ddcf0f08d7fb6dd7363e9e6d6b777eb
+  version: 81d4e9dc473e5e8c933f2aaeba2a3d81efb9aed2
   subpackages:
   - windows
 - name: nanomsg.org/go/mangos/v2
@@ -47,6 +47,6 @@ imports:
   - transport/wss
 testImports:
 - name: github.com/smartystreets/goconvey
-  version: 200a235640ff2643e3126834b67f3e93df76640a
+  version: 68dc04aab96ae4326137d6b77330c224063a927e
   subpackages:
   - convey
index cb19e71..84282bd 100644 (file)
@@ -77,5 +77,5 @@ func GetNbi(nbiName string) (*NbiEngineConfig, error) {
                        return nbi, nil
                }
        }
-       return nil, errors.New("NBI:" + nbiName + "is not supported or still not a available")
+       return nil, errors.New("NBI:" + nbiName + " is not supported or still not a available")
 }
index a4bf776..ca16772 100644 (file)
 
 package rpe
 
-import "rtmgr"
+import (
+       "rtmgr"
+       "strconv"
+)
 
 /*
 Produces the raw route message consumable by RMR
 */
-func generateRMRPolicies(xapps *[]rtmgr.XApp) *[]string {
+func generateRMRPolicies(eps rtmgr.Endpoints, key string) *[]string {
        rtmgr.Logger.Debug("Invoked rmr.generateRMRPolicies")
-       rtmgr.Logger.Debug("args: %v", (*xapps))
-       key := "00000           "
+       rtmgr.Logger.Debug("args: %v", eps)
        rawrt := []string{key + "newrt|start\n"}
-       rt := getRouteTable(xapps)
+       rt := getRouteTable(eps)
        for _, rte := range *rt {
                rawrte := key + "rte|" + rte.MessageType
                for _, tx := range rte.TxList {
-                       rawrte += "," + tx.IpSocket
+                       rawrte += "," + tx.Ip + ":" + strconv.Itoa(int(tx.Port))
                }
                rawrte += "|"
                group := ""
@@ -47,9 +49,9 @@ func generateRMRPolicies(xapps *[]rtmgr.XApp) *[]string {
                        member := ""
                        for _, rx := range rxg {
                                if member == "" {
-                                       member += rx.IpSocket
+                                       member += rx.Ip + ":" + strconv.Itoa(int(rx.Port))
                                } else {
-                                       member += "," + rx.IpSocket
+                                       member += "," + rx.Ip + ":" + strconv.Itoa(int(rx.Port))
                                }
                        }
                        if group == "" {
@@ -65,3 +67,11 @@ func generateRMRPolicies(xapps *[]rtmgr.XApp) *[]string {
        rtmgr.Logger.Debug("rmr.generateRMRPolicies returns: %v", rawrt)
        return &rawrt
 }
+
+func generateRMRPubPolicies(eps rtmgr.Endpoints) *[]string {
+       return generateRMRPolicies(eps, "00000           ")
+}
+
+func generateRMRPushPolicies(eps rtmgr.Endpoints) *[]string {
+       return generateRMRPolicies(eps, "")
+}
index f915ddd..636a439 100644 (file)
@@ -28,18 +28,26 @@ import (
        "errors"
        "fmt"
        "rtmgr"
-       "strconv"
 )
 
 var (
        SupportedRpes = []*RpeEngineConfig{
                &RpeEngineConfig{
                        RpeEngine{
-                               Name:     "rmr",
-                               Version:  "v1",
+                               Name:     "rmrpub",
+                               Version:  "pubsub",
                                Protocol: "rmruta",
                        },
-                       generatePolicies(generateRMRPolicies),
+                       generatePolicies(generateRMRPubPolicies),
+                       true,
+               },
+               &RpeEngineConfig{
+                       RpeEngine{
+                               Name:     "rmrpush",
+                               Version:  "push",
+                               Protocol: "rmruta",
+                       },
+                       generatePolicies(generateRMRPushPolicies),
                        true,
                },
        }
@@ -60,7 +68,7 @@ func GetRpe(rpeName string) (*RpeEngineConfig, error) {
                        return rpe, nil
                }
        }
-       return nil, errors.New("SBI:" + rpeName + "is not supported or still not a available")
+       return nil, errors.New("SBI:" + rpeName + " is not supported or still not a available")
 }
 
 /*
@@ -68,27 +76,21 @@ Gets the raw xApp list and generates the list of sender endpoints and receiver e
 Returns the Tx EndpointList map where the key is the messge type and also returns the nested map of Rx EndpointList's map where keys are message type and xapp type
 Endpoint object's message type already transcoded to integer id
 */
-func getEndpointLists(xapps *[]rtmgr.XApp) (*map[string]rtmgr.EndpointList, *map[string]map[string]rtmgr.EndpointList) {
+
+func getRouteRxTxLists(eps rtmgr.Endpoints) (*map[string]rtmgr.EndpointList, *map[string]map[string]rtmgr.EndpointList) {
        txlist := make(map[string]rtmgr.EndpointList)
        rxgroups := make(map[string]map[string]rtmgr.EndpointList)
-       for _, xapp := range *xapps {
-               for _, instance := range xapp.Instances {
-                       ep := rtmgr.Endpoint{
-                               instance.Name,
-                               xapp.Name,
-                               instance.Ip + ":" + strconv.Itoa(instance.Port),
-                       }
-                       for _, message := range instance.RxMessages {
-                               messageid := rtmgr.MESSAGETYPES[message]
-                               if _, ok := rxgroups[messageid]; !ok {
-                                       rxgroups[messageid] = make(map[string]rtmgr.EndpointList)
-                               }
-                               rxgroups[messageid][xapp.Name] = append(rxgroups[messageid][xapp.Name], ep)
-                       }
-                       for _, message := range instance.TxMessages {
-                               messageid := rtmgr.MESSAGETYPES[message]
-                               txlist[messageid] = append(txlist[messageid], ep)
+       for _, ep := range eps {
+               for _, message := range ep.RxMessages {
+                       messageid := rtmgr.MESSAGETYPES[message]
+                       if _, ok := rxgroups[messageid]; !ok {
+                               rxgroups[messageid] = make(map[string]rtmgr.EndpointList)
                        }
+                       rxgroups[messageid][ep.XAppType] = append(rxgroups[messageid][ep.XAppType], (*ep))
+               }
+               for _, message := range ep.TxMessages {
+                       messageid := rtmgr.MESSAGETYPES[message]
+                       txlist[messageid] = append(txlist[messageid], (*ep))
                }
        }
        return &txlist, &rxgroups
@@ -98,8 +100,8 @@ func getEndpointLists(xapps *[]rtmgr.XApp) (*map[string]rtmgr.EndpointList, *map
 Gets the raw xapp list and creates a route table for
 Returns the array of route table entries
 */
-func getRouteTable(xapps *[]rtmgr.XApp) *rtmgr.RouteTable {
-       tx, rx := getEndpointLists(xapps)
+func getRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
+       tx, rx := getRouteRxTxLists(eps)
        var rt rtmgr.RouteTable
        for _, messagetype := range rtmgr.MESSAGETYPES {
                if _, ok := (*tx)[messagetype]; !ok {
index 69f01c4..b227c0e 100644 (file)
@@ -26,7 +26,7 @@ package rpe
 
 import "rtmgr"
 
-type generatePolicies func(*[]rtmgr.XApp) (*[]string)
+type generatePolicies func(rtmgr.Endpoints) *[]string
 
 type RpeEngine struct {
        Name     string
index d5d3fc5..9dd956e 100644 (file)
@@ -53,6 +53,7 @@ var (
                "RIC_CONTROL_XAPP_CONFIG_RESPONSE": "100001",
        }
        Logger = lumber.NewConsoleLogger(lumber.INFO)
+       Eps Endpoints
 )
 
 func SetLogLevel(loglevel string) {
@@ -68,3 +69,4 @@ func SetLogLevel(loglevel string) {
                Logger.Level(lumber.DEBUG)
        }
 }
+
index 0fa9dd1..bb84804 100644 (file)
 
 package rtmgr
 
-type Endpoint struct {
-       Name     string
-       Type     string
-       IpSocket string
-}
-
 type XApps struct {
        XApplist []XApp
 }
 
 type RouteTable []RouteTableEntry
-
 type EndpointList []Endpoint
 
+type Endpoints map[string]*Endpoint
+
+//TODO: uuid is not a real UUID but a string of "ip:port"
+// this should be changed to real UUID later on which should come from xApp Manager // petszila
+type Endpoint struct {
+       Uuid       string
+       Name       string
+       XAppType   string
+       Ip         string
+       Port       uint16
+       TxMessages []string
+       RxMessages []string
+       Socket     interface{}
+       IsReady    bool
+       Keepalive  bool
+}
+
 type RouteTableEntry struct {
        MessageType string
        TxList      EndpointList
@@ -55,7 +65,7 @@ type XAppInstance struct {
        Name       string   `json:"name"`
        Status     string   `json:"status"`
        Ip         string   `json:"ip"`
-       Port       int      `json:"port"`
+       Port       uint16   `json:"port"`
        TxMessages []string `json:"txMessages"`
        RxMessages []string `json:"rxMessages"`
 }
index 02e4ebf..c48a40f 100644 (file)
@@ -33,25 +33,34 @@ import (
        "strconv"
 )
 
-var sock mangos.Socket
+var socket mangos.Socket
+
+func createNngPubEndpointSocket(ep *rtmgr.Endpoint) error {
+       return nil
+}
+
+func destroyNngPubEndpointSocket(ep *rtmgr.Endpoint) error {
+       return nil
+}
 
 /*
 Creates the NNG publication channel
 */
-func openNngPub(url string) error {
+func openNngPub(ip string) error {
        var err error
-       if sock, err = pub.NewSocket(); err != nil {
+       if socket, err = pub.NewSocket(); err != nil {
                return errors.New("can't get new pub socket due to:" + err.Error())
        }
-       rtmgr.Logger.Info("publishing on: " + url)
-       if err = sock.Listen(url); err != nil {
-               return errors.New("can't publish on socket " + url + " due to:" + err.Error())
+       uri := DEFAULT_NNG_PUBSUB_SOCKET_PREFIX + ip + ":" + strconv.Itoa(DEFAULT_NNG_PUBSUB_SOCKET_NUMBER)
+       rtmgr.Logger.Info("publishing on: " + uri)
+       if err = socket.Listen(uri); err != nil {
+               return errors.New("can't publish on socket " + uri + " due to:" + err.Error())
        }
        return nil
 }
 
 func closeNngPub() error {
-       if err := sock.Close(); err != nil {
+       if err := socket.Close(); err != nil {
                return errors.New("can't close socket due to:" + err.Error())
        }
        return nil
@@ -59,7 +68,7 @@ func closeNngPub() error {
 
 func publishAll(policies *[]string) error {
        for _, pe := range *policies {
-               if err := sock.Send([]byte(pe)); err != nil {
+               if err := socket.Send([]byte(pe)); err != nil {
                        return errors.New("Unable to send policy entry due to: " + err.Error())
                }
        }
diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go
new file mode 100644 (file)
index 0000000..01bde0c
--- /dev/null
@@ -0,0 +1,117 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+/*
+  Mnemonic:    nngpipe.go
+  Abstract: mangos (NNG) Pipeline SBI implementation
+  Date:                12 March 2019
+*/
+
+package sbi
+
+import (
+       "errors"
+       "nanomsg.org/go/mangos/v2"
+       "nanomsg.org/go/mangos/v2/protocol/push"
+       _ "nanomsg.org/go/mangos/v2/transport/all"
+       "rtmgr"
+       "strconv"
+)
+
+func openNngPush(ip string) error {
+       return nil
+}
+
+func closeNngPush() error {
+       return nil
+}
+
+func createNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
+       rtmgr.Logger.Debug("Invoked sbi.createNngPushEndpointSocket")
+       rtmgr.Logger.Debug("args: %v", (*ep))
+       s, err := push.NewSocket()
+       if err != nil {
+               return errors.New("can't open push socket for endpoint: " + ep.Name +" due to:" + err.Error())
+       }
+       s.SetPipeEventHook(pipeEventHandler)
+       ep.Socket = s
+       dial(ep)
+       return nil
+}
+
+func destroyNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
+       rtmgr.Logger.Debug("Invoked sbi.destroyNngPushEndpointSocket")
+       rtmgr.Logger.Debug("args: %v", (*ep))
+       if err:= ep.Socket.(mangos.Socket).Close(); err != nil {
+                       return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to:" + err.Error())
+               }
+       return nil
+}
+
+func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
+       for _, ep := range rtmgr.Eps {
+               uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
+               if uri == pipe.Address() {
+                       switch event {
+                       case 1:
+                               ep.IsReady = true
+                               rtmgr.Logger.Debug("Endpoint " + uri + " successfully registered")
+                       default:
+                               ep.IsReady = false
+                               rtmgr.Logger.Debug("Endpoint " + uri + " has been deregistered")
+                       }
+               }       
+       }
+}
+
+/*
+NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
+*/
+func dial(ep *rtmgr.Endpoint) {
+       rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
+       uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
+       options := make(map[string]interface{})
+       options[mangos.OptionDialAsynch] = true
+       if err := ep.Socket.(mangos.Socket).DialOptions(uri, options); err != nil {
+               rtmgr.Logger.Error("can't dial on push socket to " + uri + " due to:" + err.Error())
+       }
+}
+
+func pushAll(policies *[]string) error {
+       rtmgr.Logger.Debug("Invoked: sbi.pushAll")
+       rtmgr.Logger.Debug("args: %v", (*policies))
+       for _, ep := range rtmgr.Eps {
+               if ep.IsReady {
+                       go send(ep, policies)
+               } else {
+                       rtmgr.Logger.Warn("Endpoint " + ep.Uuid + "is not ready")
+               }
+       }
+       return nil
+}
+
+func send(ep *rtmgr.Endpoint, policies *[]string) {
+       rtmgr.Logger.Debug("Invoked: sbi.pushAll")
+       rtmgr.Logger.Debug("Push policy to endpoint: "+ ep.Uuid)
+       for _, pe := range *policies {
+               if err := ep.Socket.(mangos.Socket).Send([]byte(pe)); err != nil {
+                       rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
+               }
+       }
+       rtmgr.Logger.Info("NNG PUSH to ednpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len((*policies))) + ")")
+}
index a2da825..83b3790 100644 (file)
@@ -1,4 +1,5 @@
 /*
+w
 ==================================================================================
   Copyright (c) 2019 AT&T Intellectual Property.
   Copyright (c) 2019 Nokia
@@ -28,8 +29,14 @@ import (
        "errors"
        "fmt"
        "rtmgr"
+        "strconv"
 )
 
+const DEFAULT_NNG_PUBSUB_SOCKET_PREFIX = "tcp://"
+const DEFAULT_NNG_PUBSUB_SOCKET_NUMBER = 4560
+const DEFAULT_NNG_PIPELINE_SOCKET_PREFIX = "tcp://"
+const DEFAULT_NNG_PIPELINE_SOCKET_NUMBER = 4561
+
 var (
        SupportedSbis = []*SbiEngineConfig{
                &SbiEngineConfig{
@@ -40,6 +47,8 @@ var (
                        },
                        openSocket(openNngPub),
                        closeSocket(closeNngPub),
+                       createEndpointSocket(createNngPubEndpointSocket),
+                       destroyEndpointSocket(createNngPubEndpointSocket),
                        distributeAll(publishAll),
                        true,
                },
@@ -49,10 +58,12 @@ var (
                                Version:  "v1",
                                Protocol: "nngpipeline",
                        },
-                       openSocket(nil),
-                       closeSocket(nil),
-                       distributeAll(nil),
-                       false,
+                       openSocket(openNngPush),
+                       closeSocket(closeNngPush),
+                       createEndpointSocket(createNngPushEndpointSocket),
+                       destroyEndpointSocket(destroyNngPushEndpointSocket),
+                       distributeAll(pushAll),
+                       true,
                },
        }
 )
@@ -68,9 +79,50 @@ func ListSbis() {
 
 func GetSbi(sbiName string) (*SbiEngineConfig, error) {
        for _, sbi := range SupportedSbis {
-               if sbi.Engine.Name == sbiName && sbi.IsAvailable {
+               if (*sbi).Engine.Name == sbiName && (*sbi).IsAvailable {
                        return sbi, nil
                }
        }
-       return nil, errors.New("SBI:" + sbiName + "is not supported or still not a available")
+       return nil, errors.New("SBI:" + sbiName + " is not supported or still not available")
+}
+
+func pruneEndpointList(sbii *SbiEngineConfig) {
+        for _, ep := range rtmgr.Eps {
+                if !ep.Keepalive {
+                       sbii.DestroyEndpointSocket(ep)
+                        delete(rtmgr.Eps, ep.Uuid)
+                } else {
+                        rtmgr.Eps[ep.Uuid].Keepalive = false
+                }
+        }
+}
+
+func UpdateEndpointList(xapps *[]rtmgr.XApp, sbii *SbiEngineConfig) {
+        for _, xapp := range *xapps {
+                for _, instance := range xapp.Instances {
+                        uuid := instance.Ip + ":" + strconv.Itoa(int(instance.Port))
+                        if _, ok := rtmgr.Eps[uuid]; ok {
+                                rtmgr.Eps[uuid].Keepalive = true
+                        } else {
+                                ep := &rtmgr.Endpoint{
+                                        uuid,
+                                        instance.Name,
+                                        xapp.Name,
+                                        instance.Ip,
+                                        instance.Port,
+                                        instance.TxMessages,
+                                        instance.RxMessages,
+                                        nil,
+                                        false,
+                                        true,
+                                }
+                                if err := sbii.CreateEndpointSocket(ep); err != nil {
+                                        rtmgr.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
+                                        continue
+                                }
+                                rtmgr.Eps[uuid] = ep
+                        }
+                }
+        }
+        pruneEndpointList(sbii)
 }
index 1254e93..f971ff5 100644 (file)
 
 package sbi
 
+import "rtmgr"
+
 type distributeAll func(*[]string) error
 type openSocket func(string) error
 type closeSocket func() error
+type createEndpointSocket func(*rtmgr.Endpoint) error
+type destroyEndpointSocket func(*rtmgr.Endpoint) error
+
 
 type SbiEngine struct {
        Name     string
@@ -38,6 +43,8 @@ type SbiEngineConfig struct {
        Engine        SbiEngine
        OpenSocket    openSocket
        CloseSocket   closeSocket
+       CreateEndpointSocket createEndpointSocket
+       DestroyEndpointSocket destroyEndpointSocket
        DistributeAll distributeAll
        IsAvailable   bool
 }
index 097b983..198b05a 100644 (file)
@@ -70,5 +70,5 @@ func GetSdl(sdlName string) (*SdlEngineConfig, error) {
                        return sdl, nil
                }
        }
-       return nil, errors.New("SDL:" + sdlName + "is not supported or still not a available")
+       return nil, errors.New("SDL:" + sdlName + " is not supported or still not a available")
 }