FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:4-u18.04-nng as rtmgrbuild
# Install RMr shared library
-ARG RMRVERSION=3.6.0
-RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb
+RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_1.13.1_amd64.deb/download.deb && dpkg -i rmr_1.13.1_amd64.deb && rm -rf rmr_1.13.1_amd64.deb
# Install RMr development header files
-RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb
+RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_1.13.1_amd64.deb/download.deb && dpkg -i rmr-dev_1.13.1_amd64.deb && rm -rf rmr-dev_1.13.1_amd64.deb
ENV GOLANG_VERSION 1.12.1
RUN wget --quiet https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz \
COPY pkg pkg
COPY cmd cmd
COPY run_rtmgr.sh /run_rtmgr.sh
-RUN mkdir manifests
-COPY manifests/ /go/src/routing-manager/manifests
#RUN go mod download
#RUN /usr/local/go/bin/go mod tidy
ENV GOPATH /go
RUN go install ./cmd/rtmgr.go
# UT intermediate container
-#FROM rtmgrbuild as rtmgrut
-#RUN ldconfig
-#RUN go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -f "/go/src/routing-manager/manifests/rtmgr/rtmgr-cfg.yaml" -cover -race
+FROM rtmgrbuild as rtmgrut
+RUN ldconfig
+RUN go test ./pkg/sbi ./pkg/rpe ./pkg/nbi ./pkg/sdl -f "./manifests/rtmgr/rtmgr-cfg.yaml" -cover -race
# Final, executable container
FROM ubuntu:16.04
-### v0.5.3
-* RMR updated to v3.6.0 with support for E2 Setup message types
-
-### v0.5.2
-* Switch to RMR Si95 mode
-
-### v0.5.1
-* Removal of go mangoes and using RMR nng
-
-### v0.4.16
-* getAllSubscriptions API (RM -> SM) during restart of routing manager handled
-
### v0.4.15
* Retained (E2M->E2T issue) - retrying when is_Ready flag in socket handle is false
return nil, nil, nil, nil, err
}
-
-
func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
for {
if <-triggerSBI {
}
}
-func sendRoutesToAll(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine) {
-
- data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
- if err != nil || data == nil {
- xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error())
- return
- }
- sbiEngine.UpdateEndpoints(data)
- policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
- err = sbiEngine.DistributeAll(policies)
- if err != nil {
- xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
- return
- }
-}
-
-
func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
triggerSBI := make(chan bool)
}
}
- sendRoutesToAll(sbiEngine, sdlEngine, rpeEngine)
+ triggerSBI <- true
- rtmgr.Rtmgr_ready = true
time.Sleep(INTERVAL * time.Second)
xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.")
}
}
func main() {
-
nbiEngine, sbiEngine, sdlEngine, rpeEngine, err := initRtmgr()
if err != nil {
xapp.Logger.Error(err.Error())
os.Exit(1)
}
-
SetupCloseHandler()
-
xapp.Logger.Info("Start " + SERVICENAME + " service")
rtmgr.Eps = make(rtmgr.Endpoints)
- rtmgr.Rtmgr_ready = false
var m sync.Mutex
-// RMR thread is starting port: 4560
- c := nbi.NewControl()
- go c.Run(sbiEngine, sdlEngine, rpeEngine, &m)
-
-// Waiting for RMR to be ready
- time.Sleep(time.Duration(2) * time.Second)
- for xapp.Rmr.IsReady() == false {
- time.Sleep(time.Duration(2) * time.Second)
- }
-
- dummy_whid := int(xapp.Rmr.Openwh("localhost:4560"))
- xapp.Logger.Info("created dummy Wormhole ID for routingmanager and dummy_whid :%d", dummy_whid)
+ c := sbi.NewControl()
+ go c.Run()
serve(nbiEngine, sbiEngine, sdlEngine, rpeEngine, &m)
os.Exit(0)
# By default this file is in the docker build directory,
# but the location can configured in the JJB template.
---
-tag: 0.5.3
+tag: 0.4.15
go 1.12.1
require (
- gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.4.4
+ gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.0.24
nanomsg.org/go/mangos/v2 v2.0.5
)
replace gerrit.o-ran-sc.org/r/ric-plt/sdlgo => gerrit.o-ran-sc.org/r/ric-plt/sdlgo.git v0.2.0
-replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.4.4
+replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.0.24
replace gerrit.o-ran-sc.org/r/com/golog => gerrit.o-ran-sc.org/r/com/golog.git v0.0.1
"maxSize": 2072
"numWorkers": 1
}
-subscription:
- host: "127.0.0.1:8089"
+++ /dev/null
-/*
-==================================================================================
- Copyright (c) 2019 AT&T Intellectual Property.
- Copyright (c) 2019 Nokia
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
- This source code is part of the near-RT RIC (RAN Intelligent Controller)
- platform project (RICP).
-
-==================================================================================
-*/
-package nbi
-
-import "C"
-
-import (
- "errors"
- "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
- "routing-manager/pkg/rpe"
- "routing-manager/pkg/rtmgr"
- "routing-manager/pkg/sbi"
- "routing-manager/pkg/sdl"
- "strconv"
- "sync"
-)
-
-func NewControl() Control {
-
- return Control{make(chan *xapp.RMRParams)}
-}
-
-type Control struct {
- rcChan chan *xapp.RMRParams
-}
-
-func (c *Control) Run(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
- go c.controlLoop(sbiEngine, sdlEngine, rpeEngine, m)
- xapp.Run(c)
-}
-
-func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
- c.rcChan <- rp
- return
-}
-
-func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
- for {
- msg := <-c.rcChan
- xapp_msg := sbi.RMRParams{msg}
- switch msg.Mtype {
- case xapp.RICMessageTypes["RMRRM_REQ_TABLE"]:
- if rtmgr.Rtmgr_ready == false {
- xapp.Logger.Info("Update Route Table Request(RMR to RM), message discarded as routing manager is not ready")
- } else {
- xapp.Logger.Info("Update Route Table Request(RMR to RM)")
- go c.handleUpdateToRoutingManagerRequest(msg, sbiEngine, sdlEngine, rpeEngine, m)
- }
- case xapp.RICMessageTypes["RMRRM_TABLE_STATE"]:
- xapp.Logger.Info("state of table to route mgr %s,payload %s", xapp_msg.String(), msg.Payload)
-
- default:
- err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
- xapp.Logger.Error("Unknown message type: %v", err)
- }
- }
-}
-
-func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
-
- msg := sbi.RMRParams{params}
-
- xapp.Logger.Info("Update Route Table Request, msg.String() : %s", msg.String())
- xapp.Logger.Info("Update Route Table Request, params.Payload : %s", string(params.Payload))
-
- m.Lock()
- data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
- m.Unlock()
- if err != nil || data == nil {
- xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error())
- return
- }
-
- ep := sbiEngine.CreateEndpoint(string(params.Payload))
- if ep == nil {
- xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
- return
- }
-
- policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
- err = sbiEngine.DistributeToEp(policies, ep)
- if err != nil {
- xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
- return
- }
-}
"errors"
"fmt"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
- xfmodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
"github.com/go-openapi/loads"
"github.com/go-openapi/runtime/middleware"
"net"
func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile string, e2murl string, sdlEngine sdl.Engine) error {
xapp.Logger.Info("Invoked retrieveStartupData ")
- var readErr error
- var maxRetries = 10
+ var readErr error
+ var maxRetries = 10
var xappData *[]rtmgr.XApp
xappData = new([]rtmgr.XApp)
xapp.Logger.Info("Trying to fetch XApps data from XAPP manager")
- for i := 1; i <= maxRetries; i++ {
- time.Sleep(2 * time.Second)
+ for i := 1; i <= maxRetries; i++ {
+ time.Sleep(2 * time.Second)
- readErr = nil
- xappData, err := httpGetXApps(xmurl)
- if xappData != nil && err == nil {
+ readErr = nil
+ xappData, err := httpGetXApps(xmurl)
+ if xappData != nil && err == nil {
break
- } else if err == nil {
- readErr = errors.New("unexpected HTTP status code")
- } else {
- xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
- readErr = err
- }
- }
+ } else if err == nil {
+ readErr = errors.New("unexpected HTTP status code")
+ } else {
+ xapp.Logger.Warn("cannot get xapp data due to: " + err.Error())
+ readErr = err
+ }
+ }
if ( readErr != nil) {
return readErr
e2ts := make(map[string]rtmgr.E2TInstance)
xapp.Logger.Info("Trying to fetch E2T data from E2manager")
for i := 1; i <= maxRetries; i++ {
+ time.Sleep(2 * time.Second)
readErr = nil
e2tDataList, err := httpGetE2TList(e2murl)
xapp.Logger.Warn("cannot get E2T data from E2M due to: " + err.Error())
readErr = err
}
- time.Sleep(2 * time.Second)
}
if ( readErr != nil) {
}
pcData, confErr := rtmgr.GetPlatformComponents(configfile)
- if confErr != nil {
- xapp.Logger.Error(confErr.Error())
- return confErr
- }
- xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.")
- // Combine the xapps data and platform data before writing to the SDL
+ if confErr != nil {
+ xapp.Logger.Error(confErr.Error())
+ return confErr
+ }
+ xapp.Logger.Info("Recieved intial xapp data, E2T data and platform data, writing into SDL.")
+ // Combine the xapps data and platform data before writing to the SDL
ricData := &rtmgr.RicComponents{XApps: *xappData, Pcs: *pcData, E2Ts: e2ts, MeidMap: meids}
- writeErr := sdlEngine.WriteAll(fileName, ricData)
- if writeErr != nil {
- xapp.Logger.Error(writeErr.Error())
- }
-
- xapp.Logger.Info("Trying to fetch Subscriptions data from Subscription manager")
-/* for i := 1; i <= maxRetries; i++ {
- readErr = nil
- sub_list, err := xapp.Subscription.QuerySubscriptions()
-
- if sub_list != nil && err == nil {
- PopulateSubscription(sub_list)
- break
- } else {
- readErr = err
- xapp.Logger.Warn("cannot get xapp data due to: " + readErr.Error())
+ writeErr := sdlEngine.WriteAll(fileName, ricData)
+ if writeErr != nil {
+ xapp.Logger.Error(writeErr.Error())
+ }
+ // post subscription req to appmgr
+ readErr = PostSubReq(xmurl, nbiif)
+ if readErr == nil {
+ return nil
}
- time.Sleep(2 * time.Second)
- }
-
- if (readErr != nil) {
- return readErr
- }
-*/
- // post subscription req to appmgr
- readErr = PostSubReq(xmurl, nbiif)
- if readErr == nil {
- return nil
- }
-
return readErr
}
}
}
-
-func PopulateSubscription(sub_list xfmodel.SubscriptionList) {
- for _, sub_row := range sub_list {
- var subdata models.XappSubscriptionData
- id := int32(sub_row.SubscriptionID)
- subdata.SubscriptionID = &id
- for _, ep := range sub_row.Endpoint {
-
- stringSlice := strings.Split(ep, ":")
- subdata.Address = &stringSlice[0]
- intportval, _ := strconv.Atoi( stringSlice[1])
- value := uint16(intportval)
- subdata.Port = &value
- xapp.Logger.Debug("Adding Subscription List has Address :%v, port :%v, SubscriptionID :%v ", subdata.Address, subdata.Address, subdata.SubscriptionID)
- addSubscription(&rtmgr.Subs, &subdata)
- }
- }
-}
var E2TListResp = []byte(`[{"e2tAddress":"127.0.0.1:0","ranNames":["RanM0","RanN0"]},{"e2tAddress":"127.0.0.1:1","ranNames":["RanM1","RanN1"]},{"e2tAddress":"127.0.0.1:2","ranNames":["RanM2","RanN2"]},{"e2tAddress":"127.0.0.1:3","ranNames":["RanM3","RanN3"]}]`)
-var SubscriptionList = []byte(`[{"SubscriptionId":11,"Meid":"Test-Gnb","Endpoint":["127.0.0.1:4056"]}]`)
-
var InvalidSubResp = []byte(`{"Version":0, "EventType":all}`)
func TestValidateXappCallbackData_1(t *testing.T) {
return ts
}
-func createMockSubmgrWithData(url string, t []byte) *httptest.Server {
- l, err := net.Listen("tcp", url)
- if err != nil {
- fmt.Println("Failed to create listener: " + err.Error())
- }
- ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-
- if r.Method == "GET" && r.URL.String() == "//ric/v1/subscriptions" {
- w.Header().Add("Content-Type", "application/json")
- w.WriteHeader(http.StatusOK)
- w.Write(t)
- }
-
- }))
- ts.Listener.Close()
- ts.Listener = l
- return ts
-}
-
func createMockPlatformComponents() {
var filename = "config.json"
file, _ := json.MarshalIndent(stub.ValidPlatformComponents, "", "")
ts1.Start()
defer ts1.Close()
- ts2 := createMockSubmgrWithData("127.0.0.1:8089", SubscriptionList)
- ts2.Start()
- defer ts2.Close()
-
sdlEngine, _ := sdl.GetSdl("file")
var httpRestful, _ = GetNbi("httpRESTful")
createMockPlatformComponents()
}
rawrt = append(rawrt, key+"newrt|end\n")
count := 0
-
+/* meidrt := key +"meid_map|start\n"
+ for e2tkey, value := range rcs.E2Ts {
+ xapp.Logger.Debug("rmr.E2T Key: %v", e2tkey)
+ xapp.Logger.Debug("rmr.E2T Value: %v", value)
+ xapp.Logger.Debug("rmr.E2T RAN List: %v", rcs.E2Ts[e2tkey].Ranlist)
+ if ( len(rcs.E2Ts[e2tkey].Ranlist) != 0 ) {
+ ranList := strings.Join(rcs.E2Ts[e2tkey].Ranlist, " ")
+ meidrt += key + "mme_ar|" + e2tkey + "|" + ranList + "\n"
+ count++
+ } else {
+ xapp.Logger.Debug("rmr.E2T Empty RAN LIST for FQDN: %v", e2tkey)
+ }
+ }
+ meidrt += key+"meid_map|end|" + strconv.Itoa(count) +"\n" */
meidrt := key +"meid_map|start\n"
for _, value := range rcs.MeidMap {
meidrt += key + value + "\n"
if len(e2TermEp) > 0 {
r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermEp, routeTable, -1, "")
r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermEp, routeTable, -1, "")
- r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermEp, routeTable, -1, "")
- r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermEp, routeTable, -1, "")
}
}
Socket interface{}
IsReady bool
Keepalive bool
- Whid int
}
type RouteTableEntry struct {
SubscriptionID uint16
FqdnList []FqDn
}
-
-var (
- Rtmgr_ready bool
-)
--- /dev/null
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ platform project (RICP).
+
+==================================================================================
+*/
+package sbi
+
+import "C"
+
+import (
+ "errors"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "strconv"
+)
+
+
+func NewControl() Control {
+
+ return Control{make(chan *xapp.RMRParams)}
+}
+
+
+type Control struct {
+ rcChan chan *xapp.RMRParams
+}
+
+
+func (c *Control) Run() {
+ go c.controlLoop()
+ xapp.Run(c)
+}
+
+func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
+ c.rcChan <- rp
+ return
+}
+
+func (c *Control) controlLoop() {
+ for {
+ msg := <-c.rcChan
+ switch msg.Mtype {
+ case xapp.RICMessageTypes["RIC_SUB_REQ"]:
+ xapp.Logger.Info("Message handling when RMR instance queries for Routes")
+ default:
+ err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
+ xapp.Logger.Error("Unknown message type: %v", err)
+ }
+ }
+}
+
import "C"
import (
- "bytes"
- "crypto/md5"
"errors"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"nanomsg.org/go/mangos/v2"
"routing-manager/pkg/rtmgr"
"strconv"
"time"
- "fmt"
)
type NngPush struct {
rcChan chan *xapp.RMRParams
}
-type RMRParams struct {
- *xapp.RMRParams
-}
-
-
-func (params *RMRParams) String() string {
- var b bytes.Buffer
- sum := md5.Sum(params.Payload)
- fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
- return b.String()
-}
-
func NewNngPush() *NngPush {
instance := new(NngPush)
instance.NewSocket = createNewPushSocket
}
func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
-
+ var err error
+ var socket NngSocket
xapp.Logger.Debug("Invoked sbi.AddEndpoint")
xapp.Logger.Debug("args: %v", *ep)
- endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
- ep.Whid = int(xapp.Rmr.Openwh(endpoint))
- if ep.Whid < 0 {
- return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
- }else {
- xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint)
+ socket, err = c.NewSocket()
+ if err != nil {
+ return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
+ }
+ ep.Socket = socket
+ err = c.dial(ep)
+ if err != nil {
+ return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
}
-
return nil
}
func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
xapp.Logger.Debug("args: %v", *ep)
-
- xapp.Rmr.Closewh(ep.Whid)
+ if err := ep.Socket.(NngSocket).Close(); err != nil {
+ return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
+ }
return nil
}
func (c *NngPush) DistributeAll(policies *[]string) error {
xapp.Logger.Debug("Invoked: sbi.DistributeAll")
xapp.Logger.Debug("args: %v", *policies)
-
for _, ep := range rtmgr.Eps {
- go c.send(ep, policies)
+ i := 1
+ for i < 5 {
+ if ep.IsReady {
+ go c.send(ep, policies)
+ break
+ } else {
+ xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i))
+ time.Sleep(10 * time.Millisecond)
+ i++
+ }
+ }
}
-
return nil
}
func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
-
for _, pe := range *policies {
- params := &RMRParams{&xapp.RMRParams{}}
- params.Mtype = 20
- params.PayloadLen = len([]byte(pe))
- params.Payload =[]byte(pe)
- params.Mbuf = nil
- params.Whid = ep.Whid
- time.Sleep(1 * time.Millisecond)
- xapp.Rmr.SendMsg(params.RMRParams)
+ if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
+ xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
+ }
}
xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
}
-
-func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){
- return c.createEndpoint(payload, c)
-}
-
-func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
- xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
- xapp.Logger.Debug("args: %v", *policies)
-
- go c.send(ep, policies)
-
- return nil
-}
-
}
}
}
-
-func (s *Sbi) createEndpoint(payload string, sbi Engine) (*rtmgr.Endpoint) {
- xapp.Logger.Debug("CreateEndPoint %v", payload)
- stringSlice := strings.Split(payload, " ")
- uuid := stringSlice[0]
- xapp.Logger.Debug(">>> uuid %v", stringSlice[0])
-
-
- if _, ok := rtmgr.Eps[uuid]; ok {
- ep := rtmgr.Eps[uuid]
- return ep
- }
-
- return nil
-}
AddEndpoint(*rtmgr.Endpoint) error
DeleteEndpoint(*rtmgr.Endpoint) error
UpdateEndpoints(*rtmgr.RicComponents)
- CreateEndpoint(string) (*rtmgr.Endpoint)
- DistributeToEp(*[]string, *rtmgr.Endpoint) error
}
type NngSocket interface {