+vendor
+bin
+pkg/sdl/ut.rt
port=29418
project=ric-plt/rtmgr
defaultbranch=master
+defaultremote=origin
# a Docker tag from the string in file container-tag.yaml
FROM golang:1.12 as rtmgrbuild
-ENV GOPATH /opt
+ENV GOPATH /go
RUN apt-get update \
- && apt-get install golang-glide
-COPY . /opt
-RUN mkdir -p $GOPATH/bin \
- && ln -s -f $GOPATH/pkg $GOPATH/src \
- && cd $GOPATH/src \
- && glide install --strip-vendor \
- && cd $GOPATH/cmd \
- && go build rtmgr.go \
- && mv $GOPATH/cmd/rtmgr $GOPATH/bin
+ && apt-get install -y golang-glide git wget
+RUN cd /go/bin \
+ && wget --quiet https://github.com/go-swagger/go-swagger/releases/download/v0.19.0/swagger_linux_amd64 \
+ && mv swagger_linux_amd64 swagger \
+ && chmod +x swagger
+
+COPY . /go/src/routing-manager
+
+WORKDIR /go/src/routing-manager
+
+RUN git clone "https://gerrit.o-ran-sc.org/r/ric-plt/appmgr" \
+ && cp appmgr/api/appmgr_rest_api.json api/ \
+ && rm -rf appmgr
+
+RUN swagger generate server -f api/routing_manager.yaml -t pkg/ --exclude-main -r LICENSE
+RUN swagger generate client -f api/appmgr_rest_api.json -t pkg/ -m appmgr_model -c appmgr_client -r LICENSE
+
+RUN glide install --strip-vendor
+
+RUN go build cmd/rtmgr.go \
+ && cp rtmgr /go/bin/rtmgr \
+ && cp run_rtmgr.sh /run_rtmgr.sh
+
+# UT intermediate container
+FROM rtmgrbuild as rtmgrut
+RUN go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -cover -race
+
+# Final, executable container
FROM ubuntu:16.04
-COPY --from=rtmgrbuild /opt/bin/rtmgr /
-RUN mkdir /db && touch /db/rt.json
-CMD /rtmgr
+COPY --from=rtmgrbuild /go/bin/rtmgr /
+COPY --from=rtmgrbuild /run_rtmgr.sh /
+RUN mkdir /db && touch /db/rt.json && chmod 777 /db/rt.json
+CMD /run_rtmgr.sh
+
## Introduction
__Routing Manager__ is a basic platform service of RIC. It is responsible for distributing routing policies among the other platform components and xApps.
-The implemented logic periodically queries the xApp Manager component for xApps' list. Stores the data then processes it to create routing policies and distributes them to all xApps.
+The routing manager has two ways to get the xapp details from xapp manager - httpGetter or httpRESTful.
+In case of httpGetter, the implemented logic periodically queries the xApp Manager component for xApps' list.
+Where in httpRESTful, starts a http server and creates a webhook subscription in xapp manager to update about changes in xapps and waits changed data to arrive on the REST http server.
+Either ways, the xapp data received is stored and then processed to create routing policies and distributes them to all xApps.
+
The architecture consists of the following five well defined functions:
* NorthBound Interface (__NBI__): Maintains the communication channels towards RIC manager components
* Routing Policy Engine (__RPE__): Provides the logic to calculate routing policies
* Shared Data Layer (__SDL__): Provides access to different kind persistent data stores
* SouthBound Interface (__SBI__): Maintains the communication channels towards RIC tenants and control components
-* Control Logic (__RTMGR__): Controls the operatin of above functions
+* Control Logic (__RTMGR__): Controls the operation of above functions
Current implementation provides support for the followings:
* NBI:
* __httpGet__: simple HTTP GET interface. Expects an URL where it gets the xApps' list in JSON format
- * (WIP) __httRESTful__: provides REST API endpoints towards RIC manager components
+ * __httRESTful__: provides REST API endpoints towards RIC manager components. Expects REST port and url where the HTTP service will be started to listen on.
* RPE:
* __rmr__: creates routing policies formatted for RIC RMR
* SDL:
* (backlog) __sdl__: Shared Data Library to Redis database
* SBI:
* __nngpub__: distributes RPE created policies via NNG Pub channel
- * (WIP) __nngpipe__: distributes RPE created policies via NNG Pipeline channel
+ * __nngpipe__: distributes RPE created policies via NNG Pipeline channel
## Release notes
Check the separated `RELNOTES` file.
## Prerequisites
-* Healthy kubernetes cluster
-* Access to the common docker registry
+* Healthy kubernetes cluster (for Kubernetes testing)
+* Access to the common docker registry (alternatively, you can set up your own private registry for testing: https://docs.docker.com/registry/deploying/)
+* In case of non-Docker build: golang 11.1 at least, go-swagger (https://github.com/go-swagger/go-swagger, v0.19.0), glide (https://github.com/Masterminds/glide), XApp Manager spec file (available in ORAN: https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/appmgr under api folder)
## Project folder structure
-* /api: contains Swagger source files
-* /build: contains build tools (scripts, Dockerfiles, etc.)
+* /api: contains Swagger spec files
* /manifest: contains deployment files (Kubernetes manifests, Helm chart)
* /cmd: contains go project's main file
* /pkg: contains go project's internal packages
* /test: contains CI/CD testing files (scripts, mocks, manifests)
+* Dockerfile: contains main docker file
+* container-tag.yaml: contains CI specific container tag information
+* run_rtmgr.sh: shell script to run rtmgr (requires environment variables to be set)
## Installation guide
### Compiling code
-Enter the project root and execute `./build.sh` script.
-The build script has two main phases. First is the code compilation, where it creates a temporary container for downloading all dependencies then compiles the code. In the second phase it builds the production ready container and taggs it to `rtmgr:builder`
-
-**NOTE:** The script puts a copy of the binary into the `./bin` folder for further use cases
+#### Docker compile
+The Dockerfile located in the project root folder does the following three things:
+- As a first step, it creates a build container, fetches XApp Manager's spec file, generates rest api code from swagger spec and builds rtmgr.
+- As a second step, it executes UTs on rtmgr source code.
+- As a third step, it creates the final container from rtmgr binary (Ubuntu based).
+For a docker build execute `docker build --tag=rtmgr-build:test .` in the project root directory (feel free to replace the name:tag with your own)
+
+#### Compiling without docker
+Compiling without Docker involves some manual steps before compiling directly with "go build".
+The XApp manager's spec file must be fetched, then api generated with swagger. (these steps are included in the Dockerfile).
+After the code is generated, glide can install the dependencies of rtmgr.
+Make sure you set your GOPATH variable correctly (example: $HOME/go/src/routing-manager)
+Code generation and building example (from project root folder):
+```bash
+git clone "https://gerrit.o-ran-sc.org/r/ric-plt/appmgr" && cp appmgr/api/appmgr_rest_api.yaml api/
+swagger generate server -f api/routing_manager.yaml -t pkg/ --exclude-main -r LICENSE
+swagger generate client -f api/appmgr_rest_api.yaml -t pkg/ -m appmgr_model -c appmgr_client -r LICENSE
+glide install --strip-vendor
+go build cmd/rtmgr.go
+```
### Installing Routing Manager
#### Preparing environment
Edit the container image section of `rtmgr-dep.yaml` file according to the `rtmgr` image tag.
#### Deploying Routing Manager
-Issue the `kubectl create -f {manifest.yaml}` command in the following order
+Issue the `kubectl create -f {manifest.yaml}` command in the following order:
1. `manifests/namespace.yaml`: creates the `example` namespace for routing-manager resources
- 2. `manifests/rtmgr/rtmgr-dep.yaml`: instantiates the `rtmgr` deployment in the `example` namespace
- 3. `manifests/rtmgr/rtmgr-svc.yaml`: creates the `rtmgr` service in `example` namespace
+ 2. `manifests/rtmgr/rtmgr-cfg.yaml`: creates default routes config file for routing-manager
+ 3. `manifests/rtmgr/rtmgr-dep.yaml`: instantiates the `rtmgr` deployment in the `example` namespace
+ 4. `manifests/rtmgr/rtmgr-svc.yaml`: creates the `rtmgr` service in `example` namespace
+
+**NOTE:** The above manifest files will deploy routing manager with NBI as httpRESTful which would not succeed unless there is an xapp manager running at the defined xm-url. The solution is either to deploy a real XAPP manager before deploying routing-manager or start the mock xmgr as mentioned in [Testing](#testing-and-troubleshoting).
### Testing and Troubleshoting
+### Testing with Kubernetes
Routing Manager's behaviour can be tested using the mocked xApp Manager, traffic generator xApp and receiver xApp.
- 1. Checkout and compile both xApp receiver and xApp Tx generator of RIC admission control project
+ 1. Checkout and compile both xApp receiver and xApp Tx generator of RIC admission control project: `https://gerrit.o-ran-sc.org/r/admin/repos/ric-app/admin`
2. Copy the `adm-ctrl-xapp` binary to `./test/docker/xapp.build` folder furthermore copy all RMR related dinamycally linked library under `./test/docker/xapp.build/usr` folder. Issue `docker build ./test/docker/xapp.build` command. Tag the recently created docker image and push it to the common registry.
3. Copy the `test-tx` binary to `./test/docker/xapp-tx.build` folder furthermore copy all RMR related dinamycally linked library under `./test/docker/xapp.build/usr` folder. Issue `docker build ./test/docker/xapp-tx.build` command. Tag the recently created docker image and push it to the common registry.
4. Enter the `./test/docker/xmgr.build` folder and issue `docker build .`. Tag the recently created docker image and push it to the common registry.
5. Modify the docker image version in each kuberbetes manifest files under `./test/kubernetes/` folder accordingly then issue the `kubectl create -f {manifest.yaml}` on each file.
6. [Compile](#compiling-code) and [Install routing manager](#installing-routing-manager)
+ 7. Once the routing manager is started, it retrievs the initial xapp list from `xmgr` via HTTPGet additonaly it starts to listen on http://rtmgr:8888/v1/handles/xapp-handle endpoint and ready to receive xapp list updates.
+ 8. Edit the provided `test/data/xapp.json` file accordingly and issue the following curl command to update `rtmgr's` xapp list.
+ ``` curl --header "Content-Type: application/json" --request POST --data '@./test/data/xapps.json' http://10.244.2.104:8888/v1/handles/xapp-handle ```
+
+### Executing unit tests
+For running unit tests, execute the following command:
+ `go test ./pkg/nbi` (or any package - feel free to add your own parameters)
+If you wish to execute the full UT set with coverage:
+```bash
+ mkdir -p unit-test
+ go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -cover -race -coverprofile=$PWD/unit-test/c.out
+ go tool cover -html=$PWD/unit-test/c.out -o $PWD/unit-test/coverage.htm
+```
#### Command line arguments
Routing manager binary can be called with `-h` flag when it displays the available command line arguments and it's default value.
```bash
Usage of ./rtmgr:
+ -configfile string
+ Routing manager's configuration file path (default "/etc/rtmgrcfg.json")
-filename string
Absolute path of file where the route information to be stored (default "/db/rt.json")
-loglevel string
INFO | WARN | ERROR | DEBUG (default "INFO")
-nbi string
- Northbound interface module to be used. Valid values are: 'httpGetter' (default "httpGetter")
+ Northbound interface module to be used. Valid values are: 'httpGetter | httpRESTful' (default "httpGetter")
+ -nbi-if string
+ Base HTTP URL where routing manager will be listening on (default "http://localhost:8888")
-rpe string
Route Policy Engine to be used. Valid values are: 'rmrpush | rmrpub' (default "rmrpush")
-sbi string
Datastore enginge to be used. Valid values are: 'file' (default "file")
-xm-url string
HTTP URL where xApp Manager exposes the entire xApp List (default "http://localhost:3000/xapps")
+
```
-For troubleshooting purpose the default logging level can be increased to `DEBUG`.
+For troubleshooting purpose the default logging level can be increased to `DEBUG`. (by hand launch it's set to INFO, kubernetes manifest has DEBUG set by default).
## Upcoming changes
-[] Add RESTful NBI based on swagger api definition
-
[] Add unit tests
+
## License
This project is licensed under the Apache License, Version 2.0 - see the [LICENSE](LICENSE)
+### v0.3.0
+* Introduced platform-specific routes: basic components (e2term, ueman, subman, e2man) are stored in a json file (example in manifests/rtmgr/rtmgr-cfg.yaml)
+* Introduced subscription add functionality: /ric/v1/xapp-subscriptions endpoint is active, on a POST method it generates endpoint specific mse routing table entries
+
+### v0.2.0
+* Introduced http rest functionality
+* A way to get intial xapp data from xapp manager while booting the routing manager
+* A way to create a sinple webhook registration in the xapp-manager to listen on updates abpout changes in xapps
+
+### v0.1.1
+* Changes in swagger API definition
+* UTs for SBI, SDL and RPE modules
+* New message types according to RMR
+
### v0.1.0
* Introduces NNGPush SBI module
* Bugfixes in argument handling
### v0.0.1
* Initial version of Routing Manager for CI/CD pipeline
* Dummy solution: still has no real routing capability
-* Container is based on the _**tx**_ version of [admin xApp] which already implements [RMR library]
\ No newline at end of file
+* Container is based on the _**tx**_ version of [admin xApp] which already implements [RMR library]
info:
title: Routing Manager
description: "This is the Swagger/OpenAPI 2.0 definition of Routing Manager's Northbound API."
- version: "0.2.2"
+ version: "0.3.0"
license:
name: "Apache 2.0"
url: "http://www.apache.org/licenses/LICENSE-2.0.html"
# externalDocs:
# description: "Find out more"
# url: "http://127.0.0.1"
-- name: "route"
- description: "Available routes"
- name: "health"
description: "Health of the system"
schemes:
-- "https"
+#- "https"
- "http"
paths:
/health:
description: "Invalid data"
201:
description: "Callback received"
- /routes:
+ /handles/xapp-subscription-handle:
post:
tags:
- - "route"
- summary: "Add new route"
- description: "By performing a POST method on the routes resource, the API caller is able to create a new route."
- operationId: "add_route"
+ - "handle"
+ summary: "API for updating about new xapp subscription"
+ description: "By performing a POST method on the xapp-subscription-handle resource, the API caller is able to update the Routing manager about the creation of new subscription by an Xapp instance."
+ operationId: "provide_xapp_subscription_handle"
consumes:
- "application/json"
# - "application/yaml"
# - "application/yaml"
parameters:
- in: "body"
- name: "route"
- description: "Route object that needs to be created"
+ name: "xapp-subscription-data"
+ description: "xApp related subscription data"
required: true
schema:
- $ref: "#/definitions/route"
+ $ref: "#/definitions/xapp-subscription-data"
responses:
400:
- description: "Invalid route"
+ description: "Invalid data"
201:
- description: "Route created"
- put:
- tags:
- - "route"
- summary: "Update an existing route"
- description: "By performing a PUT method on the routes resource, the API caller is able to update an already existing route."
- operationId: "update_route"
- consumes:
- - "application/json"
-# - "application/yaml"
- produces:
- - "application/json"
-# - "application/yaml"
- parameters:
- - in: "body"
- name: "route"
- description: "Route object that needs to be updated or created"
- required: true
- schema:
- $ref: "#/definitions/route"
- responses:
- 400:
- description: "Invalid route ID supplied"
- 404:
- description: "Route not found"
- 204:
- description: "Route updated"
- get:
- tags:
- - "route"
- summary: "Retrieve the list of routes"
- description: "By performing a GET method on the routes resource, the API caller is able to retrieve all routes"
- operationId: "get_routes"
- consumes:
- - "application/json"
-# - "application/yaml"
- produces:
- - "application/json"
-# - "application/yaml"
- responses:
- 200:
- description: "All the routes"
- schema:
- $ref: "#/definitions/routes"
-
- /routes/{route-id}:
- get:
- tags:
- - "route"
- summary: "Find route by ID"
- description: "Returns a single route"
- operationId: "get_route_by_id"
- produces:
- - "application/json"
- parameters:
- - name: "route-id"
- in: "path"
- description: "ID of route to return"
- required: true
- type: "integer"
- format: "int64"
- responses:
- 200:
- description: "successful operation"
- schema:
- $ref: "#/definitions/route"
- 400:
- description: "Invalid route ID supplied"
- 404:
- description: "Route not found"
- put:
- tags:
- - "route"
- summary: "Updates a route by explicitly referencing it by route-id"
- description: "By performing a PUT method on a specific route referenced by the route-id the API caller is able to update that specific route"
- operationId: "update-route-by-id"
- consumes:
- - "application/json"
-# - "application/yaml"
- produces:
- - "application/json"
-# - "application/yaml"
- parameters:
- - name: "route-id"
- in: "path"
- description: "ID of route that needs to be updated"
- required: true
- type: "integer"
- format: "int64"
- - name: "route"
- in: "body"
- description: "The updated route instance"
- required: false
- schema:
- $ref: "#/definitions/route"
- responses:
- 400:
- description: "Invalid route ID supplied"
- 204:
- description: "Route updated"
- delete:
- tags:
- - "route"
- summary: "Deletes a route"
- description: "By performing a DELETE method on a specific route referenced by the route-id, the API caller is able to delete that specific route"
- operationId: "delete_route_by_id"
- produces:
- - "application/json"
-# - "application/yaml"
- parameters:
- - name: "route-id"
- in: "path"
- description: "ID of the route that needs to be deleted"
- required: true
- type: "integer"
- format: "int64"
- responses:
- 400:
- description: "Invalid route ID supplied"
- 404:
- description: "Route not found"
- 204:
- description: "Route deleted"
+ description: "Xapp Subscription data received"
definitions:
health-status:
type: "object"
enum:
- healthy
- unhealthy
- routes:
- type: "array"
- items:
- $ref: "#/definitions/route"
- route:
+ xapp-callback-data:
type: "object"
- required:
- - "id" # not so sure about that
properties:
id:
type: "integer"
format: "int64"
- senders:
- type: "array"
- items:
- "$ref": "#/definitions/xapp-instance"
- message-type:
+ event:
+ type: "string"
+ data-version:
type: "integer"
format: "int64"
- receiver-groups:
- type: "array"
- items:
- "$ref": "#/definitions/xapp-group"
- xapp-instance:
+ data:
+ type: "string" #This should be a JSON object, array of xapps
+ xapp-subscription-data:
type: "object"
required:
- - "address"
- - "port"
+ - "address"
+ - "port"
+ - "subscription_id"
properties:
address:
- type: "string" # I know...
- port:
+ type: "string" #This is the xapp instance hostname or ip address
+ port: #xapp instance port address
type: "integer"
- format: "int32"
+ format: "uint16"
minimum: 0
maximum: 65535
- xapp-group:
- type: "array"
- items:
- "$ref": "#/definitions/xapp-instance"
- xapp-callback-data:
- type: "object"
- properties:
- id:
+ subscription_id: #subscription sequence number
type: "integer"
- format: "int64"
- event:
- type: "string"
- data-version:
- type: "integer"
- format: "int64"
- data:
- type: "string" #This should be a JSON object, array of xapps
+ format: "int16"
+
externalDocs:
description: "Routing Manager"
url: "http://placeholder"
+
+++ /dev/null
-#!/bin/sh -e
-#
-#==================================================================================
-# Copyright (c) 2019 AT&T Intellectual Property.
-# Copyright (c) 2019 Nokia
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#==================================================================================
-#
-#
-# Mnemonic: build.sh
-# Abstract: Compiles the rtmgr source and builds the docker container
-# Date: 12 March 2019
-#
-
-echo 'Creating compiler container'
-docker build --no-cache --tag=rtmgr_compiler:0.1 build/binary/
-
-echo 'Running rtmgr compiler'
-docker run --rm --name=rtmgr_compiler -v ${PWD}:/opt/ rtmgr_compiler:0.1
-
-echo 'Cleaning up compiler container'
-docker rmi -f rtmgr_compiler:0.1
-
-echo 'rtmgr binary successfully built!'
-
-echo 'Creating rtmgr container'
-cp ${PWD}/bin/* ${PWD}/build/container/
-docker build --no-cache --tag=rtmgr:builder build/container/
import (
"flag"
- "nbi"
"os"
- "rpe"
- "rtmgr"
- "sbi"
- "sdl"
+ "os/signal"
+ "routing-manager/pkg/nbi"
+ "routing-manager/pkg/rpe"
+ "routing-manager/pkg/rtmgr"
+ "routing-manager/pkg/sbi"
+ "routing-manager/pkg/sdl"
+ "syscall"
"time"
)
)
func parseArgs() {
- // TODO: arguments should be validated (filename; xm-url; sbi-if)
+ // TODO: arguments should be validated (filename; xm-url; sbi-if; rest-url; rest-port)
args = make(map[string]*string)
- args["nbi"] = flag.String("nbi", "httpGetter", "Northbound interface module to be used. Valid values are: 'httpGetter'")
+ args["configfile"] = flag.String("configfile", "/etc/rtmgrcfg.json", "Routing manager's configuration file path")
+ args["nbi"] = flag.String("nbi", "httpGetter", "Northbound interface module to be used. Valid values are: 'httpGetter | httpRESTful'")
args["sbi"] = flag.String("sbi", "nngpush", "Southbound interface module to be used. Valid values are: 'nngpush | nngpub'")
args["rpe"] = flag.String("rpe", "rmrpush", "Route Policy Engine to be used. Valid values are: 'rmrpush | rmrpub'")
args["sdl"] = flag.String("sdl", "file", "Datastore enginge to be used. Valid values are: 'file'")
args["xm-url"] = flag.String("xm-url", "http://localhost:3000/xapps", "HTTP URL where xApp Manager exposes the entire xApp List")
+ args["nbi-if"] = flag.String("nbi-if", "http://localhost:8888", "Base HTTP URL where routing manager will be listening on")
args["sbi-if"] = flag.String("sbi-if", "0.0.0.0", "IPv4 address of interface where Southbound socket to be opened")
args["filename"] = flag.String("filename", "/db/rt.json", "Absolute path of file where the route information to be stored")
args["loglevel"] = flag.String("loglevel", "INFO", "INFO | WARN | ERROR | DEBUG")
flag.Parse()
}
-func initRtmgr() (*nbi.NbiEngineConfig, *sbi.SbiEngineConfig, *sdl.SdlEngineConfig, *rpe.RpeEngineConfig, error) {
+func initRtmgr() (nbi.NbiEngine, sbi.SbiEngine, sdl.SdlEngine, rpe.RpeEngine, error) {
var err error
- var nbii *nbi.NbiEngineConfig
- var sbii *sbi.SbiEngineConfig
- var sdli *sdl.SdlEngineConfig
- var rpei *rpe.RpeEngineConfig
+ var nbii nbi.NbiEngine
+ var sbii sbi.SbiEngine
+ var sdli sdl.SdlEngine
+ var rpei rpe.RpeEngine
if nbii, err = nbi.GetNbi(*args["nbi"]); err == nil && nbii != nil {
if sbii, err = sbi.GetSbi(*args["sbi"]); err == nil && sbii != nil {
if sdli, err = sdl.GetSdl(*args["sdl"]); err == nil && sdli != nil {
return nil, nil, nil, nil, err
}
-func serve(nbii *nbi.NbiEngineConfig, sbii *sbi.SbiEngineConfig, sdli *sdl.SdlEngineConfig, rpei *rpe.RpeEngineConfig) {
- err := sbii.OpenSocket(*args["sbi-if"])
+func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.SbiEngine, sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine) {
+ for {
+ if <-triggerSBI {
+ data, err := sdlEngine.ReadAll(*args["filename"])
+ if err != nil || data == nil {
+ rtmgr.Logger.Error("cannot get data from sdl interface due to: " + err.Error())
+ continue
+ }
+ sbiEngine.UpdateEndpoints(data)
+ policies := rpeEngine.GeneratePolicies(rtmgr.Eps)
+ err = sbiEngine.DistributeAll(policies)
+ if err != nil {
+ rtmgr.Logger.Error("routing rable cannot be published due to: " + err.Error())
+ }
+ }
+ }
+}
+
+func serve(nbiEngine nbi.NbiEngine, sbiEngine sbi.SbiEngine, sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine) {
+
+ triggerSBI := make(chan bool)
+
+ nbiErr := nbiEngine.Initialize(*args["xm-url"], *args["nbi-if"], *args["filename"], *args["configfile"],
+ sdlEngine, rpeEngine, triggerSBI)
+ if nbiErr != nil {
+ rtmgr.Logger.Error("fail to initialize nbi due to: " + nbiErr.Error())
+ return
+ }
+
+ err := sbiEngine.Initialize(*args["sbi-if"])
if err != nil {
rtmgr.Logger.Info("fail to open pub socket due to: " + err.Error())
return
}
- defer sbii.CloseSocket()
+ defer nbiEngine.Terminate()
+ defer sbiEngine.Terminate()
+
+ // This SBI Go routine is trtiggered by periodic main loop and when data is recieved on REST interface.
+ go serveSBI(triggerSBI, sbiEngine, sdlEngine, rpeEngine)
+
for {
time.Sleep(INTERVAL * time.Second)
- data, err := nbii.BatchFetch(*args["xm-url"])
- if err != nil {
- rtmgr.Logger.Error("cannot get data from " + nbii.Engine.Name + " interface dute to: " + err.Error())
- } else {
- sdli.WriteAll(*args["filename"], data)
- }
- data, err = sdli.ReadAll(*args["filename"])
- if err != nil || data == nil {
- rtmgr.Logger.Error("cannot get data from " + sdli.Engine.Name + " interface dute to: " + err.Error())
- continue
- }
- sbi.UpdateEndpointList(data, sbii)
- policies := rpei.GeneratePolicies(rtmgr.Eps)
- err = sbii.DistributeAll(policies)
- if err != nil {
- rtmgr.Logger.Error("routing rable cannot be published due to: " + err.Error())
+ if *args["nbi"] == "httpGetter" {
+ data, err := nbiEngine.(*nbi.HttpGetter).FetchAllXapps(*args["xm-url"])
+ if err != nil {
+ rtmgr.Logger.Error("cannot fetch xapp data dute to: " + err.Error())
+ } else if data != nil {
+ sdlEngine.WriteXapps(*args["filename"], data)
+ }
}
+
+ triggerSBI <- true
}
}
+func SetupCloseHandler() {
+ c := make(chan os.Signal, 2)
+ signal.Notify(c, os.Interrupt, syscall.SIGTERM)
+ go func() {
+ <-c
+ rtmgr.Logger.Info("\r- Ctrl+C pressed in Terminal")
+ os.Exit(0)
+ }()
+}
+
func main() {
parseArgs()
rtmgr.SetLogLevel(*args["loglevel"])
- nbii, sbii, sdli, rpei, err := initRtmgr()
+ nbiEngine, sbiEngine, sdlEngine, rpeEngine, err := initRtmgr()
if err != nil {
rtmgr.Logger.Error(err.Error())
os.Exit(1)
}
+ SetupCloseHandler()
rtmgr.Logger.Info("Start " + SERVICENAME + " service")
rtmgr.Eps = make(rtmgr.Endpoints)
- serve(nbii, sbii, sdli, rpei)
+ serve(nbiEngine, sbiEngine, sdlEngine, rpeEngine)
os.Exit(0)
}
# By default this file is in the docker build directory,
# but the location can configured in the JJB template.
---
-tag: 0.1.0
+tag: 0.3.0
--- /dev/null
+hash: f46d5840e1625820d08f8be64aad2627deaca8c33a33d5b90234a4212d01cf26
+updated: 2019-04-17T22:45:16.134234092+03:00
+imports:
+- name: github.com/asaskevich/govalidator
+ version: f9ffefc3facfbe0caee3fea233cbb6e8208f4541
+- name: github.com/docker/go-units
+ version: 2fb04c6466a548a03cb009c5569ee1ab1e35398e
+- name: github.com/droundy/goopt
+ version: 0b8effe182da161d81b011aba271507324ecb7ab
+- name: github.com/globalsign/mgo
+ version: eeefdecb41b842af6dc652aaea4026e8403e62df
+ subpackages:
+ - bson
+ - internal/json
+- name: github.com/go-openapi/analysis
+ version: e2f3fdbb7ed0e56e070ccbfb6fc75b288a33c014
+ subpackages:
+ - internal
+- name: github.com/go-openapi/errors
+ version: 7a7ff1b7b8020f22574411a32f28b4d168d69237
+- name: github.com/go-openapi/jsonpointer
+ version: ef5f0afec364d3b9396b7b77b43dbe26bf1f8004
+- name: github.com/go-openapi/jsonreference
+ version: 8483a886a90412cd6858df4ea3483dce9c8e35a3
+- name: github.com/go-openapi/loads
+ version: 74628589c3b94e3526a842d24f46589980f5ab22
+- name: github.com/go-openapi/runtime
+ version: 109737172424d8a656fd1199e28c9f5cc89b0cca
+ subpackages:
+ - flagext
+ - logger
+ - middleware
+ - middleware/denco
+ - middleware/header
+ - middleware/untyped
+ - security
+- name: github.com/go-openapi/spec
+ version: 53d776530bf78a11b03a7b52dd8a083086b045e5
+- name: github.com/go-openapi/strfmt
+ version: 29177d4b5db488583bb97ebc05d3842ebeda91a8
+- name: github.com/go-openapi/swag
+ version: b3e2804c8535ee0d1b89320afd98474d5b8e9e3b
+- name: github.com/go-openapi/validate
+ version: 5b1623be7460f5a3967a82c00d518048fb190f5e
+- name: github.com/gorilla/websocket
+ version: 66b9c49e59c6c48f0ffce28c2d8b8a5678502c6d
+- name: github.com/jcelliott/lumber
+ version: dd349441af25132d146d7095c6693a15431fc9b1
+- name: github.com/jessevdk/go-flags
+ version: c6ca198ec95c841fdb89fc0de7496fed11ab854e
+- name: github.com/mailru/easyjson
+ version: 1ea4449da9834f4d333f1cc461c374aea217d249
+ subpackages:
+ - buffer
+ - jlexer
+ - jwriter
+- name: github.com/Microsoft/go-winio
+ version: 1a8911d1ed007260465c3bfbbc785ac6915a0bb8
+- name: github.com/mitchellh/mapstructure
+ version: 3536a929edddb9a5b34bd6861dc4a9647cb459fe
+- name: github.com/PuerkitoBio/purell
+ version: 44968752391892e1b0d0b821ee79e9a85fa13049
+- name: github.com/PuerkitoBio/urlesc
+ version: de5bf2ad457846296e2031421a34e2568e304e35
+- name: golang.org/x/net
+ version: 1da14a5a36f220ea3f03470682b737b1dfd5de22
+ subpackages:
+ - idna
+ - netutil
+- name: golang.org/x/sys
+ version: 12500544f89f9420afe9529ba8940bf72d294972
+ subpackages:
+ - windows
+- name: golang.org/x/text
+ version: f4905fbd45b6790792202848439271c74074bbfd
+ subpackages:
+ - secure/bidirule
+ - transform
+ - unicode/bidi
+ - unicode/norm
+ - width
+- name: gopkg.in/yaml.v2
+ version: 51d6538a90f86fe93ac480b35f37b2be17fef232
+- name: nanomsg.org/go/mangos/v2
+ version: 63f66a65137b9a648ac9f7bf0160b4a4d17d7999
+ subpackages:
+ - errors
+ - internal/core
+ - protocol
+ - protocol/bus
+ - protocol/pair
+ - protocol/pub
+ - protocol/pull
+ - protocol/push
+ - protocol/rep
+ - protocol/req
+ - protocol/respondent
+ - protocol/star
+ - protocol/sub
+ - protocol/surveyor
+ - protocol/xbus
+ - protocol/xpair
+ - protocol/xpub
+ - protocol/xpull
+ - protocol/xpush
+ - protocol/xrep
+ - protocol/xstar
+ - transport
+ - transport/all
+ - transport/inproc
+ - transport/ipc
+ - transport/tcp
+ - transport/tlstcp
+ - transport/ws
+ - transport/wss
+testImports:
+- name: github.com/smartystreets/goconvey
+ version: 68dc04aab96ae4326137d6b77330c224063a927e
+ subpackages:
+ - convey
- transport/tlstcp
- transport/ws
- transport/wss
+- package: github.com/go-openapi/errors
+ version: v0.19.0
+- package: github.com/go-openapi/loads
+ version: v0.19.0
+- package: github.com/go-openapi/runtime
+ version: v0.19.0
+ subpackages:
+ - flagext
+ - middleware
+ - security
+- package: github.com/go-openapi/spec
+ version: v0.19.0
+- package: github.com/go-openapi/strfmt
+ version: v0.19.0
+- package: github.com/go-openapi/swag
+ version: v0.19.0
+- package: github.com/go-openapi/validate
+ version: v0.19.0
+- package: github.com/jessevdk/go-flags
+ version: v1.4.0
+- package: github.com/jcelliott/lumber
ignore:
+- models
- rtmgr
+- restapi
+- nbi
+- sbi
+- rpe
+- sdl
+- appmgr_model
+- appmgr_client
+- stub
testImport:
- package: github.com/smartystreets/goconvey
subpackages:
--- /dev/null
+#
+#==================================================================================
+# Copyright (c) 2019 AT&T Intellectual Property.
+# Copyright (c) 2019 Nokia
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#==================================================================================
+#
+#
+# Abstract: Configuration values for the routing manager
+# Date: 29 May 2019
+#
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: rtmgrcfg
+data:
+ # FQDN and port info of the platform components for routing manager to form and distribute corresponding routes to them
+ rtmgrcfg: |
+ {
+ "PlatformComponents":
+ [
+ {
+ "name": "E2TERM",
+ "fqdn": "e2term",
+ "port": 4561
+ },
+ {
+ "name": "SUBMAN",
+ "fqdn": "subman",
+ "port": 4561
+ },
+ {
+ "name": "E2MAN",
+ "fqdn": "e2man",
+ "port": 4561
+ },
+ {
+ "name": "UEMAN",
+ "fqdn": "ueman",
+ "port": 4561
+ }
+ ]
+ }
kind: Deployment
metadata:
name: rtmgr
- namespace: example
spec:
replicas: 1
selector:
spec:
containers:
- name: rtmgr
- image: cmaster:5000/rtmgr:0.1.0
+ image: cmaster:5000/rtmgr:latest
command: ["/run_rtmgr.sh"]
env:
- name: XMURL
- value: "http://xmgr:3000/xapps"
- - name: IP
- value: "0.0.0.0"
+ value: "http://appmgr-service:8080/ric/v1/xapps"
- name: RTFILE
value: "/db/rt.json"
- name: RPE
value: "rmrpush"
- name: SBI
value: "nngpush"
+ - name: SBIURL
+ value: "0.0.0.0"
+ - name: NBI
+ value: "httpRESTful"
+ - name: NBIURL
+ value: "http://0.0.0.0:8888"
+ - name: CFGFILE
+ value: "/cfg/rtmgr-config.json"
+ - name: LOGLEVEL
+ value: "DEBUG"
ports:
+ - containerPort: 8888
- containerPort: 4560
+ volumeMounts:
+ - mountPath: /cfg
+ name: rtmgrcfg
+ readOnly: true
+ volumes:
+ - name: rtmgrcfg
+ configMap:
+ name: rtmgrcfg
+ items:
+ - key: rtmgrcfg
+ path: rtmgr-config.json
+ mode: 0644
apiVersion: v1
metadata:
name: rtmgr
- namespace: example
spec:
selector:
app: rtmgr
+++ /dev/null
-hash: 8a251805c06cd6f4f20b276425eabe314968e2ad14fb32525fc93c53c2cfe845
-updated: 2019-04-04T20:26:54.485838082Z
-imports:
-- name: github.com/droundy/goopt
- version: 0b8effe182da161d81b011aba271507324ecb7ab
-- name: github.com/gorilla/websocket
- version: 66b9c49e59c6c48f0ffce28c2d8b8a5678502c6d
-- name: github.com/jcelliott/lumber
- version: dd349441af25132d146d7095c6693a15431fc9b1
-- name: github.com/Microsoft/go-winio
- version: 1a8911d1ed007260465c3bfbbc785ac6915a0bb8
-- name: golang.org/x/sys
- version: 81d4e9dc473e5e8c933f2aaeba2a3d81efb9aed2
- subpackages:
- - windows
-- name: nanomsg.org/go/mangos/v2
- version: 63f66a65137b9a648ac9f7bf0160b4a4d17d7999
- subpackages:
- - errors
- - internal/core
- - protocol
- - protocol/bus
- - protocol/pair
- - protocol/pub
- - protocol/pull
- - protocol/push
- - protocol/rep
- - protocol/req
- - protocol/respondent
- - protocol/star
- - protocol/sub
- - protocol/surveyor
- - protocol/xbus
- - protocol/xpair
- - protocol/xpub
- - protocol/xpull
- - protocol/xpush
- - protocol/xrep
- - protocol/xstar
- - transport
- - transport/all
- - transport/inproc
- - transport/ipc
- - transport/tcp
- - transport/tlstcp
- - transport/ws
- - transport/wss
-testImports:
-- name: github.com/smartystreets/goconvey
- version: 68dc04aab96ae4326137d6b77330c224063a927e
- subpackages:
- - convey
import (
"encoding/json"
"net/http"
- "rtmgr"
+ "routing-manager/pkg/rtmgr"
+ "routing-manager/pkg/rpe"
+ "routing-manager/pkg/sdl"
"time"
)
-var myClient = &http.Client{Timeout: 1 * time.Second}
+type HttpGetter struct {
+ NbiEngine
+ FetchAllXapps FetchAllXappsHandler
+}
+
+func NewHttpGetter() *HttpGetter {
+ instance := new(HttpGetter)
+ instance.FetchAllXapps = fetchAllXapps
+ return instance
+}
-func fetchXappList(url string) (*[]rtmgr.XApp, error) {
- rtmgr.Logger.Debug("Invoked httpgetter.fetchXappList")
- r, err := myClient.Get(url)
+var myClient = &http.Client{Timeout: 5 * time.Second}
+
+func fetchAllXapps(xmurl string) (*[]rtmgr.XApp, error) {
+ rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
+ r, err := myClient.Get(xmurl)
if err != nil {
return nil, err
}
defer r.Body.Close()
- rtmgr.Logger.Debug("http client raw response: %v", r)
- var xapps []rtmgr.XApp
- json.NewDecoder(r.Body).Decode(&xapps)
- rtmgr.Logger.Info("HTTP GET: OK")
- rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
- return &xapps, err
+
+ if r.StatusCode == 200 {
+ rtmgr.Logger.Debug("http client raw response: %v", r)
+ var xapps []rtmgr.XApp
+ err = json.NewDecoder(r.Body).Decode(&xapps)
+ if err != nil {
+ rtmgr.Logger.Warn("Json decode failed: " + err.Error())
+ }
+ rtmgr.Logger.Info("HTTP GET: OK")
+ rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
+ return &xapps, err
+ }
+ rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
+ return nil, nil
+}
+
+func (g *HttpGetter) Initialize(xmurl string, nbiif string, fileName string, configfile string,
+ sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine, triggerSBI chan<- bool) error {
+ return nil
+}
+
+func (g *HttpGetter) Terminate() error {
+ return nil
}
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: httpgetter.go
+ Abstract: HTTPgetter unit tests
+ Date: 14 May 2019
+*/
+
+package nbi
+
+import (
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "routing-manager/pkg/rtmgr"
+ "testing"
+)
+
+var (
+ XMURL string = "http://127.0.0.1:3000/ric/v1/xapps"
+)
+
+
+func TestFetchXappListInvalidData(t *testing.T) {
+ var httpGetter = NewHttpGetter()
+ _, err := httpGetter.FetchAllXapps(XMURL)
+ if err == nil {
+ t.Error("No XApp data received: "+err.Error())
+ }
+}
+
+
+func TestFetchXappListWithInvalidData(t *testing.T) {
+ var expected int = 0
+ rtmgr.SetLogLevel("debug")
+ b := []byte(`{"ID":"deadbeef1234567890", "Version":0, "EventType":"all"}`)
+ l, err := net.Listen("tcp", "127.0.0.1:3000")
+ if err != nil {
+ t.Error("Failed to create listener: " + err.Error())
+ }
+ ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ //t.Log(r.Method)
+ //t.Log(r.URL)
+ if r.Method == "GET" && r.URL.String() == "/ric/v1/xapps" {
+ //t.Log("Sending reply")
+ w.Header().Add("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write(b)
+ }
+ }))
+ ts.Listener.Close()
+ ts.Listener = l
+
+ ts.Start()
+ defer ts.Close()
+ var httpGetter = NewHttpGetter()
+ xapplist, err := httpGetter.FetchAllXapps(XMURL)
+ if err == nil {
+ t.Error("Error occured: " + err.Error())
+ } else {
+ //t.Log(len(*xapplist))
+ if len(*xapplist) != expected {
+ t.Error("Invalid XApp data: got " + string(len(*xapplist)) + ", expected " + string(expected))
+ }
+ }
+}
+
+func TestFetchAllXappsWithValidData(t *testing.T) {
+ var expected int = 1
+ b := []byte(`[
+ {
+ "name":"xapp-01","status":"unknown","version":"1.2.3",
+ "instances":[
+ {"name":"xapp-01-instance-01","status":"pending","ip":"172.16.1.103","port":4555,
+ "txMessages":["ControlIndication"],
+ "rxMessages":["LoadIndication","Reset"]
+ },
+ {"name":"xapp-01-instance-02","status":"pending","ip":"10.244.1.12","port":4561,
+ "txMessages":["ControlIndication","SNStatusTransfer"],
+ "rxMessages":["LoadIndication","HandoverPreparation"]
+ }
+ ]
+}
+]`)
+ l, err := net.Listen("tcp", "127.0.0.1:3000")
+ if err != nil {
+ t.Error("Failed to create listener: " + err.Error())
+ }
+ ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ //t.Log(r.Method)
+ //t.Log(r.URL)
+ if r.Method == "GET" && r.URL.String() == "/ric/v1/xapps" {
+ //t.Log("Sending reply")
+ w.Header().Add("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write(b)
+ }
+ }))
+ ts.Listener.Close()
+ ts.Listener = l
+
+ ts.Start()
+ defer ts.Close()
+ var httpGetter = NewHttpGetter()
+ xapplist, err := httpGetter.FetchAllXapps(XMURL)
+ if err != nil {
+ t.Error("Error occured: " + err.Error())
+ } else {
+ if len(*xapplist) != expected {
+ t.Error("Invalid XApp data: got " + string(len(*xapplist)) + ", expected " + string(expected))
+ }
+ }
+}
+
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: httprestful.go
+ Abstract: HTTP Restful API NBI implementation
+ Based on Swagger generated code
+ Date: 25 March 2019
+*/
+
+package nbi
+
+import (
+ "fmt"
+ "os"
+ "time"
+ "net/url"
+ "strconv"
+ "errors"
+ "encoding/json"
+ "routing-manager/pkg/rtmgr"
+ "routing-manager/pkg/rpe"
+ "routing-manager/pkg/sdl"
+ "routing-manager/pkg/models"
+ "routing-manager/pkg/restapi"
+ "routing-manager/pkg/restapi/operations"
+ "github.com/go-openapi/runtime/middleware"
+ "routing-manager/pkg/restapi/operations/handle"
+ loads "github.com/go-openapi/loads"
+)
+
+//var myClient = &http.Client{Timeout: 1 * time.Second}
+
+type HttpRestful struct {
+ NbiEngine
+ LaunchRest LaunchRestHandler
+ RecvXappCallbackData RecvXappCallbackDataHandler
+ ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
+ RetrieveStartupData RetrieveStartupDataHandler
+}
+
+func NewHttpRestful() *HttpRestful {
+ instance := new(HttpRestful)
+ instance.LaunchRest = launchRest
+ instance.RecvXappCallbackData = recvXappCallbackData
+ instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
+ instance.RetrieveStartupData = retrieveStartupData
+ return instance
+}
+
+// ToDo: Use Range over channel. Read and return only the latest one.
+func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
+ var xappData *models.XappCallbackData
+ // Drain the channel as we are only looking for the latest value until
+ // xapp manager sends all xapp data with every request.
+ length := len(dataChannel)
+ //rtmgr.Logger.Info(length)
+ for i := 0; i <= length; i++ {
+ rtmgr.Logger.Info("data received")
+ // If no data received from the REST, it blocks.
+ xappData = <-dataChannel
+ }
+ if nil != xappData {
+ var xapps []rtmgr.XApp
+ err := json.Unmarshal([]byte(xappData.Data), &xapps)
+ return &xapps, err
+ } else {
+ rtmgr.Logger.Info("No data")
+ }
+
+ rtmgr.Logger.Debug("Nothing received on the Http interface")
+ return nil, nil
+
+}
+
+func validateXappCallbackData(callbackData *models.XappCallbackData) error {
+ if len(callbackData.Data) == 0 {
+ return fmt.Errorf("Invalid Data field: \"%s\"", callbackData.Data)
+ }
+ var xapps []rtmgr.XApp
+ err := json.Unmarshal([]byte(callbackData.Data), &xapps)
+ if err != nil {
+ return fmt.Errorf("Unmarshal failed: \"%s\"", err.Error())
+ }
+ return nil
+}
+
+func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
+ if data != nil {
+ rtmgr.Logger.Debug("Received callback data")
+ }
+ err := validateXappCallbackData(data)
+ if err != nil {
+ rtmgr.Logger.Debug("XApp callback data validation failed: "+err.Error())
+ return err
+ } else {
+ datach<-data
+ return nil
+ }
+}
+
+func validateXappSubscriptionData(data *models.XappSubscriptionData) error {
+ var err = fmt.Errorf("XApp instance not found: %v:%v", *data.Address, *data.Port)
+ for _, ep := range rtmgr.Eps {
+ if ep.Ip == *data.Address && ep.Port == *data.Port {
+ err = nil
+ break
+ }
+ }
+ return err
+}
+
+func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
+ data *models.XappSubscriptionData) error {
+ rtmgr.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
+ err := validateXappSubscriptionData(data)
+ if err != nil {
+ rtmgr.Logger.Error(err.Error())
+ return err
+ }
+ subchan <- data
+ //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
+ rtmgr.Logger.Debug("Endpoints: %v", rtmgr.Eps)
+ return nil
+}
+
+func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData) {
+ swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
+ if err != nil {
+ //log.Fatalln(err)
+ rtmgr.Logger.Error(err.Error())
+ os.Exit(1)
+ }
+ nbiUrl, err := url.Parse(*nbiif)
+ if err != nil {
+ rtmgr.Logger.Error(err.Error())
+ os.Exit(1)
+ }
+ api := operations.NewRoutingManagerAPI(swaggerSpec)
+ server := restapi.NewServer(api)
+ defer server.Shutdown()
+
+ server.Port, err = strconv.Atoi(nbiUrl.Port())
+ if err != nil {
+ rtmgr.Logger.Error("Invalid NBI RestAPI port")
+ os.Exit(1)
+ }
+ server.Host = nbiUrl.Hostname()
+ // set handlers
+ api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
+ func(params handle.ProvideXappHandleParams) middleware.Responder {
+ rtmgr.Logger.Info("Data received on Http interface")
+ err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
+ if err != nil {
+ rtmgr.Logger.Error("Invalid XApp callback data: "+err.Error())
+ return handle.NewProvideXappHandleBadRequest()
+ } else {
+ return handle.NewGetHandlesOK()
+ }
+ })
+ api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
+ func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
+ err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
+ if err != nil {
+ return handle.NewProvideXappSubscriptionHandleBadRequest()
+ } else {
+ return handle.NewGetHandlesOK()
+ }
+ })
+ // start to serve API
+ rtmgr.Logger.Info("Starting the HTTP Rest service")
+ if err := server.Serve(); err != nil {
+ rtmgr.Logger.Error(err.Error())
+ }
+}
+
+func httpGetXapps(xmurl string) (*[]rtmgr.XApp, error) {
+ rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
+ r, err := myClient.Get(xmurl)
+ if err != nil {
+ return nil, err
+ }
+ defer r.Body.Close()
+
+ if r.StatusCode == 200 {
+ rtmgr.Logger.Debug("http client raw response: %v", r)
+ var xapps []rtmgr.XApp
+ err = json.NewDecoder(r.Body).Decode(&xapps)
+ if err != nil {
+ rtmgr.Logger.Warn("Json decode failed: " + err.Error())
+ }
+ rtmgr.Logger.Info("HTTP GET: OK")
+ rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
+ return &xapps, err
+ }
+ rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
+ return nil, nil
+}
+
+func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, sdlEngine sdl.SdlEngine) error {
+ var readErr error
+ var maxRetries = 10
+
+ for i := 1; i <= maxRetries; i++ {
+ time.Sleep(2 * time.Second)
+
+ xappData, err := httpGetXapps(xmurl)
+
+ if xappData != nil && err == nil {
+ pcData, confErr := rtmgr.GetPlatformComponents(configfile)
+ if confErr != nil {
+ rtmgr.Logger.Error(confErr.Error())
+ return confErr
+ }
+
+ rtmgr.Logger.Info("Recieved intial xapp data and platform data, writing into SDL.")
+ // Combine the xapps data and platform data before writing to the SDL
+ ricData := &rtmgr.RicComponents{Xapps: *xappData, Pcs: *pcData}
+
+ writeErr := sdlEngine.WriteAll(fileName, ricData)
+ if writeErr != nil {
+ rtmgr.Logger.Error(writeErr.Error())
+ }
+ // post subscription req to appmgr
+ readErr = PostSubReq(xmurl, nbiif)
+ if readErr == nil {
+ return nil
+ }
+ } else if err == nil {
+ readErr = errors.New("Unexpected HTTP status code")
+ } else {
+ rtmgr.Logger.Warn("cannot get xapp data due to: " + err.Error())
+ readErr = err
+ }
+ }
+ return readErr
+}
+
+func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string,
+ sdlEngine sdl.SdlEngine, rpeEngine rpe.RpeEngine, triggerSBI chan<- bool) error {
+ err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, sdlEngine)
+ if err != nil {
+ rtmgr.Logger.Error("Exiting as nbi failed to get the intial startup data from the xapp manager: " + err.Error())
+ return err
+ }
+
+ datach := make(chan *models.XappCallbackData, 10)
+ subschan := make(chan *models.XappSubscriptionData, 10)
+ rtmgr.Logger.Info("Launching Rest Http service")
+ go func() {
+ r.LaunchRest(&nbiif, datach, subschan)
+ }()
+
+ go func() {
+ for {
+ data, err := r.RecvXappCallbackData(datach)
+ if err != nil {
+ rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
+ } else if data != nil {
+ sdlEngine.WriteXapps(fileName, data)
+ triggerSBI <- true
+ }
+ }
+ }()
+ go func() {
+ for {
+ data := <-subschan
+ rtmgr.Logger.Debug("received XApp subscription data")
+ addSubscription(&rtmgr.Subs, data)
+ triggerSBI <- true
+ }
+ }()
+
+ return nil
+}
+
+func (r *HttpRestful) Terminate() error {
+ return nil
+}
+
+func addSubscription(subs *rtmgr.SubscriptionList, xappSubData *models.XappSubscriptionData) bool {
+ var b bool = false
+ sub := rtmgr.Subscription{SubID:*xappSubData.SubscriptionID, Fqdn:*xappSubData.Address, Port:*xappSubData.Port,}
+ for _, elem := range *subs {
+ if elem == sub {
+ rtmgr.Logger.Warn("rtmgr.addSubscription: Subscription already present: %v", elem)
+ b = true
+ }
+ }
+ if b == false {
+ *subs = append(*subs, sub)
+ }
+ return b
+}
+
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: httprestful_test.go
+ Abstract: HTTPRestful unit tests
+ Date: 15 May 2019
+*/
+
+package nbi
+
+import (
+ "routing-manager/pkg/stub"
+ "routing-manager/pkg/models"
+ "routing-manager/pkg/sdl"
+ "github.com/go-openapi/swag"
+ "testing"
+ "time"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "fmt"
+ "os"
+ "io/ioutil"
+ "encoding/json"
+)
+
+var BASIC_XAPPLIST = []byte(`[
+ {
+ "name":"xapp-01","status":"unknown","version":"1.2.3",
+ "instances":[
+ {"name":"xapp-01-instance-01","status":"pending","ip":"172.16.1.103","port":4555,
+ "txMessages":["ControlIndication"],
+ "rxMessages":["LoadIndication","Reset"]
+ },
+ {"name":"xapp-01-instance-02","status":"pending","ip":"10.244.1.12","port":4561,
+ "txMessages":["ControlIndication","SNStatusTransfer"],
+ "rxMessages":["LoadIndication","HandoverPreparation"]
+ }
+ ]
+}
+]`)
+
+var SUBSCRIPTION_RESP = []byte(`{"ID":"deadbeef1234567890", "Version":0, "EventType":"all"}`)
+
+
+var INVALID_SUB_RESP = []byte(`{"Version":0, "EventType":all}`)
+
+
+func createMockAppmgrWithData(url string, g []byte, p []byte) *httptest.Server {
+ l, err := net.Listen("tcp", url)
+ if err != nil {
+ fmt.Println("Failed to create listener: " + err.Error())
+ }
+ ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.Method == "GET" && r.URL.String() == "/ric/v1/xapps" {
+ w.Header().Add("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write(g)
+ }
+ if r.Method == "POST" && r.URL.String() == "/ric/v1/subscriptions" {
+ w.Header().Add("Content-Type", "application/json")
+ w.WriteHeader(http.StatusCreated)
+ w.Write(p)
+ }
+
+ }))
+ ts.Listener.Close()
+ ts.Listener = l
+ return ts
+}
+
+func createMockPlatformComponents() {
+ var filename = string("config.json")
+ file, _ := json.MarshalIndent(stub.ValidPlatformComponents, "", "")
+ filestr := string(file)
+ filestr = "{\"PlatformComponents\":"+filestr+"}"
+ file = []byte(filestr)
+ _ = ioutil.WriteFile(filename, file, 644)
+}
+
+func TestRecvXappCallbackData(t *testing.T) {
+ data := models.XappCallbackData {
+ Data: *swag.String("[]"),
+ DataVersion: *swag.Int64(1),
+ Event: *swag.String("any"),
+ ID: *swag.Int64(123456),
+ }
+
+ ch := make(chan *models.XappCallbackData)
+ defer close(ch)
+ httpRestful := NewHttpRestful()
+ go func() {ch<- &data}()
+ time.Sleep(1 * time.Second)
+ t.Log(string(len(ch)))
+ xappList, err := httpRestful.RecvXappCallbackData(ch)
+ if err != nil {
+ t.Error("Receive failed: "+err.Error())
+ } else {
+ if xappList == nil {
+ t.Error("Expected an XApp notification list")
+ } else {
+ t.Log("whatever")
+ }
+ }
+}
+
+func TestProvideXappHandleHandlerImpl(t *testing.T) {
+ datach := make(chan *models.XappCallbackData, 10)
+ defer close(datach)
+ data := models.XappCallbackData{
+ Data: *swag.String("[]"),
+ DataVersion: *swag.Int64(1),
+ Event: *swag.String("someevent"),
+ ID: *swag.Int64(123456)}
+ var httpRestful, _ = GetNbi("httpRESTful")
+ err := httpRestful.(*HttpRestful).ProvideXappHandleHandlerImpl(datach, &data)
+ if err != nil {
+ t.Error("Error occured: "+err.Error())
+ } else {
+ recv := <-datach
+ if recv == nil {
+ t.Error("Something gone wrong: "+err.Error())
+ } else {
+ if recv != &data {
+ t.Error("Malformed data on channel")
+ }
+ }
+ }
+}
+
+func TestValidateXappCallbackData(t *testing.T) {
+ data := models.XappCallbackData{
+ Data: *swag.String("[]"),
+ DataVersion: *swag.Int64(1),
+ Event: *swag.String("someevent"),
+ ID: *swag.Int64(123456)}
+
+ err := validateXappCallbackData(&data)
+ if err != nil {
+ t.Error("Invalid XApp callback data: "+err.Error())
+ }
+}
+
+func TestValidateXappCallbackDataWithInvalidData(t *testing.T) {
+ data := models.XappCallbackData{
+ Data: *swag.String("{}"),
+ DataVersion: *swag.Int64(1),
+ Event: *swag.String("someevent"),
+ ID: *swag.Int64(123456)}
+
+ err := validateXappCallbackData(&data)
+ if err == nil {
+ t.Error("Invalid XApp callback data: "+err.Error())
+ }
+}
+
+
+func TestHttpGetXappsInvalidData(t *testing.T) {
+ _, err := httpGetXapps(XMURL)
+ if err == nil {
+ t.Error("No XApp data received: "+err.Error())
+ }
+}
+
+func TestHttpGetXappsWithValidData(t *testing.T) {
+ var expected int = 1
+ ts := createMockAppmgrWithData("127.0.0.1:3000", BASIC_XAPPLIST, nil)
+
+ ts.Start()
+ defer ts.Close()
+ xapplist, err := httpGetXapps(XMURL)
+ if err != nil {
+ t.Error("Error occured: " + err.Error())
+ } else {
+ if len(*xapplist) != expected {
+ t.Error("Invalid XApp data: got " + string(len(*xapplist)) + ", expected " + string(expected))
+ }
+ }
+}
+
+func TestRetrieveStartupDataTimeout(t *testing.T) {
+ sdlEngine, _ := sdl.GetSdl("file")
+ createMockPlatformComponents()
+ err := retrieveStartupData(XMURL, "httpgetter","rt.json", "config.json", sdlEngine)
+ if err == nil {
+ t.Error("Cannot retrieve startup data: "+err.Error())
+ }
+ os.Remove("rt.json")
+ os.Remove("config.json")
+}
+
+func TestRetrieveStartupData(t *testing.T) {
+ ts := createMockAppmgrWithData("127.0.0.1:3000", BASIC_XAPPLIST, SUBSCRIPTION_RESP)
+ ts.Start()
+ defer ts.Close()
+ sdlEngine, _ := sdl.GetSdl("file")
+ var httpRestful, _ = GetNbi("httpRESTful")
+ createMockPlatformComponents()
+ err := httpRestful.(*HttpRestful).RetrieveStartupData(XMURL, "httpgetter", "rt.json","config.json", sdlEngine)
+ //err := retrieveStartupData(XMURL, "httpgetter", "rt.json", "config.json", sdlEngine)
+ if err != nil {
+ t.Error("Cannot retrieve startup data: "+err.Error())
+ }
+ os.Remove("rt.json")
+ os.Remove("config.json")
+}
+
+func TestRetrieveStartupDataWithInvalidSubResp(t *testing.T) {
+ ts := createMockAppmgrWithData("127.0.0.1:3000", BASIC_XAPPLIST, INVALID_SUB_RESP)
+ ts.Start()
+ defer ts.Close()
+ sdlEngine, _ := sdl.GetSdl("file")
+ var httpRestful, _ = GetNbi("httpRESTful")
+ createMockPlatformComponents()
+ err := httpRestful.(*HttpRestful).RetrieveStartupData(XMURL, "httpgetter", "rt.json", "config.json", sdlEngine)
+ if err == nil {
+ t.Error("Cannot retrieve startup data: "+err.Error())
+ }
+ os.Remove("rt.json")
+ os.Remove("config.json")
+}
import (
"errors"
- "fmt"
- "rtmgr"
+ "routing-manager/pkg/rtmgr"
+ "net/url"
+ apiclient "routing-manager/pkg/appmgr_client"
+ "routing-manager/pkg/appmgr_client/operations"
+ "routing-manager/pkg/appmgr_model"
+ httptransport "github.com/go-openapi/runtime/client"
+ "github.com/go-openapi/strfmt"
+ "github.com/go-openapi/swag"
+ "time"
+
)
var (
SupportedNbis = []*NbiEngineConfig{
&NbiEngineConfig{
- NbiEngine{
- Name: "httpGetter",
- Version: "v1",
- Protocol: "http",
- },
- batchFetch(fetchXappList),
- true,
- },
- &NbiEngineConfig{
- NbiEngine{
- Name: "httpRESTful",
- Version: "v1",
- Protocol: "http",
- },
- batchFetch(nil),
- false,
+ Name: "httpGetter",
+ Version: "v1",
+ Protocol: "http",
+ Instance: NewHttpGetter(),
+ IsAvailable: true,
},
&NbiEngineConfig{
- NbiEngine{
- Name: "gRPC",
- Version: "v1",
- Protocol: "http2",
- },
- batchFetch(nil),
- false,
+ Name: "httpRESTful",
+ Version: "v1",
+ Protocol: "http",
+ Instance: NewHttpRestful(),
+ IsAvailable: true,
},
}
)
-func ListNbis() {
- fmt.Printf("NBI:\n")
- for _, nbi := range SupportedNbis {
- if nbi.IsAvailable {
- rtmgr.Logger.Info(nbi.Engine.Name + "/" + nbi.Engine.Version)
- }
- }
+type Nbi struct {
+
}
-func GetNbi(nbiName string) (*NbiEngineConfig, error) {
+func GetNbi(nbiName string) (NbiEngine, error) {
for _, nbi := range SupportedNbis {
- if nbi.Engine.Name == nbiName && nbi.IsAvailable {
- return nbi, nil
+ if nbi.Name == nbiName && nbi.IsAvailable {
+ return nbi.Instance, nil
}
}
return nil, errors.New("NBI:" + nbiName + " is not supported or still not a available")
}
+
+func CreateSubReq(restUrl string, restPort string) *appmgr_model.SubscriptionRequest {
+ // TODO: parametize function
+ subReq := appmgr_model.SubscriptionRequest{
+ TargetURL: swag.String(restUrl + ":" + restPort + "/ric/v1/handles/xapp-handle/"),
+ EventType: swag.String("all"),
+ MaxRetries: swag.Int64(5),
+ RetryTimer: swag.Int64(10),
+ }
+
+ return &subReq
+}
+
+func PostSubReq(xmUrl string, nbiif string) error {
+ // setting up POST request to Xapp Manager
+ appmgrUrl, err := url.Parse(xmUrl)
+ if err != nil {
+ rtmgr.Logger.Error("Invalid XApp manager url/hostname: " + err.Error())
+ return err
+ }
+ nbiifUrl, err := url.Parse(nbiif)
+ if err != nil {
+ rtmgr.Logger.Error("Invalid NBI address/port: " + err.Error())
+ return err
+ }
+ transport := httptransport.New(appmgrUrl.Hostname()+":"+appmgrUrl.Port(), "/ric/v1", []string{"http"})
+ client := apiclient.New(transport, strfmt.Default)
+ addSubParams := operations.NewAddSubscriptionParamsWithTimeout(10 * time.Second)
+ // create sub req with rest url and port
+ subReq := CreateSubReq(nbiifUrl.Hostname(), nbiifUrl.Port())
+ resp, postErr := client.Operations.AddSubscription(addSubParams.WithSubscriptionRequest(subReq))
+ if postErr != nil {
+ rtmgr.Logger.Error("POST unsuccessful:"+postErr.Error())
+ return postErr
+ } else {
+ // TODO: use the received ID
+ rtmgr.Logger.Info("POST received: "+string(resp.Payload.ID))
+ return nil
+ }
+}
+
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: nbi_test.go
+ Abstract: NBI unit tests
+ Date: 21 May 2019
+*/
+
+package nbi
+
+import (
+ "testing"
+ "reflect"
+ "errors"
+ "routing-manager/pkg/appmgr_model"
+ "github.com/go-openapi/swag"
+ "net"
+ "net/http"
+ "net/http/httptest"
+)
+
+func TestGetNbi(t *testing.T) {
+ var errtype = errors.New("")
+ var nbitype = new(HttpGetter)
+ var invalids = []string{"httpgetter", ""}
+
+ nbii, err := GetNbi("httpGetter")
+ if err != nil {
+ t.Errorf("GetNbi(HttpGetter) was incorrect, got: %v, want: %v.", reflect.TypeOf(err), nil)
+ }
+ if reflect.TypeOf(nbii) != reflect.TypeOf(nbitype) {
+ t.Errorf("GetNbi(HttpGetter) was incorrect, got: %v, want: %v.", reflect.TypeOf(nbii), reflect.TypeOf(nbitype))
+ }
+
+ for _, arg := range invalids {
+ _, err := GetNbi(arg)
+ if err == nil {
+ t.Errorf("GetNbi("+arg+") was incorrect, got: %v, want: %v.", reflect.TypeOf(err), reflect.TypeOf(errtype))
+ }
+ }
+}
+
+func TestCreateSubReq(t *testing.T) {
+ var subReq = appmgr_model.SubscriptionRequest{
+ TargetURL: swag.String("localhost:8000/ric/v1/handles/xapp-handle/"),
+ EventType: swag.String("all"),
+ MaxRetries: swag.Int64(5),
+ RetryTimer: swag.Int64(10),
+ }
+ subReq2 := CreateSubReq("localhost","8000")
+ if reflect.TypeOf(subReq) != reflect.TypeOf(*subReq2) {
+ t.Errorf("Invalid type, got: %v, want: %v.", reflect.TypeOf(subReq), reflect.TypeOf(*subReq2))
+ }
+ if *(subReq.TargetURL) != *(subReq2.TargetURL) {
+ t.Errorf("Invalid TargetURL generated, got %v, want %v", *subReq.TargetURL, *subReq2.TargetURL)
+ }
+ if *(subReq.EventType) != *(subReq2.EventType) {
+ t.Errorf("Invalid EventType generated, got %v, want %v", *subReq.EventType, *subReq2.EventType)
+ }
+ if *(subReq.MaxRetries) != *(subReq2.MaxRetries) {
+ t.Errorf("Invalid MaxRetries generated, got %v, want %v", *subReq.MaxRetries, *subReq2.MaxRetries)
+ }
+ if *(subReq.RetryTimer) != *(subReq2.RetryTimer) {
+ t.Errorf("Invalid RetryTimer generated, got %v, want %v", *subReq.RetryTimer, *subReq2.RetryTimer)
+ }
+}
+
+func TestPostSubReq(t *testing.T) {
+ b := []byte(`{"ID":"deadbeef1234567890", "Version":0, "EventType":"all"}`)
+ l, err := net.Listen("tcp", "127.0.0.1:3000")
+ if err != nil {
+ t.Error("Failed to create listener: " + err.Error())
+ }
+ ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ t.Log(r.Method)
+ t.Log(r.URL)
+ if r.Method == "POST" && r.URL.String() == "/ric/v1/subscriptions" {
+ t.Log("Sending reply")
+ w.Header().Add("Content-Type", "application/json")
+ w.WriteHeader(http.StatusCreated)
+ w.Write(b)
+ }
+ }))
+ ts.Listener.Close()
+ ts.Listener = l
+
+ ts.Start()
+ defer ts.Close()
+ err = PostSubReq("http://127.0.0.1:3000/ric/v1/subscription","localhost:8888")
+ if err != nil {
+ t.Error("Error occured: "+err.Error())
+ }
+}
+
+func TestPostSubReqWithInvalidUrls(t *testing.T) {
+ // invalid Xapp Manager URL
+ err := PostSubReq("http://127.0","http://localhost:8888")
+ if err == nil {
+ t.Error("Error occured: "+err.Error())
+ }
+ // invalid rest api url
+ err = PostSubReq("http://127.0.0.1:3000/","localhost:8888")
+ if err == nil {
+ t.Error("Error occured: "+err.Error())
+ }
+}
+
package nbi
-import "rtmgr"
-
-type batchFetch func(url string) (*[]rtmgr.XApp, error)
-
-type NbiEngine struct {
- Name string
- Version string
- Protocol string
-}
+import (
+ "routing-manager/pkg/rtmgr"
+ "routing-manager/pkg/models"
+ "routing-manager/pkg/rpe"
+ "routing-manager/pkg/sdl"
+)
+
+type FetchAllXappsHandler func(string) (*[]rtmgr.XApp, error)
+type RecvXappCallbackDataHandler func(<-chan *models.XappCallbackData) (*[]rtmgr.XApp, error)
+type LaunchRestHandler func(*string, chan<- *models.XappCallbackData, chan<- *models.XappSubscriptionData)
+type ProvideXappHandleHandlerImpl func(chan<- *models.XappCallbackData, *models.XappCallbackData) (error)
+type RetrieveStartupDataHandler func(string, string, string, string, sdl.SdlEngine) error
type NbiEngineConfig struct {
- Engine NbiEngine
- BatchFetch batchFetch
+ Name string
+ Version string
+ Protocol string
+ Instance NbiEngine
IsAvailable bool
}
+
+type NbiEngine interface {
+ Initialize(string, string, string, string, sdl.SdlEngine, rpe.RpeEngine, chan<- bool) error
+ Terminate() error
+}
+
package rpe
import (
- "rtmgr"
+ "routing-manager/pkg/rtmgr"
"strconv"
)
+type Rmr struct {
+ Rpe
+}
+
+type RmrPub struct {
+ Rmr
+}
+
+type RmrPush struct {
+ Rmr
+}
+
+func NewRmrPub() *RmrPub {
+ instance := new(RmrPub)
+ return instance
+}
+
+func NewRmrPush() *RmrPush {
+ instance := new(RmrPush)
+ return instance
+}
+
/*
Produces the raw route message consumable by RMR
*/
-func generateRMRPolicies(eps rtmgr.Endpoints, key string) *[]string {
- rtmgr.Logger.Debug("Invoked rmr.generateRMRPolicies")
- rtmgr.Logger.Debug("args: %v", eps)
+func (r *Rmr) generateRMRPolicies(eps rtmgr.Endpoints, key string) *[]string {
rawrt := []string{key + "newrt|start\n"}
- rt := getRouteTable(eps)
+ rt := r.getRouteTable(eps)
for _, rte := range *rt {
- rawrte := key + "rte|" + rte.MessageType
+ rawrte := key //+ "rte|" + rte.MessageType
+ if rte.SubID == -1 {
+ rawrte += "rte|"
+ } else {
+ rawrte += "mse|"
+ }
+ rawrte += rte.MessageType
for _, tx := range rte.TxList {
rawrte += "," + tx.Ip + ":" + strconv.Itoa(int(tx.Port))
}
- rawrte += "|"
+ rawrte += "|" + strconv.Itoa(int(rte.SubID)) + "|"
group := ""
for _, rxg := range rte.RxGroups {
member := ""
rawrt = append(rawrt, rawrte+"\n")
}
rawrt = append(rawrt, key+"newrt|end\n")
- rtmgr.Logger.Debug("rmr.generateRMRPolicies returns: %v", rawrt)
+ rtmgr.Logger.Debug("rmr.GeneratePolicies returns: %v", rawrt)
return &rawrt
}
-func generateRMRPubPolicies(eps rtmgr.Endpoints) *[]string {
- return generateRMRPolicies(eps, "00000 ")
+func (r *RmrPub) GeneratePolicies(eps rtmgr.Endpoints) *[]string {
+ rtmgr.Logger.Debug("Invoked rmr.GeneratePolicies, args: %v: ", eps)
+ return r.generateRMRPolicies(eps, "00000 ")
+}
+
+func (r *RmrPush) GeneratePolicies(eps rtmgr.Endpoints) *[]string {
+ rtmgr.Logger.Debug("Invoked rmr.GeneratePolicies, args: %v: ", eps)
+ return r.generateRMRPolicies(eps, "")
+}
+
+func (r *RmrPub) GetRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
+ return r.getRouteTable(eps)
}
-func generateRMRPushPolicies(eps rtmgr.Endpoints) *[]string {
- return generateRMRPolicies(eps, "")
+func (r *RmrPush) GetRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
+ return r.getRouteTable(eps)
}
+
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: nngpub_test.go
+ Abstract:
+ Date: 25 April 2019
+*/
+package rpe
+
+import (
+ "routing-manager/pkg/rtmgr"
+ "routing-manager/pkg/stub"
+ "testing"
+)
+
+func resetTestDataset(testdata []rtmgr.Endpoint) {
+ rtmgr.Eps = make(map[string]*rtmgr.Endpoint)
+ for _, endpoint := range stub.ValidEndpoints {
+ ep := endpoint
+ rtmgr.Eps[ep.Uuid] = &ep
+ }
+}
+
+/*
+RmrPub.GeneratePolicies() method is tested for happy path case
+*/
+func TestRmrPubGeneratePolicies(t *testing.T) {
+ var rmrpub = RmrPub{}
+ resetTestDataset(stub.ValidEndpoints)
+
+ rawrt := rmrpub.GeneratePolicies(rtmgr.Eps)
+ t.Log(rawrt)
+}
+
+/*
+RmrPush.GeneratePolicies() method is tested for happy path case
+*/
+func TestRmrPushGeneratePolicies(t *testing.T) {
+ var rmrpush = RmrPush{}
+ resetTestDataset(stub.ValidEndpoints)
+
+ rawrt := rmrpush.GeneratePolicies(rtmgr.Eps)
+ t.Log(rawrt)
+}
import (
"errors"
- "fmt"
- "rtmgr"
+ "routing-manager/pkg/rtmgr"
+ "strconv"
)
var (
SupportedRpes = []*RpeEngineConfig{
&RpeEngineConfig{
- RpeEngine{
- Name: "rmrpub",
- Version: "pubsub",
- Protocol: "rmruta",
- },
- generatePolicies(generateRMRPubPolicies),
- true,
+ Name: "rmrpub",
+ Version: "pubsub",
+ Protocol: "rmruta",
+ Instance: NewRmrPub(),
+ IsAvailable: true,
},
&RpeEngineConfig{
- RpeEngine{
- Name: "rmrpush",
- Version: "push",
- Protocol: "rmruta",
- },
- generatePolicies(generateRMRPushPolicies),
- true,
+ Name: "rmrpush",
+ Version: "pubsush",
+ Protocol: "rmruta",
+ Instance: NewRmrPush(),
+ IsAvailable: true,
},
}
)
-func ListRpes() {
- fmt.Printf("RPE:\n")
+func GetRpe(rpeName string) (RpeEngine, error) {
for _, rpe := range SupportedRpes {
- if rpe.IsAvailable {
- rtmgr.Logger.Info(rpe.Engine.Name + "/" + rpe.Engine.Version)
+ if rpe.Name == rpeName && rpe.IsAvailable {
+ return rpe.Instance, nil
}
}
+ return nil, errors.New("SBI:" + rpeName + " is not supported or still not a available")
}
-func GetRpe(rpeName string) (*RpeEngineConfig, error) {
- for _, rpe := range SupportedRpes {
- if rpe.Engine.Name == rpeName && rpe.IsAvailable {
- return rpe, nil
- }
- }
- return nil, errors.New("SBI:" + rpeName + " is not supported or still not a available")
+type Rpe struct {
}
/*
Endpoint object's message type already transcoded to integer id
*/
-func getRouteRxTxLists(eps rtmgr.Endpoints) (*map[string]rtmgr.EndpointList, *map[string]map[string]rtmgr.EndpointList) {
+func (r *Rpe) getRouteRxTxLists(eps rtmgr.Endpoints) (*map[string]rtmgr.EndpointList, *map[string]map[string]rtmgr.EndpointList) {
txlist := make(map[string]rtmgr.EndpointList)
rxgroups := make(map[string]map[string]rtmgr.EndpointList)
for _, ep := range eps {
Gets the raw xapp list and creates a route table for
Returns the array of route table entries
*/
-func getRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
- tx, rx := getRouteRxTxLists(eps)
+func (r *Rpe) getRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
+ tx, rx := r.getRouteRxTxLists(eps)
var rt rtmgr.RouteTable
for _, messagetype := range rtmgr.MESSAGETYPES {
- if _, ok := (*tx)[messagetype]; !ok {
+ /*if _, ok := (*tx)[messagetype]; !ok {
continue
}
if _, ok := (*rx)[messagetype]; !ok {
continue
+ }*/
+ txList, ok := (*tx)[messagetype]
+ if !ok {
+ txList = rtmgr.EndpointList{}
}
var rxgroups []rtmgr.EndpointList
for _, endpointlist := range (*rx)[messagetype] {
rxgroups = append(rxgroups, endpointlist)
}
- rte := rtmgr.RouteTableEntry{
- messagetype,
- (*tx)[messagetype],
- rxgroups,
+ if len(txList) > 0 || len(rxgroups) > 0 {
+ rte := rtmgr.RouteTableEntry{
+ messagetype,
+ txList,
+ rxgroups,
+ -1,
+ }
+ rt = append(rt, rte)
}
- rt = append(rt, rte)
}
+ r.addStaticRoutes(eps, &rt)
+ r.addSubscriptionRoutes(eps, &rt, &rtmgr.Subs)
return &rt
}
+
+/*
+Adds specific static routes to the route table
+which cannot be calculated with endpoint tx/rx message types.
+*/
+func (r *Rpe) addStaticRoutes(eps rtmgr.Endpoints, rt *rtmgr.RouteTable) {
+ var uemanep, submanep *rtmgr.Endpoint
+ for _, ep := range eps {
+ if ep.Name == "UEMAN" {
+ uemanep = ep
+ }
+ if ep.Name == "SUBMAN" {
+ submanep = ep
+ }
+ }
+
+ if uemanep != nil && submanep != nil {
+ txlist := rtmgr.EndpointList{*uemanep}
+ rxlist := []rtmgr.EndpointList{[]rtmgr.Endpoint{*submanep}}
+ rte1 := rtmgr.RouteTableEntry{
+ rtmgr.MESSAGETYPES["RIC_SUB_REQ"],
+ txlist,
+ rxlist,
+ -1,
+ }
+ rte2 := rtmgr.RouteTableEntry{
+ rtmgr.MESSAGETYPES["RIC_SUB_DEL_REQ"],
+ txlist,
+ rxlist,
+ -1,
+ }
+ *rt = append(*rt, rte1)
+ *rt = append(*rt, rte2)
+ } else {
+ rtmgr.Logger.Warn("Cannot get the static route details of the platform components UEMAN/SUBMAN")
+ }
+}
+
+func getEndpointByName(eps *rtmgr.Endpoints, name string) *rtmgr.Endpoint {
+ for _, ep := range *eps {
+ if ep.Name == name {
+ rtmgr.Logger.Debug("name: %s", ep.Name)
+ rtmgr.Logger.Debug("ep: %v", ep)
+ return ep
+ }
+ }
+ return nil
+}
+
+func getEndpointByUuid(eps *rtmgr.Endpoints, uuid string) *rtmgr.Endpoint {
+ for _, ep := range *eps {
+ if ep.Uuid == uuid {
+ rtmgr.Logger.Debug("name: %s", ep.Uuid)
+ rtmgr.Logger.Debug("ep: %v", ep)
+ return ep
+ }
+ }
+ return nil
+}
+func (r *Rpe) addSubscriptionRoutes(eps rtmgr.Endpoints, rt *rtmgr.RouteTable, subs *rtmgr.SubscriptionList) {
+ rtmgr.Logger.Debug("rpe.addSubscriptionRoutes invoked")
+ rtmgr.Logger.Debug("params: %v", eps)
+ var e2termep, submanep, xappEp *rtmgr.Endpoint
+ var xappName string
+ e2termep = getEndpointByName(&eps, "E2TERM")
+ submanep = getEndpointByName(&eps, "SUBMAN")
+ if e2termep != nil && submanep != nil {
+ // looping through the subscription list, add routes one by one
+ for _, sub := range *subs {
+ // SubMan -> XApp
+ xappName = sub.Fqdn + ":" + strconv.Itoa(int(sub.Port))
+ xappEp = getEndpointByUuid(&eps, xappName)
+ if xappEp == nil {
+ rtmgr.Logger.Error("XApp not found: %s", xappName)
+ rtmgr.Logger.Debug("Endpoints: %v", eps)
+ } else {
+ txlist := rtmgr.EndpointList{*submanep}
+ rxlist := []rtmgr.EndpointList{[]rtmgr.Endpoint{*xappEp}}
+ subManMsgs := []string{"RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE"}
+ for _, entry := range subManMsgs {
+ rte := rtmgr.RouteTableEntry{
+ rtmgr.MESSAGETYPES[entry],
+ txlist,
+ rxlist,
+ sub.SubID,
+ }
+ *rt = append(*rt, rte)
+ }
+ // E2Term -> XApp
+ txlist = rtmgr.EndpointList{*e2termep}
+ rxlist = []rtmgr.EndpointList{[]rtmgr.Endpoint{*xappEp}}
+ e2apMsgs := []string{"RIC_CONTROL_ACK", "RIC_CONTROL_FAILURE", "RIC_INDICATION"}
+ for _, entry := range e2apMsgs {
+ rte := rtmgr.RouteTableEntry{
+ rtmgr.MESSAGETYPES[entry],
+ txlist,
+ rxlist,
+ sub.SubID,
+ }
+ *rt = append(*rt, rte)
+ }
+ }
+ }
+ rtmgr.Logger.Debug("addSubscriptionRoutes eps: %v", eps)
+ } else {
+ rtmgr.Logger.Warn("Subscription route update failure: Cannot get the static route details of the platform components E2TERM/SUBMAN")
+ }
+
+}
package rpe
-import "rtmgr"
+import "routing-manager/pkg/rtmgr"
type generatePolicies func(rtmgr.Endpoints) *[]string
+type getRouteTable func(rtmgr.Endpoints) *rtmgr.RouteTable
-type RpeEngine struct {
- Name string
- Version string
- Protocol string
+type RpeEngineConfig struct {
+ Name string
+ Version string
+ Protocol string
+ Instance RpeEngine
+ IsAvailable bool
}
-type RpeEngineConfig struct {
- Engine RpeEngine
- GeneratePolicies generatePolicies
- IsAvailable bool
+type RpeEngine interface {
+ GeneratePolicies(rtmgr.Endpoints) *[]string
+ GetRouteTable(rtmgr.Endpoints) *rtmgr.RouteTable
}
import (
"github.com/jcelliott/lumber"
+ "errors"
+ "strings"
+ "os"
+ "io/ioutil"
+ "encoding/json"
)
var (
//TODO: temporary solution
// CamelCase Message Types are for being able to test with old fashioned admin controll xApps
+ // TODO: Add a seperate message definition file (Not using the one from RMR to not create dependency on that library).
MESSAGETYPES = map[string]string{
"HandoverPreparation": "0",
"HandoverCancel": "1",
"RIC_E2_MANAGER_HC_RESPONSE": "10008",
"RIC_CONTROL_XAPP_CONFIG_REQUEST": "100000",
"RIC_CONTROL_XAPP_CONFIG_RESPONSE": "100001",
+
+ "RIC_X2_SETUP_REQ": "10060",
+ "RIC_X2_SETUP_RESP": "10061",
+ "RIC_X2_SETUP_FAILURE": "10062",
+ "RIC_X2_RESET": "10070",
+ "RIC_X2_RESET_RESP": "10071",
+
+ "RIC_SUB_REQ": "12010",
+ "RIC_SUB_RESP": "12011",
+ "RIC_SUB_FAILURE": "12012",
+ "RIC_SUB_DEL_REQ": "12020",
+ "RIC_SUB_DEL_RESP": "12021",
+ "RIC_SUB_DEL_FAILURE": "12022",
+
+ "RIC_CONTROL_REQ": "12040",
+ "RIC_CONTROL_ACK": "12041",
+ "RIC_CONTROL_FAILURE": "12042",
+ "RIC_INDICATION": "12050",
+ "RIC_ENDC_X2_SETUP_REQ": "10360",
+ "RIC_ENDC_X2_SETUP_RESP": "10361",
+ "RIC_ENDC_X2_SETUP_FAILURE": "10362",
+ "RIC_ENDC_CONF_UPDATE": "10370",
+ "RIC_ENDC_CONF_UPDATE_ACK": "10371",
+ "RIC_ENDC_CONF_UPDATE_FAILURE": "10372",
+ "RIC_RES_STATUS_REQ": "10090",
+ "RIC_RES_STATUS_RESP": "10091",
+ "RIC_RES_STATUS_FAILURE": "10092",
+ "RIC_ENB_CONF_UPDATE": "10080",
+ "RIC_ENB_CONF_UPDATE_ACK": "10081",
+ "RIC_ENB_CONF_UPDATE_FAILURE": "10082",
+ "RIC_ENB_LOAD_INFORMATION": "10020",
+ "RIC_GNB_STATUS_INDICATION": "10450",
+ "RIC_RESOURCE_STATUS_UPDATE": "10100",
+ "RIC_ERROR_INDICATION": "10030",
+ "DC_ADM_INT_CONTROL": "20000",
+ "DC_ADM_INT_CONTROL_ACK": "20001",
}
+
+ // Messagetype mappings for the platform components.
+ // This implements static default routes needed by the RIC. Needs to be changed in case new components/message types needes to be added/updated.
+ // Representation : {"componentName1": {"tx": <tx message type list>, "rx": <rx message type list>}}
+ PLATFORMMESSAGETYPES = map[string]map[string][]string{
+ "E2TERM": {"tx": []string{"RIC_X2_SETUP_REQ", "RIC_X2_SETUP_RESP", "RIC_X2_SETUP_FAILURE", "RIC_X2_RESET", "RIC_X2_RESET_RESP", "RIC_ENDC_X2_SETUP_REQ", "RIC_ENDC_X2_SETUP_RESP", "RIC_ENDC_X2_SETUP_FAILURE", "RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE"}, "rx": []string{"RIC_X2_SETUP_REQ", "RIC_X2_SETUP_RESP", "RIC_X2_SETUP_FAILURE", "RIC_X2_RESET", "RIC_X2_RESET_RESP", "RIC_ENDC_X2_SETUP_REQ", "RIC_ENDC_X2_SETUP_RESP", "RIC_ENDC_X2_SETUP_FAILURE", "RIC_SUB_REQ", "RIC_SUB_DEL_REQ", "RIC_CONTROL_REQ"}},
+ "E2MAN": {"tx": []string{"RIC_X2_SETUP_REQ", "RIC_X2_SETUP_RESP", "RIC_X2_SETUP_FAILURE", "RIC_X2_RESET", "RIC_X2_RESET_RESP", "RIC_ENDC_X2_SETUP_REQ", "RIC_ENDC_X2_SETUP_RESP", "RIC_ENDC_X2_SETUP_FAILURE"}, "rx": []string{"RIC_X2_SETUP_REQ", "RIC_X2_SETUP_RESP", "RIC_X2_SETUP_FAILURE", "RIC_X2_RESET", "RIC_X2_RESET_RESP", "RIC_ENDC_X2_SETUP_REQ", "RIC_ENDC_X2_SETUP_RESP", "RIC_ENDC_X2_SETUP_FAILURE"}},
+ "SUBMAN": {"tx": []string{"RIC_SUB_REQ", "RIC_SUB_DEL_REQ"}, "rx": []string{"RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE"}},
+ "UEMAN": {"tx": []string{"RIC_CONTROL_REQ"}, "rx": []string{}},
+ }
+
Logger = lumber.NewConsoleLogger(lumber.INFO)
- Eps Endpoints
+ Eps Endpoints
+ Subs SubscriptionList
)
-func SetLogLevel(loglevel string) {
- switch loglevel {
+func SetLogLevel(loglevel string) error{
+ switch strings.ToUpper(loglevel) {
case "INFO":
Logger.Level(lumber.INFO)
+ return nil
case "WARN":
Logger.Level(lumber.WARN)
+ return nil
case "ERROR":
Logger.Level(lumber.ERROR)
+ return nil
case "DEBUG":
Logger.Info("debugmode")
Logger.Level(lumber.DEBUG)
+ return nil
+ default:
+ Logger.Error("Invalid log mode, setting info")
+ Logger.Level(lumber.INFO)
+ return errors.New("Invalid log level, setting info")
+ }
+}
+
+func GetPlatformComponents(configfile string) (*PlatformComponents, error) {
+ Logger.Debug("Invoked rtmgr.GetPlatformComponents("+ configfile +")")
+ var rcfg RtmgrConfig
+ jsonFile, err := os.Open(configfile)
+ if err != nil {
+ return nil, errors.New("cannot open the file due to: " + err.Error())
+ }
+ defer jsonFile.Close()
+ byteValue, err := ioutil.ReadAll(jsonFile)
+ if err != nil {
+ return nil, errors.New("cannot read the file due to: " + err.Error())
+ }
+ err = json.Unmarshal(byteValue, &rcfg)
+ if err != nil {
+ return nil, errors.New("cannot parse data due to: " + err.Error())
}
+ Logger.Debug("Platform components read from the configfile: %v", rcfg.Pcs)
+ return &(rcfg.Pcs), nil
}
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: rtmgr_test.go
+ Abstract:
+ Date: 14 May 2019
+*/
+
+package rtmgr
+
+import (
+ "testing"
+)
+
+func TestSetLogLevel(t *testing.T) {
+ modes_ok := []string{"info","warn","debug","error"}
+ modes_nok := []string{"inValId","LogLEVEL","PRoviDeD"}
+ for _, value := range(modes_ok) {
+ if SetLogLevel(value) != nil {
+ t.Error("Invalid log level: " + value)
+ }
+ }
+
+ for _, value := range(modes_nok) {
+ if SetLogLevel(value) == nil {
+ t.Error("Invalid log level: " + value)
+ }
+ }
+}
+
type Endpoints map[string]*Endpoint
+type SubscriptionList []Subscription
+
+
//TODO: uuid is not a real UUID but a string of "ip:port"
// this should be changed to real UUID later on which should come from xApp Manager // petszila
type Endpoint struct {
MessageType string
TxList EndpointList
RxGroups []EndpointList
+ SubID int16
}
type XApp struct {
TxMessages []string `json:"txMessages"`
RxMessages []string `json:"rxMessages"`
}
+
+type PlatformComponents []struct {
+ Name string `json:"name"`
+ Fqdn string `json:"fqdn"`
+ Port uint16 `json:"port"`
+}
+
+type RtmgrConfig struct {
+ Pcs PlatformComponents `json:"PlatformComponents"`
+}
+
+type RicComponents struct {
+ Xapps []XApp
+ Pcs PlatformComponents
+}
+
+type Subscription struct {
+ SubID int16
+ Fqdn string
+ Port uint16
+}
+
==================================================================================
*/
/*
- Mnemonic: nngpub.go
+ Mnemonic: NngPub.go
Abstract: mangos (NNG) Pub/Sub SBI implementation
Date: 12 March 2019
*/
import (
"errors"
- "nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/pub"
_ "nanomsg.org/go/mangos/v2/transport/all"
- "rtmgr"
+ "routing-manager/pkg/rtmgr"
"strconv"
)
-var socket mangos.Socket
+type NngPub struct {
+ Sbi
+ socket NngSocket
+ NewSocket CreateNewNngSocketHandler
+}
-func createNngPubEndpointSocket(ep *rtmgr.Endpoint) error {
- return nil
+func NewNngPub() *NngPub {
+ instance := new(NngPub)
+ instance.NewSocket = createNewPubSocket
+ return instance
}
-func destroyNngPubEndpointSocket(ep *rtmgr.Endpoint) error {
- return nil
+func createNewPubSocket() (NngSocket, error) {
+ rtmgr.Logger.Debug("Invoked createNewPubSocket()")
+ s, err := pub.NewSocket()
+ if err != nil {
+ return nil, errors.New("can't create new pub socket due to: " + err.Error())
+ }
+ return s, nil
}
-/*
-Creates the NNG publication channel
-*/
-func openNngPub(ip string) error {
+func (c *NngPub) Initialize(ip string) error {
+ rtmgr.Logger.Debug("Invoked sbi.Initialize("+ ip +")")
var err error
- if socket, err = pub.NewSocket(); err != nil {
- return errors.New("can't get new pub socket due to:" + err.Error())
+ c.socket, err = c.NewSocket()
+ if err != nil {
+ return errors.New("create socket error due to: " + err.Error())
+ }
+ if err = c.listen(ip); err != nil {
+ return errors.New("can't listen on socket due to: " + err.Error())
}
+ return nil
+}
+
+func (c *NngPub) Terminate() error {
+ rtmgr.Logger.Debug("Invoked sbi.Terminate()")
+ return c.closeSocket()
+}
+
+func (c *NngPub) AddEndpoint(ep *rtmgr.Endpoint) error {
+ return nil
+}
+
+func (c *NngPub) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+ return nil
+}
+
+func (c *NngPub) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+ c.updateEndpoints(rcs, c)
+}
+
+func (c *NngPub) listen(ip string) error {
+ rtmgr.Logger.Debug("Start listening on: " + ip)
uri := DEFAULT_NNG_PUBSUB_SOCKET_PREFIX + ip + ":" + strconv.Itoa(DEFAULT_NNG_PUBSUB_SOCKET_NUMBER)
rtmgr.Logger.Info("publishing on: " + uri)
- if err = socket.Listen(uri); err != nil {
- return errors.New("can't publish on socket " + uri + " due to:" + err.Error())
+ if err := c.socket.(NngSocket).Listen(uri); err != nil {
+ return errors.New("can't publish on socket " + uri + " due to: " + err.Error())
}
return nil
}
-func closeNngPub() error {
- if err := socket.Close(); err != nil {
- return errors.New("can't close socket due to:" + err.Error())
+func (c *NngPub) closeSocket() error {
+ rtmgr.Logger.Debug("Close NngPub Socket")
+ if err := c.socket.(NngSocket).Close(); err != nil {
+ return errors.New("can't close socket due to: " + err.Error())
}
return nil
}
-func publishAll(policies *[]string) error {
+func (c *NngPub) DistributeAll(policies *[]string) error {
+ rtmgr.Logger.Debug("Invoked: sbi.DistributeAll(), args: %v",(*policies))
for _, pe := range *policies {
- if err := socket.Send([]byte(pe)); err != nil {
+ if err := c.socket.(NngSocket).Send([]byte(pe)); err != nil {
return errors.New("Unable to send policy entry due to: " + err.Error())
}
}
- rtmgr.Logger.Info("NNG PUB: OK (# of Entries:" + strconv.Itoa(len((*policies))) + ")")
+ rtmgr.Logger.Info("NNG PUB: OK (# of Entries: " + strconv.Itoa(len((*policies))) + ")")
return nil
}
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: nngpub_test.go
+ Abstract:
+ Date: 25 April 2019
+*/
+package sbi
+
+import (
+ "errors"
+ "routing-manager/pkg/rtmgr"
+ "routing-manager/pkg/stub"
+ "testing"
+)
+
+/*
+Returns an error free Socket instance
+*/
+func createNewStubPubSocket() (NngSocket, error) {
+ socket := stub.MangosSocket{}
+ return socket, nil
+}
+
+/*
+Returns a SocketError
+*/
+func createNewStubPubSocketError() (NngSocket, error) {
+ return nil, errors.New("stub generated Create Socket error")
+}
+
+/*
+Returns a Socket which always generates error on Close()
+*/
+func createNewStubPubSocketCloseError() (NngSocket, error) {
+ socket := stub.MangosSocket{}
+ socket.GenerateSocketCloseError = true
+ return socket, nil
+}
+
+/*
+Returns a Socket which always generates error on Listen()
+*/
+func createNewStubPubSocketListenError() (NngSocket, error) {
+ socket := stub.MangosSocket{}
+ socket.GenerateSocketListenError = true
+ return socket, nil
+}
+
+/*
+Returns a Socket which always generates error on Send()
+*/
+func createNewStubPubSocketSendError() (NngSocket, error) {
+ socket := stub.MangosSocket{}
+ socket.GenerateSocketSendError = true
+ return socket, nil
+}
+
+/*
+Resets the EndpointList according to argumnets
+*/
+func resetTestPubDataset(instance NngPub, testdata []rtmgr.Endpoint) {
+ rtmgr.Eps = make(map[string]*rtmgr.Endpoint)
+ for _, endpoint := range testdata {
+ ep := endpoint
+ ep.Socket, _ = instance.NewSocket()
+ rtmgr.Eps[ep.Uuid] = &ep
+ }
+}
+
+/*
+nngPub.Initialize() method is tested for happy path case
+*/
+func TestNngPubInitialize(t *testing.T) {
+ var nngpub = NngPub{}
+ nngpub.NewSocket = createNewStubPubSocket
+
+ err := nngpub.Initialize("")
+ if err != nil {
+ t.Errorf("nngPub.Initialize() was incorrect, got: %v, want: %v.", err, nil)
+ }
+}
+
+/*
+nngPub.Initialize() is tested for Socket creating error case
+*/
+func TestNngPubInitializeWithSocketError(t *testing.T) {
+ var nngpub = NngPub{}
+ nngpub.NewSocket = createNewStubPubSocketError
+
+ err := nngpub.Initialize("")
+ if err == nil {
+ t.Errorf("nngPub.Initialize() was incorrect, got: %v, want: %v.", nil, "error")
+ }
+}
+
+/*
+nngPub.Initialize() is tested for Socket listening error case
+*/
+func TestNngPubInitializeWithSocketListenError(t *testing.T) {
+ var nngpub = NngPub{}
+ nngpub.NewSocket = createNewStubPubSocketListenError
+
+ err := nngpub.Initialize("")
+ if err == nil {
+ t.Errorf("nngPub.Initialize() was incorrect, got: %v, want: %v.", nil, "error")
+ }
+}
+
+/*
+nngPub.Terminate() method is empty, nothing to be tested
+*/
+func TestNngPubTerminate(t *testing.T) {
+ var nngpub = NngPub{}
+ nngpub.NewSocket = createNewStubPubSocket
+ nngpub.Initialize("")
+
+ err := nngpub.Terminate()
+ if err != nil {
+ t.Errorf("nngPub.Terminate() was incorrect, got: %v, want: %v.", err, nil)
+ }
+}
+
+/*
+nngPub.Terminate() is tested for Socket closing error case
+*/
+func TestNngPubTerminateWithSocketCloseError(t *testing.T) {
+ var nngpub = NngPub{}
+ nngpub.NewSocket = createNewStubPubSocketCloseError
+ nngpub.Initialize("")
+
+ err := nngpub.Terminate()
+ if err == nil {
+ t.Errorf("nngPub.Terminate() was incorrect, got: %v, want: %v.", nil, "error")
+ }
+}
+
+/*
+nngPub.UpdateEndpoints() is testd against stub.ValidXapps dataset
+*/
+func TestNngPubUpdateEndpoints(t *testing.T) {
+ var nngpub = NngPub{}
+ nngpub.NewSocket = createNewStubPubSocket
+ nngpub.Initialize("")
+ rtmgr.Eps = make(rtmgr.Endpoints)
+ nngpub.UpdateEndpoints(&stub.ValidRicComponents)
+ if rtmgr.Eps == nil {
+ t.Errorf("nngPub.UpdateEndpoints() result was incorrect, got: %v, want: %v.", nil, "rtmgr.Endpoints")
+ }
+}
+
+/*
+nngPub.AddEndpoint() method is empty, nothing to be tested
+*/
+func TestNngPubAddEndpoint(t *testing.T) {
+ var nngpub = NngPub{}
+ nngpub.NewSocket = createNewStubPubSocket
+
+ _ = nngpub.AddEndpoint(new(rtmgr.Endpoint))
+}
+
+/*
+nngPub.DeleteEndpoint() method is empty, nothing to be tested
+*/
+func TestNngPubDeleteEndpoint(t *testing.T) {
+ var nngpub = NngPub{}
+ nngpub.NewSocket = createNewStubPubSocket
+
+ _ = nngpub.DeleteEndpoint(new(rtmgr.Endpoint))
+}
+
+/*
+nngPub.DistributeAll() is tested for happy path case
+*/
+func TestNngPubDistributeAll(t *testing.T) {
+ var nngpub = NngPub{}
+ nngpub.NewSocket = createNewStubPubSocket
+ nngpub.Initialize("")
+ resetTestPubDataset(nngpub, stub.ValidEndpoints)
+
+ err := nngpub.DistributeAll(stub.ValidPolicies)
+ if err != nil {
+ t.Errorf("nngPub.DistributeAll(policies) was incorrect, got: %v, want: %v.", err, nil)
+ }
+}
+
+/*
+nngPub.DistributeAll() is tested for Sending error case
+*/
+func TestNngPubDistributeAllSocketSendError(t *testing.T) {
+ var nngpub = NngPub{}
+ nngpub.NewSocket = createNewStubPubSocketSendError
+ nngpub.Initialize("")
+ resetTestPubDataset(nngpub, stub.ValidEndpoints)
+
+ err := nngpub.DistributeAll(stub.ValidPolicies)
+ if err == nil {
+ t.Errorf("nngPub.DistributeAll(policies) was incorrect, got: %v, want: %v.", nil, "error")
+ }
+}
"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/push"
_ "nanomsg.org/go/mangos/v2/transport/all"
- "rtmgr"
+ "routing-manager/pkg/rtmgr"
"strconv"
)
-func openNngPush(ip string) error {
- return nil
+type NngPush struct {
+ Sbi
+ NewSocket CreateNewNngSocketHandler
}
-func closeNngPush() error {
- return nil
+func NewNngPush() *NngPush {
+ instance := new(NngPush)
+ instance.NewSocket = createNewPushSocket
+ return instance
}
-func createNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
- rtmgr.Logger.Debug("Invoked sbi.createNngPushEndpointSocket")
- rtmgr.Logger.Debug("args: %v", (*ep))
- s, err := push.NewSocket()
+func createNewPushSocket() (NngSocket, error) {
+ rtmgr.Logger.Debug("Invoked: createNewPushSocket()")
+ socket, err := push.NewSocket()
if err != nil {
- return errors.New("can't open push socket for endpoint: " + ep.Name +" due to:" + err.Error())
+ return nil, errors.New("can't create new push socket due to:" + err.Error())
}
- s.SetPipeEventHook(pipeEventHandler)
- ep.Socket = s
- dial(ep)
- return nil
-}
-
-func destroyNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
- rtmgr.Logger.Debug("Invoked sbi.destroyNngPushEndpointSocket")
- rtmgr.Logger.Debug("args: %v", (*ep))
- if err:= ep.Socket.(mangos.Socket).Close(); err != nil {
- return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to:" + err.Error())
- }
- return nil
+ socket.SetPipeEventHook(pipeEventHandler)
+ return socket, nil
}
func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
+ rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
for _, ep := range rtmgr.Eps {
uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
if uri == pipe.Address() {
switch event {
case 1:
ep.IsReady = true
- rtmgr.Logger.Debug("Endpoint " + uri + " successfully registered")
+ rtmgr.Logger.Debug("Endpoint " + uri + " successfully attached")
default:
ep.IsReady = false
- rtmgr.Logger.Debug("Endpoint " + uri + " has been deregistered")
+ rtmgr.Logger.Debug("Endpoint " + uri + " has been detached")
}
- }
+ }
+ }
+}
+
+func (c *NngPush) Initialize(ip string) error {
+ return nil
+}
+
+func (c *NngPush) Terminate() error {
+ return nil
+}
+
+func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
+ var err error
+ var socket NngSocket
+ rtmgr.Logger.Debug("Invoked sbi.AddEndpoint")
+ rtmgr.Logger.Debug("args: %v", (*ep))
+ socket, err = c.NewSocket()
+ if err != nil {
+ return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
+ }
+ ep.Socket = socket
+ err = c.dial(ep)
+ if err != nil {
+ return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
}
+ return nil
+}
+
+func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+ rtmgr.Logger.Debug("Invoked sbi. DeleteEndpoint")
+ rtmgr.Logger.Debug("args: %v", (*ep))
+ if err:= ep.Socket.(NngSocket).Close(); err != nil {
+ return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
+ }
+ return nil
+}
+
+func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+ c.updateEndpoints(rcs, c)
}
/*
NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
*/
-func dial(ep *rtmgr.Endpoint) {
+func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
options := make(map[string]interface{})
options[mangos.OptionDialAsynch] = true
- if err := ep.Socket.(mangos.Socket).DialOptions(uri, options); err != nil {
- rtmgr.Logger.Error("can't dial on push socket to " + uri + " due to:" + err.Error())
+ if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
+ return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
}
+ return nil
}
-func pushAll(policies *[]string) error {
- rtmgr.Logger.Debug("Invoked: sbi.pushAll")
+func (c *NngPush) DistributeAll(policies *[]string) error {
+ rtmgr.Logger.Debug("Invoked: sbi.DistributeAll")
rtmgr.Logger.Debug("args: %v", (*policies))
for _, ep := range rtmgr.Eps {
if ep.IsReady {
- go send(ep, policies)
+ go c.send(ep, policies)
} else {
- rtmgr.Logger.Warn("Endpoint " + ep.Uuid + "is not ready")
+ rtmgr.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
}
}
return nil
}
-func send(ep *rtmgr.Endpoint, policies *[]string) {
- rtmgr.Logger.Debug("Invoked: sbi.pushAll")
+func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
rtmgr.Logger.Debug("Push policy to endpoint: "+ ep.Uuid)
for _, pe := range *policies {
- if err := ep.Socket.(mangos.Socket).Send([]byte(pe)); err != nil {
+ if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
}
}
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: nngpush_test.go
+ Abstract:
+ Date: 3 May 2019
+*/
+package sbi
+
+import (
+ "errors"
+ "routing-manager/pkg/rtmgr"
+ "routing-manager/pkg/stub"
+ "testing"
+)
+
+/*
+Returns an error free Socket instance
+*/
+func createNewStubPushSocket() (NngSocket, error) {
+ socket := stub.MangosSocket{}
+ return socket, nil
+}
+
+/*
+Returns a SocketError
+*/
+func createNewStubPushSocketError() (NngSocket, error) {
+ return nil, errors.New("stub generated Create Socket error")
+}
+
+/*
+Returns a Socket which always generates error on Close()
+*/
+func createNewStubPushSocketCloseError() (NngSocket, error) {
+ socket := stub.MangosSocket{}
+ socket.GenerateSocketCloseError = true
+ return socket, nil
+}
+
+/*
+Returns a Socket which always generates error on Send()
+*/
+func createNewStubPushSocketSendError() (NngSocket, error) {
+ socket := stub.MangosSocket{}
+ socket.GenerateSocketSendError = true
+ return socket, nil
+}
+
+/*
+Returns a Socket which always generates error on Dial()
+*/
+func createNewStubPushSocketDialError() (NngSocket, error) {
+ socket := stub.MangosSocket{}
+ socket.GenerateSocketDialError = true
+ return socket, nil
+}
+
+/*
+Resets the EndpointList according to argumnets
+*/
+func resetTestPushDataset(instance NngPush, testdata []rtmgr.Endpoint) {
+ rtmgr.Eps = make(map[string]*rtmgr.Endpoint)
+ for _, endpoint := range testdata {
+ ep := endpoint
+ ep.Socket, _ = instance.NewSocket()
+ rtmgr.Eps[ep.Uuid] = &ep
+ }
+}
+
+/*
+nngpush.Initialize() method is empty, nothing to be tested
+*/
+func TestNngPushInitialize(t *testing.T) {
+ var nngpush = NngPush{}
+ nngpush.NewSocket = createNewStubPushSocket
+
+ _ = nngpush.Initialize("")
+}
+
+/*
+nngpush.Terminate() method is empty, nothing to be tested
+*/
+func TestNngPushTerminate(t *testing.T) {
+ var nngpush = NngPush{}
+ nngpush.NewSocket = createNewStubPushSocket
+
+ _ = nngpush.Terminate()
+}
+
+/*
+nngpush.UpdateEndpoints() is testd against stub.ValidXapps dataset
+*/
+func TestNngPushUpdateEndpoints(t *testing.T) {
+ var nngpush = NngPush{}
+ nngpush.NewSocket = createNewStubPushSocket
+ rtmgr.Eps = make(rtmgr.Endpoints)
+
+ nngpush.UpdateEndpoints(&stub.ValidRicComponents)
+ if rtmgr.Eps == nil {
+ t.Errorf("nngpush.UpdateEndpoints() result was incorrect, got: %v, want: %v.", nil, "rtmgr.Endpoints")
+ }
+}
+
+/*
+nngpush.AddEndpoint() is tested for happy path case
+*/
+func TestNngPushAddEndpoint(t *testing.T) {
+ var err error
+ var nngpush = NngPush{}
+ nngpush.NewSocket = createNewStubPushSocket
+ resetTestPushDataset(nngpush, stub.ValidEndpoints)
+
+ err = nngpush.AddEndpoint(rtmgr.Eps["10.0.0.1:0"])
+ if err != nil {
+ t.Errorf("nngpush.AddEndpoint() return was incorrect, got: %v, want: %v.", err, "nil")
+ }
+ if rtmgr.Eps["10.0.0.1:0"].Socket == nil {
+ t.Errorf("nngpush.AddEndpoint() was incorrect, got: %v, want: %v.", nil, "Socket")
+ }
+}
+
+/*
+nngpush.AddEndpoint() is tested for Socket creating error case
+*/
+func TestNngPushAddEndpointWithSocketError(t *testing.T) {
+ var err error
+ var nngpush = NngPush{}
+ nngpush.NewSocket = createNewStubPushSocketError
+ resetTestPushDataset(nngpush, stub.ValidEndpoints)
+
+ err = nngpush.AddEndpoint(rtmgr.Eps["10.0.0.1:0"])
+ if err == nil {
+ t.Errorf("nngpush.AddEndpoint() was incorrect, got: %v, want: %v.", nil, "error")
+ }
+ if rtmgr.Eps["10.0.0.1:0"].Socket != nil {
+ t.Errorf("nngpush.AddEndpoint() was incorrect, got: %v, want: %v.", rtmgr.Eps["10.0.0.1:0"].Socket, nil)
+ }
+}
+
+/*
+nngpush.AddEndpoint() is tested for Dialing error case
+*/
+func TestNngPushAddEndpointWithSocketDialError(t *testing.T) {
+ var err error
+ var nngpush = NngPush{}
+ nngpush.NewSocket = createNewStubPushSocketDialError
+ resetTestPushDataset(nngpush, stub.ValidEndpoints)
+
+ err = nngpush.AddEndpoint(rtmgr.Eps["10.0.0.1:0"])
+ if err == nil {
+ t.Errorf("nngpush.AddEndpoint() was incorrect, got: %v, want: %v.", nil, "error")
+ }
+}
+
+/*
+nngpush.DistributeAll() is tested for happy path case
+*/
+func TestNngPushDistributeAll(t *testing.T) {
+ var err error
+ var nngpush = NngPush{}
+ nngpush.NewSocket = createNewStubPushSocket
+ resetTestPushDataset(nngpush, stub.ValidEndpoints)
+
+ err = nngpush.DistributeAll(stub.ValidPolicies)
+ if err != nil {
+ t.Errorf("nngpush.DistributeAll(policies) was incorrect, got: %v, want: %v.", err, "nil")
+ }
+}
+
+/*
+nngpush.DistributeAll() is tested for Sending error case
+*/
+func TestNngPushDistributeAllSocketSendError(t *testing.T) {
+ var err error
+ var nngpush = NngPush{}
+ nngpush.NewSocket = createNewStubPushSocketSendError
+ resetTestPushDataset(nngpush, stub.ValidEndpoints)
+
+ err = nngpush.DistributeAll(stub.ValidPolicies)
+ if err != nil {
+ t.Errorf("nngpush.DistributeAll(policies) was incorrect, got: %v, want: %v.", err, "nil")
+ }
+}
+
+func TestNngPushDeleteEndpoint(t *testing.T) {
+ var err error
+ var nngpush = NngPush{}
+ nngpush.NewSocket = createNewStubPushSocket
+ resetTestPushDataset(nngpush, stub.ValidEndpoints)
+
+ err = nngpush.DeleteEndpoint(rtmgr.Eps["10.0.0.1:0"])
+ if err != nil {
+ t.Errorf("nngpush.DeleteEndpoint() was incorrect, got: %v, want: %v.", err, "nil")
+ }
+}
+
+func TestNngPushDeleteEndpointWithSocketCloseError(t *testing.T) {
+ var err error
+ var nngpush = NngPush{}
+ nngpush.NewSocket = createNewStubPushSocketCloseError
+ resetTestPushDataset(nngpush, stub.ValidEndpoints)
+
+ err = nngpush.DeleteEndpoint(rtmgr.Eps["10.1.1.1:0"])
+ if err == nil {
+ t.Errorf("nngpush.DeleteEndpoint() was incorrect, got: %v, want: %v.", nil, "error")
+ }
+}
import (
"errors"
- "fmt"
- "rtmgr"
"strconv"
+ "routing-manager/pkg/rtmgr"
)
const DEFAULT_NNG_PUBSUB_SOCKET_PREFIX = "tcp://"
const DEFAULT_NNG_PUBSUB_SOCKET_NUMBER = 4560
const DEFAULT_NNG_PIPELINE_SOCKET_PREFIX = "tcp://"
const DEFAULT_NNG_PIPELINE_SOCKET_NUMBER = 4561
+const PLATFORMTYPE = "platformcomponenttype"
var (
SupportedSbis = []*SbiEngineConfig{
&SbiEngineConfig{
- SbiEngine{
- Name: "nngpub",
- Version: "v1",
- Protocol: "nngpubsub",
- },
- openSocket(openNngPub),
- closeSocket(closeNngPub),
- createEndpointSocket(createNngPubEndpointSocket),
- destroyEndpointSocket(createNngPubEndpointSocket),
- distributeAll(publishAll),
- true,
- },
+ Name: "nngpush",
+ Version: "v1",
+ Protocol: "nngpipeline",
+ Instance: NewNngPush(),
+ IsAvailable: true,
+ },
&SbiEngineConfig{
- SbiEngine{
- Name: "nngpush",
- Version: "v1",
- Protocol: "nngpipeline",
- },
- openSocket(openNngPush),
- closeSocket(closeNngPush),
- createEndpointSocket(createNngPushEndpointSocket),
- destroyEndpointSocket(destroyNngPushEndpointSocket),
- distributeAll(pushAll),
- true,
- },
+ Name: "nngpub",
+ Version: "v1",
+ Protocol: "nngpubsub",
+ Instance: NewNngPub(),
+ IsAvailable: true,
+ },
}
)
-func ListSbis() {
- fmt.Printf("SBI:\n")
+func GetSbi(sbiName string) (SbiEngine, error) {
for _, sbi := range SupportedSbis {
- if sbi.IsAvailable {
- rtmgr.Logger.Info(sbi.Engine.Name + "/" + sbi.Engine.Version)
+ if sbi.Name == sbiName && sbi.IsAvailable {
+ return sbi.Instance, nil
}
}
+ return nil, errors.New("SBI:" + sbiName + " is not supported or still not available")
}
-func GetSbi(sbiName string) (*SbiEngineConfig, error) {
- for _, sbi := range SupportedSbis {
- if (*sbi).Engine.Name == sbiName && (*sbi).IsAvailable {
- return sbi, nil
- }
- }
- return nil, errors.New("SBI:" + sbiName + " is not supported or still not available")
+type Sbi struct {
+
}
-func pruneEndpointList(sbii *SbiEngineConfig) {
+func (s *Sbi) pruneEndpointList(sbii SbiEngine) {
for _, ep := range rtmgr.Eps {
if !ep.Keepalive {
- sbii.DestroyEndpointSocket(ep)
+ rtmgr.Logger.Debug("deleting %v",ep)
+ sbii.DeleteEndpoint(ep)
delete(rtmgr.Eps, ep.Uuid)
} else {
rtmgr.Eps[ep.Uuid].Keepalive = false
}
}
-func UpdateEndpointList(xapps *[]rtmgr.XApp, sbii *SbiEngineConfig) {
- for _, xapp := range *xapps {
+func (s *Sbi) updateEndpoints(rcs *rtmgr.RicComponents, sbii SbiEngine) {
+ for _, xapp := range (*rcs).Xapps {
for _, instance := range xapp.Instances {
uuid := instance.Ip + ":" + strconv.Itoa(int(instance.Port))
if _, ok := rtmgr.Eps[uuid]; ok {
false,
true,
}
- if err := sbii.CreateEndpointSocket(ep); err != nil {
+ if err := sbii.AddEndpoint(ep); err != nil {
rtmgr.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
continue
}
}
}
}
- pruneEndpointList(sbii)
+ s.updatePlatformEndpoints(&((*rcs).Pcs), sbii)
+ s.pruneEndpointList(sbii)
+}
+
+func (s *Sbi ) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbii SbiEngine) {
+ rtmgr.Logger.Debug("updatePlatformEndpoints invoked. PCS: %v", *pcs)
+ for _, pc := range *pcs {
+ uuid := pc.Fqdn + ":" + strconv.Itoa(int(pc.Port))
+ if _, ok := rtmgr.Eps[uuid]; ok {
+ rtmgr.Eps[uuid].Keepalive = true
+ } else {
+ ep := &rtmgr.Endpoint{
+ uuid,
+ pc.Name,
+ PLATFORMTYPE,
+ pc.Fqdn,
+ pc.Port,
+ rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"],
+ rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"],
+ nil,
+ false,
+ true,
+ }
+ rtmgr.Logger.Debug("ep created: %v",ep)
+ if err := sbii.AddEndpoint(ep); err != nil {
+ rtmgr.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
+ continue
+ }
+ rtmgr.Eps[uuid] = ep
+ }
+ }
}
+
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: sbi_test.go
+ Abstract:
+ Date: 25 April 2019
+*/
+package sbi
+
+import (
+ "testing"
+ "reflect"
+ "errors"
+)
+
+func TestGetSbi(t *testing.T) {
+ var errtype = errors.New("")
+ var sbitype = new(NngPub)
+ var invalids = []string{"nngpus", ""}
+
+ sbii, err := GetSbi("nngpub")
+ if err != nil {
+ t.Errorf("GetSbi(nngpub) was incorrect, got: %v, want: %v.", reflect.TypeOf(err), nil)
+ }
+ if reflect.TypeOf(sbii) != reflect.TypeOf(sbitype) {
+ t.Errorf("GetSbi(nngpub) was incorrect, got: %v, want: %v.", reflect.TypeOf(sbii), reflect.TypeOf(sbitype))
+ }
+
+ for _, arg := range invalids {
+ _, err := GetSbi(arg)
+ if err == nil {
+ t.Errorf("GetSbi("+arg+") was incorrect, got: %v, want: %v.", reflect.TypeOf(err), reflect.TypeOf(errtype))
+ }
+ }
+}
package sbi
-import "rtmgr"
+import "routing-manager/pkg/rtmgr"
-type distributeAll func(*[]string) error
-type openSocket func(string) error
-type closeSocket func() error
-type createEndpointSocket func(*rtmgr.Endpoint) error
-type destroyEndpointSocket func(*rtmgr.Endpoint) error
-
-
-type SbiEngine struct {
+type SbiEngineConfig struct {
Name string
Version string
- Protocol string
+ Protocol string
+ Instance SbiEngine
+ IsAvailable bool
}
-type SbiEngineConfig struct {
- Engine SbiEngine
- OpenSocket openSocket
- CloseSocket closeSocket
- CreateEndpointSocket createEndpointSocket
- DestroyEndpointSocket destroyEndpointSocket
- DistributeAll distributeAll
- IsAvailable bool
+type SbiEngine interface {
+ Initialize(string) error
+ Terminate() error
+ DistributeAll(*[]string) error
+ AddEndpoint(*rtmgr.Endpoint) error
+ DeleteEndpoint(*rtmgr.Endpoint) error
+ UpdateEndpoints(*rtmgr.RicComponents)
}
+
+type NngSocket interface {
+ Listen(string) error
+ Send([]byte) error
+ Close() error
+ DialOptions(string, map[string]interface{}) error
+}
+
+type CreateNewNngSocketHandler func() (NngSocket,error)
"errors"
"io/ioutil"
"os"
- "rtmgr"
+ "routing-manager/pkg/rtmgr"
)
/*
Parses the JSON content and loads each xApp entry into an xApp object
Returns an array os xApp object
*/
-func fileReadAll(file string) (*[]rtmgr.XApp, error) {
- rtmgr.Logger.Debug("Invoked file.fileReadAll")
- rtmgr.Logger.Debug("file.fileReadAll opens file: " + file)
- var xapps *[]rtmgr.XApp
+
+type File struct {
+ Sdl
+}
+
+func NewFile() *File {
+ instance := new(File)
+ return instance
+}
+
+func (f *File) ReadAll(file string) (*rtmgr.RicComponents, error) {
+ rtmgr.Logger.Debug("Invoked sdl.ReadAll("+ file +")")
+ var rcs *rtmgr.RicComponents
jsonFile, err := os.Open(file)
if err != nil {
return nil, errors.New("cannot open the file due to: " + err.Error())
if err != nil {
return nil, errors.New("cannot read the file due to: " + err.Error())
}
- err = json.Unmarshal(byteValue, &xapps)
+ err = json.Unmarshal(byteValue, &rcs)
if err != nil {
return nil, errors.New("cannot parse data due to: " + err.Error())
}
- rtmgr.Logger.Debug("file.fileReadAll returns: %v", xapps)
- return xapps, nil
+ rtmgr.Logger.Debug("file.fileReadAll returns: %v", rcs)
+ return rcs, nil
}
-func fileWriteAll(file string, xapps *[]rtmgr.XApp) error {
- rtmgr.Logger.Debug("Invoked file.fileWriteAll")
+func (f *File) WriteAll(file string, rcs *rtmgr.RicComponents) error {
+ rtmgr.Logger.Debug("Invoked sdl.WriteAll")
rtmgr.Logger.Debug("file.fileWriteAll writes into file: " + file)
- rtmgr.Logger.Debug("file.fileWriteAll writes data: %v", (*xapps))
- byteValue, err := json.Marshal(xapps)
+ rtmgr.Logger.Debug("file.fileWriteAll writes data: %v", (*rcs))
+ byteValue, err := json.Marshal(rcs)
+ if err != nil {
+ return errors.New("cannot convert data due to: " + err.Error())
+ }
+ err = ioutil.WriteFile(file, byteValue, 0644)
+ if err != nil {
+ return errors.New("cannot write file due to: " + err.Error())
+ }
+ return nil
+}
+
+func (f *File) WriteXapps(file string, xapps *[]rtmgr.XApp) error {
+ rtmgr.Logger.Debug("Invoked sdl.WriteXapps")
+ rtmgr.Logger.Debug("file.fileWriteXapps writes into file: " + file)
+ rtmgr.Logger.Debug("file.fileWriteXapps writes data: %v", (*xapps))
+
+ ricData, err := NewFile().ReadAll(file)
+ if err != nil || ricData == nil {
+ rtmgr.Logger.Error("cannot get data from sdl interface due to: " + err.Error())
+ return errors.New("cannot read full ric data to modify xapps data, due to: " + err.Error())
+ }
+
+ ricData.Xapps = *xapps
+
+ byteValue, err := json.Marshal(ricData)
if err != nil {
return errors.New("cannot convert data due to: " + err.Error())
}
import (
"errors"
- "fmt"
- "rtmgr"
)
var (
SupportedSdls = []*SdlEngineConfig{
&SdlEngineConfig{
- SdlEngine{
- Name: "file",
- Version: "v1",
- Protocol: "rawfile",
- },
- readAll(fileReadAll),
- writeAll(fileWriteAll),
- true,
+ Name: "file",
+ Version: "v1",
+ Protocol: "rawfile",
+ Instance: NewFile(),
+ IsAvailable: true,
},
&SdlEngineConfig{
- SdlEngine{
- Name: "redis",
- Version: "v1",
- Protocol: "nsdl",
- },
- readAll(nil),
- writeAll(nil),
- false,
+ Name: "redis",
+ Version: "v1",
+ Protocol: "ndsl",
+ Instance: nil,
+ IsAvailable: false,
},
}
)
-func ListSdls() {
- fmt.Printf("SDL:\n")
+func GetSdl(sdlName string) (SdlEngine, error) {
for _, sdl := range SupportedSdls {
- if sdl.IsAvailable {
- rtmgr.Logger.Info(sdl.Engine.Name + "/" + sdl.Engine.Version)
+ if sdl.Name == sdlName && sdl.IsAvailable {
+ return sdl.Instance, nil
}
}
+ return nil, errors.New("SDL:" + sdlName + " is not supported or still not a available")
}
-func GetSdl(sdlName string) (*SdlEngineConfig, error) {
- for _, sdl := range SupportedSdls {
- if sdl.Engine.Name == sdlName && sdl.IsAvailable {
- return sdl, nil
- }
- }
- return nil, errors.New("SDL:" + sdlName + " is not supported or still not a available")
+type Sdl struct {
+
}
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: nngpub_test.go
+ Abstract:
+ Date: 25 April 2019
+*/
+package sdl
+
+import (
+ "routing-manager/pkg/stub"
+ "testing"
+)
+
+/*
+RmrPub.GeneratePolicies() method is tested for happy path case
+*/
+func TestFileWriteAll(t *testing.T) {
+ var err error
+ var file = File{}
+
+ err = file.WriteAll("ut.rt", &stub.ValidRicComponents)
+ t.Log(err)
+}
+
+/*
+RmrPush.GeneratePolicies() method is tested for happy path case
+*/
+func TestFileReadAll(t *testing.T) {
+ var err error
+ var file = File{}
+
+ data, err := file.ReadAll("ut.rt")
+ t.Log(data)
+ t.Log(err)
+}
*/
package sdl
-import "rtmgr"
+import "routing-manager/pkg/rtmgr"
-type readAll func(string) (*[]rtmgr.XApp, error)
-type writeAll func(string, *[]rtmgr.XApp) error
+type readAll func(string) (*rtmgr.RicComponents, error)
+type writeAll func(string, *rtmgr.RicComponents) error
-type SdlEngine struct {
+type SdlEngineConfig struct {
Name string
Version string
- Protocol string
+ Protocol string
+ Instance SdlEngine
+ IsAvailable bool
}
-type SdlEngineConfig struct {
- Engine SdlEngine
- ReadAll readAll
- WriteAll writeAll
- IsAvailable bool
+type SdlEngine interface {
+ ReadAll(string) (*rtmgr.RicComponents, error)
+ WriteAll(string, *rtmgr.RicComponents) error
+ WriteXapps(string, *[]rtmgr.XApp) error
}
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: mangos.go
+ Abstract:
+ Date: 3 May 2019
+*/
+
+package stub
+
+import "errors"
+
+type MangosMessage struct {
+ Header []byte
+ Body []byte
+ Pipe MangosPipe
+ bbuf []byte
+ hbuf []byte
+ bsize int
+ pool interface {}
+}
+
+type MangosProtocolInfo struct {
+ Self uint16
+ Peer uint16
+ SelfName string
+ PeerName string
+}
+
+// Mangos Listener Stub
+
+type MangosListener struct {
+
+}
+
+func (l MangosListener) Listen() error {
+ return nil
+}
+
+func (l MangosListener) Close() error {
+ return nil
+}
+
+func (l MangosListener) Address() string {
+ return ""
+}
+
+func (l MangosListener) SetOption(s string, i interface{}) error {
+ return nil
+}
+
+func (l MangosListener) GetOption(s string) (interface{},error) {
+ return nil, nil
+}
+
+// Mangos Dialer Stub
+
+type MangosDialer struct {
+ }
+
+func (d MangosDialer) Open() error {
+ return nil
+}
+
+func (d MangosDialer) Close() error {
+ return nil
+}
+
+func (d MangosDialer) Address() string {
+ return ""
+}
+
+func (d MangosDialer) SetOption(s string, i interface{}) error {
+ return nil
+}
+
+func (d MangosDialer) GetOption(s string) (interface{},error) {
+ return nil, nil
+}
+
+// Mangos Context Stub
+
+type MangosContext struct {
+
+}
+
+func (c MangosContext) Close() error {
+ return nil
+}
+
+func (c MangosContext) SetOption(s string, i interface{}) error {
+ return nil
+}
+
+func (c MangosContext) GetOption(s string) (interface{},error) {
+ return nil, nil
+}
+
+func (c MangosContext) Send(b []byte) error {
+ return nil
+}
+
+func (c MangosContext) Recv() ([]byte, error) {
+ return make([]byte,0), nil
+}
+
+func (c MangosContext) SendMsg(*MangosMessage) error {
+ return nil
+}
+
+func (c MangosContext) RecvMsg() (*MangosMessage, error) {
+ return nil, nil
+}
+
+// Mangos Pipe Stub
+
+type MangosPipe struct {
+
+}
+
+func (p MangosPipe) ID() uint32 {
+ return 0
+}
+
+func (p MangosPipe) Listener() MangosListener {
+ return MangosListener{}
+}
+
+func (p MangosPipe) Dialer() MangosDialer {
+ return MangosDialer{}
+}
+
+func (p MangosPipe) Close() error {
+ return nil
+}
+
+func (p MangosPipe) Address() string {
+ return ""
+}
+
+func (p MangosPipe) GetOption(s string) (interface{},error) {
+ return nil, nil
+}
+
+// Mangos PipeEventHook Stub
+
+type PipeEventHook func(int, MangosPipe)
+
+// Mangos Socket Stub
+
+type MangosSocket struct {
+ GenerateSocketCloseError bool
+ GenerateSocketSendError bool
+ GenerateSocketDialError bool
+ GenerateSocketListenError bool
+}
+
+func (s MangosSocket) Info() MangosProtocolInfo {
+ return MangosProtocolInfo{}
+}
+
+func (s MangosSocket) Close() error {
+ if s.GenerateSocketCloseError {
+ return errors.New("stub generated Socket Close error")
+ }
+ return nil
+}
+
+func (s MangosSocket) Send(b []byte) error {
+ if s.GenerateSocketSendError {
+ return errors.New("stub generated Socket Send error")
+ }
+ return nil
+}
+
+func (s MangosSocket) Recv() ([]byte, error) {
+ return make([]byte,0), nil
+}
+
+func (s MangosSocket) SendMsg(*MangosMessage) error {
+ return nil
+}
+
+func (s MangosSocket) RecvMsg() (*MangosMessage, error) {
+ return nil, nil
+}
+
+func (s MangosSocket) Dial(t string) error {
+ if s.GenerateSocketDialError {
+ return errors.New("stub generated Socket Dial error")
+ }
+ return nil
+}
+
+func (s MangosSocket) DialOptions(t string, m map[string]interface{}) error {
+ if err := s.Dial(t); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s MangosSocket) NewDialer(t string, m map[string]interface{}) (MangosDialer, error) {
+ return MangosDialer{}, nil
+}
+
+func (s MangosSocket) Listen(t string) error {
+ if s.GenerateSocketListenError {
+ return errors.New("stub generated Socket Listen error")
+ }
+ return nil
+}
+
+func (s MangosSocket) ListenOptions(t string, m map[string]interface{}) error {
+ return nil
+}
+
+func (s MangosSocket) NewListener(t string, m map[string]interface{}) (MangosListener, error) {
+ return MangosListener{}, nil
+}
+
+func (s MangosSocket) SetOption(t string, i interface{}) error {
+ return nil
+}
+
+func (s MangosSocket) GetOption(t string) (interface{},error) {
+ return nil, nil
+}
+
+func (s MangosSocket) OpenContext() (MangosContext, error) {
+ return MangosContext{}, nil
+}
+
+func (s MangosSocket) SetPipeEventHook(p PipeEventHook) PipeEventHook {
+ return nil
+}
+
+// Mangos ProtocolPipe Stub
+
+type MangosProtocolPipe struct {
+
+}
+
+func (p MangosProtocolPipe) ID() uint32 {
+ return 0
+}
+
+func (p MangosProtocolPipe) Close() error {
+ return nil
+}
+
+func (p MangosProtocolPipe) SendMsg(m *MangosMessage) error {
+ return nil
+}
+
+func (p MangosProtocolPipe) RecvMsg() *MangosMessage {
+ return nil
+}
+
+// Mangos ProtocolContext Stub
+
+type MangosProtocolContext struct {
+
+}
+
+func (p MangosProtocolContext) Close() error {
+ return nil
+}
+
+func (p MangosProtocolContext) SendMsg(m *MangosMessage) error {
+ return nil
+}
+
+func (p MangosProtocolContext) RecvMsg() (*MangosMessage, error) {
+ return nil, nil
+}
+
+func (p MangosProtocolContext) GetOption(s string) (interface{}, error) {
+ return nil, nil
+}
+
+func (p MangosProtocolContext) SetOption(s string, i interface{}) error {
+ return nil
+}
+
+// Mangos ProtocolBase Stub
+
+type MangosProtocolBase struct {
+ MangosProtocolContext
+}
+
+func (p MangosProtocolBase) Info() MangosProtocolInfo {
+ return MangosProtocolInfo{}
+}
+
+func (p MangosProtocolBase) AddPipe(t MangosProtocolPipe) error {
+ return nil
+}
+
+func (p MangosProtocolBase) RemovePipe(MangosProtocolPipe) {
+
+}
+
+func (p MangosProtocolBase) OpenContext() (MangosProtocolContext, error) {
+ return MangosProtocolContext{}, nil
+}
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+/*
+ Mnemonic: stub.go
+ Abstract:
+ Date: 27 April 2019
+*/
+
+package stub
+
+import "routing-manager/pkg/rtmgr"
+
+var ValidXapps = &[]rtmgr.XApp{
+ rtmgr.XApp{Name: "app1", Status: "", Version: "", Instances: []rtmgr.XAppInstance{rtmgr.XAppInstance{Name: "instance1", Status: "unknown", Ip: "10.0.0.1", Port: 0, TxMessages: []string{"HandoverPreparation", "HandoverCancel"}, RxMessages: []string{"HandoverPreparation", "HandoverCancel"}}}},
+ rtmgr.XApp{Name: "app2", Status: "", Version: "", Instances: []rtmgr.XAppInstance{rtmgr.XAppInstance{Name: "instance2", Status: "unknown", Ip: "192.168.0.1", Port: 0, TxMessages: []string{"HandoverCancel", "HandoverPreparation"}, RxMessages: []string{"HandoverPreparation", "HandoverCancel"}}}},
+ rtmgr.XApp{Name: "app3", Status: "", Version: "", Instances: []rtmgr.XAppInstance{rtmgr.XAppInstance{Name: "instance2", Status: "unknown", Ip: "10.1.1.1", Port: 0, TxMessages: []string{"X2Setup"}, RxMessages: []string{"Reset", "UEContextRelease"}}}},
+ rtmgr.XApp{Name: "app4", Status: "", Version: "", Instances: []rtmgr.XAppInstance{rtmgr.XAppInstance{Name: "instance2", Status: "unknown", Ip: "10.2.2.1", Port: 0, TxMessages: []string{"Reset", "UEContextRelease"}, RxMessages: []string{"", ""}}}},
+}
+
+var ValidPlatformComponents = &rtmgr.PlatformComponents {
+ {Name:"E2TERM", Fqdn:"e2term", Port:4561},
+ {Name:"SUBMAN", Fqdn:"subman", Port:4561},
+ {Name:"E2MAN", Fqdn:"e2man", Port:4561},
+ {Name:"UEMAN", Fqdn:"ueman", Port:4561},
+}
+
+var ValidEndpoints = []rtmgr.Endpoint{
+ rtmgr.Endpoint{Uuid: "10.0.0.1:0", Name: "i1", XAppType: "app1", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: true},
+ rtmgr.Endpoint{Uuid: "192.168.0.1:0", Name: "i2", XAppType: "app2", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: false, Keepalive: false},
+ rtmgr.Endpoint{Uuid: "10.1.1.1:0", Name: "i3", XAppType: "app3", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: false},
+ rtmgr.Endpoint{Uuid: "10.2.2.1:0", Name: "i4", XAppType: "app4", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: false, Keepalive: true},
+}
+
+var ValidRicComponents = rtmgr.RicComponents {
+ *ValidXapps, *ValidPlatformComponents,
+}
+
+var ValidPolicies = &[]string{"", ""}
# Abstract: Runs the rtmgr executable with proper arguments
# Date: 19 March 2019
#
-exec ./rtmgr -xm-url=$XMURL -sbi=$SBI -sbi-if=$IP -filename=$RTFILE -rpe=$RPE -loglevel=INFO
+exec ./rtmgr -xm-url=$XMURL -nbi=$NBI -nbi-if=$NBIURL -sbi=$SBI -sbi-if=$SBIURL -filename=$RTFILE -rpe=$RPE -loglevel=$LOGLEVEL -configfile=$CFGFILE
--- /dev/null
+{
+ "PlatformComponents":
+ [
+ {
+ "name": "E2TERM",
+ "fqdn": "e2term",
+ "port": 4561
+ },
+ {
+ "name": "SUBMAN",
+ "fqdn": "subman",
+ "port": 4562
+ },
+ {
+ "name": "E2MAN",
+ "fqdn": "e2man",
+ "port": 4563
+ },
+ {
+ "name": "UEMAN",
+ "fqdn": "ueman",
+ "port": 4564
+ }
+ ]
+}
+
--- /dev/null
+{
+ "id": 1,
+ "event": "whatever",
+ "data-version": 0,
+ "data":
+ "[{
+ \"name\": \"xapp-01\",
+ \"status\": \"unknown\",
+ \"version\": \"1.2.3\",
+ \"instances\": [{
+ \"name\": \"xapp-01-instance-01\",
+ \"status\": \"pending\",
+ \"ip\": \"10.244.0.51\",
+ \"port\": 4560,
+ \"txMessages\": [\"ControlIndication\", \"HandoverPreparation\"],
+ \"rxMessages\": [\"LoadIndication\", \"Reset\"]
+ },{
+ \"name\": \"xapp-01-instance-02\",
+ \"status\": \"pending\",
+ \"ip\": \"10.244.0.52\",
+ \"port\": 4560,
+ \"txMessages\": [\"ControlIndication\", \"HandoverPreparation\"],
+ \"rxMessages\": [\"LoadIndication\", \"Reset\"]
+ }]
+ },{
+ \"name\": \"xapp-02\",
+ \"status\": \"unknown\",
+ \"version\": \"1.2.3\",
+ \"instances\": [{
+ \"name\": \"xapp-02-instance-01\",
+ \"status\": \"pending\",
+ \"ip\": \"10.244.0.53\",
+ \"port\": 4560,
+ \"txMessages\": [\"ControlIndication\", \"HandoverPreparation\"],
+ \"rxMessages\": [\"LoadIndication\", \"Reset\"]
+ },{
+ \"name\": \"xapp-02-instance-01\",
+ \"status\": \"pending\",
+ \"ip\": \"10.244.0.54\",
+ \"port\": 4560,
+ \"txMessages\": [\"ControlIndication\", \"HandoverPreparation\"],
+ \"rxMessages\": [\"LoadIndication\", \"Reset\"]
+ }]
+ }]"
+}
\ No newline at end of file
#
FROM node
+COPY middleware.js middleware.js
+
RUN npm install -g json-server
--- /dev/null
+module.exports = function (req, res, next) {
+ if (req.method === 'POST') {
+ // Converts POST to GET and move payload to query params
+ // This way it will make JSON Server that it's GET request
+ req.method = 'GET'
+ req.query = req.body
+ }
+ // Continue to JSON Server router
+ next()
+}
args:
- /run_test-tx.sh
ports:
- - containerPort: 4555
+ - containerPort: 4561
env:
- name: NAME
value: "RM"
- name: PORT
- value: "tcp:4555"
+ value: "tcp:4561"
- name: RATE
value: "10"
- name: RMR_RTG_SVC
xmgrdata: |
{
"xapps":
- [{
- "name": "admin",
- "status": "unknown",
- "version": "1.2.3",
- "instances": [{
- "name": "admin-01",
- "status": "pending",
- "ip": "10.244.1.19",
- "port": 4561,
- "txMessages": ["HandoverPreparation","HandoverCancel"],
- "rxMessages": []
- },
- {
- "name": "admin-02",
- "status": "pending",
- "ip": "10.244.3.16",
- "port": 4555,
- "txMessages": ["HandoverPreparation","HandoverCancel"],
- "rxMessages": []
- }
- ]
- },
+ [
+ {
+ "name": "admin",
+ "status": "unknown",
+ "version": "1.2.3",
+ "instances": [{
+ "name": "admin-01",
+ "status": "pending",
+ "ip": "10.244.1.19",
+ "port": 4561,
+ "txMessages": ["HandoverPreparation","HandoverCancel"],
+ "rxMessages": []
+ },
+ {
+ "name": "admin-02",
+ "status": "pending",
+ "ip": "10.244.3.16",
+ "port": 4555,
+ "txMessages": ["HandoverPreparation","HandoverCancel"],
+ "rxMessages": []
+ }
+ ]
+ },
+ {
+ "name": "xapp",
+ "status": "unknown",
+ "version": "1.2.3",
+ "instances": [{
+ "name": "xapp-01",
+ "status": "pending",
+ "ip": "192.168.2.1",
+ "port": 32300,
+ "txMessages": ["X2Setup","Reset"],
+ "rxMessages": ["HandoverPreparation","HandoverCancel"]
+ },
+ {
+ "name": "xapp-02",
+ "status": "pending",
+ "ip": "192.168.2.2",
+ "port": 32300,
+ "txMessages": ["X2Setup","Reset"],
+ "rxMessages": ["HandoverPreparation","HandoverCancel"]
+ }
+ ]
+ },
+ {
+ "name": "e2t",
+ "status": "unknown",
+ "version": "1.2.3",
+ "instances": [{
+ "name": "e2t-01",
+ "status": "pending",
+ "ip": "192.168.3.1",
+ "port": 32300,
+ "txMessages": [],
+ "rxMessages": ["HandoverPreparation","HandoverCancel","X2Setup","Reset"]
+ },
+ {
+ "name": "e2t-02",
+ "status": "pending",
+ "ip": "192.168.3.2",
+ "port": 32300,
+ "txMessages": [],
+ "rxMessages": ["HandoverPreparation","HandoverCancel","X2Setup","Reset"]
+ }
+ ]
+ }
+ ],
+ "subscriptions":
{
- "name": "xapp",
- "status": "unknown",
- "version": "1.2.3",
- "instances": [{
- "name": "xapp-01",
- "status": "pending",
- "ip": "192.168.2.1",
- "port": 32300,
- "txMessages": ["X2Setup","Reset"],
- "rxMessages": ["HandoverPreparation","HandoverCancel"]
- },
- {
- "name": "xapp-02",
- "status": "pending",
- "ip": "192.168.2.2",
- "port": 32300,
- "txMessages": ["X2Setup","Reset"],
- "rxMessages": ["HandoverPreparation","HandoverCancel"]
- }
- ]
- },
- {
- "name": "e2t",
- "status": "unknown",
- "version": "1.2.3",
- "instances": [{
- "name": "e2t-01",
- "status": "pending",
- "ip": "192.168.3.1",
- "port": 32300,
- "txMessages": [],
- "rxMessages": ["HandoverPreparation","HandoverCancel","X2Setup","Reset"]
- },
- {
- "name": "e2t-02",
- "status": "pending",
- "ip": "192.168.3.2",
- "port": 32300,
- "txMessages": [],
- "rxMessages": ["HandoverPreparation","HandoverCancel","X2Setup","Reset"]
- }
- ]
+ "id": "1ILBltYYzEGzWRrVPZKmuUmhwcc",
+ "version": 0,
+ "eventType": "all"
}
-
- ]
}
spec:
containers:
- name: xmgr
- image: cmaster:5000/xmgr:0.0.2
+ image: xmgr:mock
args:
- json-server
- -w
- --host=0.0.0.0
- /db/xapps.json
+ - --middlewares=middleware.js
ports:
- containerPort: 3000
volumeMounts: