Making Route Distribution Synchronous 03/4103/2
authorwahidw <abdulwahid.w@nokia.com>
Mon, 15 Jun 2020 13:52:55 +0000 (13:52 +0000)
committerwahidw <abdulwahid.w@nokia.com>
Mon, 22 Jun 2020 10:26:12 +0000 (10:26 +0000)
Change-Id: Ibe7dc23933f446fc433e6e583d6044f3e5e93d88
Signed-off-by: wahidw <abdulwahid.w@nokia.com>
18 files changed:
RELNOTES
cmd/rtmgr.go
container-tag.yaml
manifests/rtmgr/rtmgr-cfg.yaml
pkg/nbi/control.go
pkg/nbi/httpgetter.go
pkg/nbi/httprestful.go
pkg/nbi/httprestful_test.go
pkg/nbi/types.go
pkg/rpe/rmr_test.go
pkg/rpe/rpe.go
pkg/rtmgr/rtmgr.go
pkg/rtmgr/types.go
pkg/sbi/nngpush.go
pkg/sbi/nngpush_test.go
pkg/sbi/sbi.go
pkg/sbi/sbi_test.go
pkg/sbi/types.go

index f6465d3..d2a370b 100644 (file)
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,6 @@
+### v0.6.2
+* Distribution of routes is synchronous. 
+
 ### v0.6.1
 * Updating xapp_fwk to v0.4.15, that contains RIC_HEALTH_CHECK message types
 
index 63b67b7..af0aba1 100644 (file)
@@ -39,109 +39,15 @@ import (
        "os"
        "os/signal"
        "routing-manager/pkg/nbi"
-       "routing-manager/pkg/rpe"
+       //"routing-manager/pkg/rpe"
        "routing-manager/pkg/rtmgr"
-       "routing-manager/pkg/sbi"
-       "routing-manager/pkg/sdl"
+       //"routing-manager/pkg/sbi"
+       //"routing-manager/pkg/sdl"
        "syscall"
        "time"
-       "sync"
 )
 
 const SERVICENAME = "rtmgr"
-const INTERVAL time.Duration = 60
-
-func initRtmgr() (nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, err error) {
-       if nbiEngine, err = nbi.GetNbi(xapp.Config.GetString("nbi")); err == nil && nbiEngine != nil {
-               if sbiEngine, err = sbi.GetSbi(xapp.Config.GetString("sbi")); err == nil && sbiEngine != nil {
-                       if sdlEngine, err = sdl.GetSdl(xapp.Config.GetString("sdl")); err == nil && sdlEngine != nil {
-                               if rpeEngine, err = rpe.GetRpe(xapp.Config.GetString("rpe")); err == nil && rpeEngine != nil {
-                                       return nbiEngine, sbiEngine, sdlEngine, rpeEngine, nil
-                               }
-                       }
-               }
-       }
-       return nil, nil, nil, nil, err
-}
-
-
-
-func serveSBI(triggerSBI <-chan bool, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
-       for {
-               if <-triggerSBI {
-                       m.Lock()
-                       data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
-                       m.Unlock()
-                       if err != nil || data == nil {
-                               xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error())
-                               continue
-                       }
-                       sbiEngine.UpdateEndpoints(data)
-                       policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
-                       err = sbiEngine.DistributeAll(policies)
-                       if err != nil {
-                               xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
-                       }
-               }
-       }
-}
-
-func sendRoutesToAll(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine) {
-
-       data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
-       if err != nil || data == nil {
-               xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error())
-               return
-       }
-       sbiEngine.UpdateEndpoints(data)
-       policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
-       err = sbiEngine.DistributeAll(policies)
-       if err != nil {
-               xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
-               return
-       }
-}
-
-
-func serve(nbiEngine nbi.Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
-
-       triggerSBI := make(chan bool)
-
-       nbiErr := nbiEngine.Initialize(xapp.Config.GetString("xmurl"), xapp.Config.GetString("nbiurl"), xapp.Config.GetString("rtfile"), xapp.Config.GetString("cfgfile"), xapp.Config.GetString("e2murl"), 
-               sdlEngine, rpeEngine, triggerSBI, m)
-       if nbiErr != nil {
-               xapp.Logger.Error("Failed to initialize nbi due to: " + nbiErr.Error())
-               return
-       }
-
-       err := sbiEngine.Initialize(xapp.Config.GetString("sbiurl"))
-       if err != nil {
-               xapp.Logger.Info("Failed to open push socket due to: " + err.Error())
-               return
-       }
-       defer nbiEngine.Terminate()
-       defer sbiEngine.Terminate()
-
-       // This SBI Go routine is trtiggered by periodic main loop and when data is recieved on REST interface.
-       go serveSBI(triggerSBI, sbiEngine, sdlEngine, rpeEngine, m)
-
-       for {
-               if xapp.Config.GetString("nbi") == "httpGetter" {
-                       data, err := nbiEngine.(*nbi.HttpGetter).FetchAllXApps(xapp.Config.GetString("xmurl"))
-                       if err != nil {
-                               xapp.Logger.Error("Cannot fetch xapp data due to: " + err.Error())
-                       } else if data != nil {
-                               sdlEngine.WriteXApps(xapp.Config.GetString("rtfile"), data)
-                       }
-               }
-
-               sendRoutesToAll(sbiEngine, sdlEngine, rpeEngine)
-
-               rtmgr.Rtmgr_ready = true
-               time.Sleep(INTERVAL * time.Second)
-               xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.")
-       }
-}
 
 func SetupCloseHandler() {
        c := make(chan os.Signal, 2)
@@ -155,23 +61,16 @@ func SetupCloseHandler() {
 
 func main() {
 
-       nbiEngine, sbiEngine, sdlEngine, rpeEngine, err := initRtmgr()
-       if err != nil {
-               xapp.Logger.Error(err.Error())
-               os.Exit(1)
-       }
-
        SetupCloseHandler()
 
        xapp.Logger.Info("Start " + SERVICENAME + " service")
        rtmgr.Eps = make(rtmgr.Endpoints)
+       rtmgr.Mtype = make(rtmgr.MessageTypeList)
        rtmgr.Rtmgr_ready = false
 
-       var m sync.Mutex
-
 // RMR thread is starting port: 4560
        c := nbi.NewControl()
-       go c.Run(sbiEngine, sdlEngine, rpeEngine, &m)
+       go c.Run()
 
 // Waiting for RMR to be ready
        time.Sleep(time.Duration(2) * time.Second)
@@ -182,6 +81,6 @@ func main() {
        dummy_whid := int(xapp.Rmr.Openwh("localhost:4560"))
        xapp.Logger.Info("created dummy Wormhole ID for routingmanager and dummy_whid :%d", dummy_whid)
 
-       serve(nbiEngine, sbiEngine, sdlEngine, rpeEngine, &m)
+       nbi.Serve()
        os.Exit(0)
 }
index 944d34a..bff5250 100644 (file)
@@ -2,4 +2,4 @@
 # By default this file is in the docker build directory,
 # but the location can configured in the JJB template.
 ---
-tag: 0.6.1
+tag: 0.6.2
index 55aab19..471dca3 100644 (file)
@@ -64,6 +64,25 @@ data:
       "protPort": "tcp:4560"
       "maxSize": 2072
       "numWorkers": 1
+      "RTFILE":
+         "/db/rt.json"
+      "NBIURL":
+              "http://localhost:3800"
+   
     }
 subscription:
  host: "127.0.0.1:8089"
+
+ #xmurl: "http://service-ricplt-appmgr-http:8080/ric/v1/xapps"
+ #e2murl: "http://service-ricplt-e2mgr-http:3800/v1/e2t/list"
+ #rtfile: "/db/rt.json"
+ #CFGFILE: "/cfg/rtmgr-config.yaml"
+ #rpe: "rmrpush"
+ #s#bi: "rmrpush"
+ #s#biurl: "0.0.0.0"
+ #nbi: "httpRESTful"
+ #nbiurl: "http://service-ricplt-rtmgr-http:3800"
+ ##sdl: "file"
+ #local: 
+ #host: ":8080"
+
index ccf4e57..5483aba 100644 (file)
@@ -33,10 +33,20 @@ import (
        "routing-manager/pkg/sdl"
        "strconv"
        "sync"
+       "time"
+       "os"
 )
 
-func NewControl() Control {
+var m sync.Mutex
+
+var nbiEngine Engine
+var sbiEngine sbi.Engine
+var sdlEngine sdl.Engine
+var rpeEngine rpe.Engine
+
+const INTERVAL time.Duration = 60
 
+func NewControl() Control {
        return Control{make(chan *xapp.RMRParams)}
 }
 
@@ -44,8 +54,15 @@ type Control struct {
        rcChan chan *xapp.RMRParams
 }
 
-func (c *Control) Run(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
-       go c.controlLoop(sbiEngine, sdlEngine, rpeEngine, m)
+
+func (c *Control) Run() {
+       var err error
+       go c.controlLoop()
+       nbiEngine, sbiEngine, sdlEngine, rpeEngine, err = initRtmgr()
+       if err != nil {
+                xapp.Logger.Error(err.Error())
+                os.Exit(1)
+        }
        xapp.Run(c)
 }
 
@@ -54,7 +71,20 @@ func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
        return
 }
 
-func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
+func initRtmgr() (nbiEngine Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, err error) {
+        if nbiEngine, err = GetNbi(xapp.Config.GetString("nbi")); err == nil && nbiEngine != nil {
+                if sbiEngine, err = sbi.GetSbi(xapp.Config.GetString("sbi")); err == nil && sbiEngine != nil {
+                        if sdlEngine, err = sdl.GetSdl(xapp.Config.GetString("sdl")); err == nil && sdlEngine != nil {
+                                if rpeEngine, err = rpe.GetRpe(xapp.Config.GetString("rpe")); err == nil && rpeEngine != nil {
+                                        return nbiEngine, sbiEngine, sdlEngine, rpeEngine, nil
+                                }
+                        }
+                }
+        }
+        return nil, nil, nil, nil, err
+}
+
+func (c *Control) controlLoop() {
        for {
                msg := <-c.rcChan
                xapp_msg := sbi.RMRParams{msg}
@@ -64,7 +94,7 @@ func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEng
                                xapp.Logger.Info("Update Route Table Request(RMR to RM), message discarded as routing manager is not ready")
                        } else {
                                xapp.Logger.Info("Update Route Table Request(RMR to RM)")
-                               go c.handleUpdateToRoutingManagerRequest(msg, sbiEngine, sdlEngine, rpeEngine, m)
+                               go c.handleUpdateToRoutingManagerRequest(msg)
                        }
                case xapp.RICMessageTypes["RMRRM_TABLE_STATE"]:
                        xapp.Logger.Info("state of table to route mgr %s,payload %s", xapp_msg.String(), msg.Payload)
@@ -77,7 +107,7 @@ func (c *Control) controlLoop(sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEng
        }
 }
 
-func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) {
+func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams) {
 
        msg := sbi.RMRParams{params}
 
@@ -105,3 +135,46 @@ func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams, sb
                return
        }
 }
+
+func sendRoutesToAll() (err error) {
+
+        m.Lock()
+        data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
+        m.Unlock()
+        if err != nil || data == nil {
+                return errors.New("Cannot get data from sdl interface due to: " + err.Error())
+        }
+        sbiEngine.UpdateEndpoints(data)
+        policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
+        err = sbiEngine.DistributeAll(policies)
+        if err != nil {
+                return errors.New("Routing table cannot be published due to: " + err.Error())
+        }
+
+       return nil
+}
+
+func Serve() {
+
+        nbiErr := nbiEngine.Initialize(xapp.Config.GetString("xmurl"), xapp.Config.GetString("nbiurl"), xapp.Config.GetString("rtfile"), xapp.Config.GetString("cfgfile"), xapp.Config.GetString("e2murl"), sdlEngine, rpeEngine, &m)
+        if nbiErr != nil {
+                xapp.Logger.Error("Failed to initialize nbi due to: " + nbiErr.Error())
+                return
+        }
+
+        err := sbiEngine.Initialize(xapp.Config.GetString("sbiurl"))
+        if err != nil {
+                xapp.Logger.Info("Failed to open push socket due to: " + err.Error())
+                return
+        }
+        defer nbiEngine.Terminate()
+        defer sbiEngine.Terminate()
+
+        for {
+                sendRoutesToAll()
+
+                rtmgr.Rtmgr_ready = true
+                time.Sleep(INTERVAL * time.Second)
+                xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.")
+        }
+}
index fca8367..7de6b96 100644 (file)
@@ -78,7 +78,7 @@ func fetchAllXApps(xmurl string) (*[]rtmgr.XApp, error) {
 }
 
 func (g *HttpGetter) Initialize(xmurl string, nbiif string, fileName string, configfile string, e2murl string,
-       sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error {
+       sdlEngine sdl.Engine, rpeEngine rpe.Engine,  m *sync.Mutex) error {
        return nil
 }
 
index 98209e7..f49f3ae 100644 (file)
@@ -56,39 +56,20 @@ import (
        "time"
 )
 
-//var myClient = &http.Client{Timeout: 1 * time.Second}
-
 type HttpRestful struct {
        Engine
        LaunchRest                   LaunchRestHandler
-       RecvXappCallbackData         RecvXappCallbackDataHandler
-       RecvNewE2Tdata               RecvNewE2TdataHandler
-       ProvideXappHandleHandlerImpl ProvideXappHandleHandlerImpl
        RetrieveStartupData          RetrieveStartupDataHandler
 }
 
 func NewHttpRestful() *HttpRestful {
        instance := new(HttpRestful)
        instance.LaunchRest = launchRest
-       instance.RecvXappCallbackData = recvXappCallbackData
-       instance.RecvNewE2Tdata = recvNewE2Tdata
-       instance.ProvideXappHandleHandlerImpl = provideXappHandleHandlerImpl
        instance.RetrieveStartupData = retrieveStartupData
        return instance
 }
 
-// ToDo: Use Range over channel. Read and return only the latest one.
-func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr.XApp, error) {
-       var xappData *models.XappCallbackData
-       // Drain the channel as we are only looking for the latest value until
-       // xapp manager sends all xapp data with every request.
-       length := len(dataChannel)
-       //xapp.Logger.Info(length)
-       for i := 0; i <= length; i++ {
-               xapp.Logger.Info("data received")
-               // If no data received from the REST, it blocks.
-               xappData = <-dataChannel
-       }
+func recvXappCallbackData(xappData *models.XappCallbackData) (*[]rtmgr.XApp, error) {
        if nil != xappData {
                var xapps []rtmgr.XApp
                err := json.Unmarshal([]byte(xappData.XApps), &xapps)
@@ -101,13 +82,10 @@ func recvXappCallbackData(dataChannel <-chan *models.XappCallbackData) (*[]rtmgr
        return nil, nil
 }
 
-func recvNewE2Tdata(dataChannel <-chan *models.E2tData) (*rtmgr.E2TInstance, string, error) {
-       var e2tData *models.E2tData
+func recvNewE2Tdata(e2tData *models.E2tData) (*rtmgr.E2TInstance, string, error) {
        var str string
        xapp.Logger.Info("data received")
 
-       e2tData = <-dataChannel
-
        if nil != e2tData {
 
                e2tinst := rtmgr.E2TInstance{
@@ -146,7 +124,7 @@ func validateXappCallbackData(callbackData *models.XappCallbackData) error {
        return nil
 }
 
-func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *models.XappCallbackData) error {
+func provideXappHandleHandlerImpl(data *models.XappCallbackData) error {
        if data != nil {
                xapp.Logger.Debug("Received callback data")
        }
@@ -155,7 +133,20 @@ func provideXappHandleHandlerImpl(datach chan<- *models.XappCallbackData, data *
                xapp.Logger.Warn("XApp callback data validation failed: " + err.Error())
                return err
        } else {
-               datach <- data
+               appdata, err := recvXappCallbackData(data)
+                if err != nil {
+                       xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
+                } else if appdata != nil {
+                        xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
+                        alldata, err1 := httpGetXApps(xapp.Config.GetString("xmurl"))
+                        if alldata != nil && err1 == nil {
+                               m.Lock()
+                                sdlEngine.WriteXApps(xapp.Config.GetString("rtfile"), alldata)
+                                m.Unlock()
+                               return sendRoutesToAll()
+                        }
+                }
+
                return nil
        }
 }
@@ -223,18 +214,17 @@ func checkValidaE2TAddress(e2taddress string) bool {
 
 }
 
-func provideXappSubscriptionHandleImpl(subchan chan<- *models.XappSubscriptionData,
-       data *models.XappSubscriptionData) error {
+func provideXappSubscriptionHandleImpl(data *models.XappSubscriptionData) error {
        xapp.Logger.Debug("Invoked provideXappSubscriptionHandleImpl")
        err := validateXappSubscriptionData(data)
        if err != nil {
                xapp.Logger.Error(err.Error())
                return err
        }
-       subchan <- data
-       //var val = string(*data.Address + ":" + strconv.Itoa(int(*data.Port)))
+        xapp.Logger.Debug("received XApp subscription data")
+        addSubscription(&rtmgr.Subs, data)
        xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
-       return nil
+       return sendRoutesToAll()
 }
 
 func subscriptionExists(data *models.XappSubscriptionData) bool {
@@ -249,8 +239,7 @@ func subscriptionExists(data *models.XappSubscriptionData) bool {
        return present
 }
 
-func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscriptionData,
-       data *models.XappSubscriptionData) error {
+func deleteXappSubscriptionHandleImpl(data *models.XappSubscriptionData) error {
        xapp.Logger.Debug("Invoked deleteXappSubscriptionHandleImpl")
        err := validateXappSubscriptionData(data)
        if err != nil {
@@ -264,11 +253,13 @@ func deleteXappSubscriptionHandleImpl(subdelchan chan<- *models.XappSubscription
                return err
        }
 
-       subdelchan <- data
-       return nil
+        xapp.Logger.Debug("received XApp subscription delete data")
+        delSubscription(&rtmgr.Subs, data)
+       return sendRoutesToAll()
+
 }
 
-func updateXappSubscriptionHandleImpl(subupdatechan chan<- *rtmgr.XappList, data *models.XappList, subid uint16) error {
+func updateXappSubscriptionHandleImpl(data *models.XappList, subid uint16) error {
        xapp.Logger.Debug("Invoked updateXappSubscriptionHandleImpl")
 
        var fqdnlist []rtmgr.FqDn
@@ -289,20 +280,26 @@ func updateXappSubscriptionHandleImpl(subupdatechan chan<- *rtmgr.XappList, data
                        return err
                }
        }
-       subupdatechan <- &xapplist
-       return nil
+        xapp.Logger.Debug("received XApp subscription Merge data")
+        updateSubscription(&xapplist)
+       return sendRoutesToAll()
 }
 
-func createNewE2tHandleHandlerImpl(e2taddchan chan<- *models.E2tData,
-       data *models.E2tData) error {
+func createNewE2tHandleHandlerImpl(data *models.E2tData) error {
        xapp.Logger.Debug("Invoked createNewE2tHandleHandlerImpl")
        err := validateE2tData(data)
        if err != nil {
                xapp.Logger.Error(err.Error())
                return err
        }
-       e2taddchan <- data
-       return nil
+       //e2taddchan <- data
+       e2data, meiddata, _ := recvNewE2Tdata(data)
+        xapp.Logger.Debug("received create New E2T data")
+        m.Lock()
+        sdlEngine.WriteNewE2TInstance(xapp.Config.GetString("rtfile"), e2data, meiddata)
+        m.Unlock()
+        return sendRoutesToAll()
+
 }
 
 func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
@@ -323,32 +320,37 @@ func validateE2TAddressRANListData(assRanE2tData models.RanE2tMap) error {
        return nil
 }
 
-func associateRanToE2THandlerImpl(assranchan chan<- models.RanE2tMap,
-       data models.RanE2tMap) error {
+func associateRanToE2THandlerImpl(data models.RanE2tMap) error {
        xapp.Logger.Debug("Invoked associateRanToE2THandlerImpl")
        err := validateE2TAddressRANListData(data)
        if err != nil {
                xapp.Logger.Warn(" Association of RAN to E2T Instance data validation failed: " + err.Error())
                return err
        }
-       assranchan <- data
-       return nil
+        xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
+        m.Lock()
+        sdlEngine.WriteAssRANToE2TInstance(xapp.Config.GetString("rtfile"), data)
+        m.Unlock()
+       return sendRoutesToAll()
+
 }
 
-func disassociateRanToE2THandlerImpl(disassranchan chan<- models.RanE2tMap,
-       data models.RanE2tMap) error {
+func disassociateRanToE2THandlerImpl(data models.RanE2tMap) error {
        xapp.Logger.Debug("Invoked disassociateRanToE2THandlerImpl")
        err := validateE2TAddressRANListData(data)
        if err != nil {
                xapp.Logger.Warn(" Disassociation of RAN List from E2T Instance data validation failed: " + err.Error())
                return err
        }
-       disassranchan <- data
-       return nil
+        xapp.Logger.Debug("received disassociate RANs from E2T instance")
+        m.Lock()
+        sdlEngine.WriteDisAssRANFromE2TInstance(xapp.Config.GetString("rtfile"), data)
+        m.Unlock()
+       return sendRoutesToAll()
+
 }
 
-func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData,
-       data *models.E2tDeleteData) error {
+func deleteE2tHandleHandlerImpl(data *models.E2tDeleteData) error {
        xapp.Logger.Debug("Invoked deleteE2tHandleHandlerImpl")
 
        err := validateDeleteE2tData(data)
@@ -356,9 +358,11 @@ func deleteE2tHandleHandlerImpl(e2tdelchan chan<- *models.E2tDeleteData,
                xapp.Logger.Error(err.Error())
                return err
        }
+        m.Lock()
+        sdlEngine.WriteDeleteE2TInstance(xapp.Config.GetString("rtfile"), data)
+        m.Unlock()
+       return sendRoutesToAll()
 
-       e2tdelchan <- data
-       return nil
 }
 
 func dumpDebugData() (models.Debuginfo, error) {
@@ -378,8 +382,7 @@ func dumpDebugData() (models.Debuginfo, error) {
        return response, err
 }
 
-func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan chan<- *models.XappSubscriptionData, subupdatechan chan<- *rtmgr.XappList,
-       subdelchan chan<- *models.XappSubscriptionData, e2taddchan chan<- *models.E2tData, assranchan chan<- models.RanE2tMap, disassranchan chan<- models.RanE2tMap, e2tdelchan chan<- *models.E2tDeleteData) {
+func launchRest(nbiif *string){
        swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
        if err != nil {
                //log.Fatalln(err)
@@ -405,7 +408,7 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c
        api.HandleProvideXappHandleHandler = handle.ProvideXappHandleHandlerFunc(
                func(params handle.ProvideXappHandleParams) middleware.Responder {
                        xapp.Logger.Info("Data received on Http interface")
-                       err := provideXappHandleHandlerImpl(datach, params.XappCallbackData)
+                       err := provideXappHandleHandlerImpl(params.XappCallbackData)
                        if err != nil {
                                xapp.Logger.Error("Invalid XApp callback data: " + err.Error())
                                return handle.NewProvideXappHandleBadRequest()
@@ -415,77 +418,67 @@ func launchRest(nbiif *string, datach chan<- *models.XappCallbackData, subchan c
                })
        api.HandleProvideXappSubscriptionHandleHandler = handle.ProvideXappSubscriptionHandleHandlerFunc(
                func(params handle.ProvideXappSubscriptionHandleParams) middleware.Responder {
-                       err := provideXappSubscriptionHandleImpl(subchan, params.XappSubscriptionData)
+                       err := provideXappSubscriptionHandleImpl(params.XappSubscriptionData)
                        if err != nil {
                                return handle.NewProvideXappSubscriptionHandleBadRequest()
                        } else {
-                               //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
-                               time.Sleep(1 * time.Second)
                                return handle.NewGetHandlesOK()
                        }
                })
        api.HandleDeleteXappSubscriptionHandleHandler = handle.DeleteXappSubscriptionHandleHandlerFunc(
                func(params handle.DeleteXappSubscriptionHandleParams) middleware.Responder {
-                       err := deleteXappSubscriptionHandleImpl(subdelchan, params.XappSubscriptionData)
+                       err := deleteXappSubscriptionHandleImpl(params.XappSubscriptionData)
                        if err != nil {
                                return handle.NewDeleteXappSubscriptionHandleNoContent()
                        } else {
-                               //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
-                               time.Sleep(1 * time.Second)
                                return handle.NewGetHandlesOK()
                        }
                })
        api.HandleUpdateXappSubscriptionHandleHandler = handle.UpdateXappSubscriptionHandleHandlerFunc(
                func(params handle.UpdateXappSubscriptionHandleParams) middleware.Responder {
-                       err := updateXappSubscriptionHandleImpl(subupdatechan, &params.XappList, params.SubscriptionID)
+                       err := updateXappSubscriptionHandleImpl(&params.XappList, params.SubscriptionID)
                        if err != nil {
                                return handle.NewUpdateXappSubscriptionHandleBadRequest()
                        } else {
-                               //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
-                               time.Sleep(1 * time.Second)
                                return handle.NewUpdateXappSubscriptionHandleCreated()
                        }
                })
        api.HandleCreateNewE2tHandleHandler = handle.CreateNewE2tHandleHandlerFunc(
                func(params handle.CreateNewE2tHandleParams) middleware.Responder {
-                       err := createNewE2tHandleHandlerImpl(e2taddchan, params.E2tData)
+                       err := createNewE2tHandleHandlerImpl(params.E2tData)
                        if err != nil {
                                return handle.NewCreateNewE2tHandleBadRequest()
                        } else {
-                               time.Sleep(1 * time.Second)
                                return handle.NewCreateNewE2tHandleCreated()
                        }
                })
 
        api.HandleAssociateRanToE2tHandleHandler = handle.AssociateRanToE2tHandleHandlerFunc(
                func(params handle.AssociateRanToE2tHandleParams) middleware.Responder {
-                       err := associateRanToE2THandlerImpl(assranchan, params.RanE2tList)
+                       err := associateRanToE2THandlerImpl(params.RanE2tList)
                        if err != nil {
                                return handle.NewAssociateRanToE2tHandleBadRequest()
                        } else {
-                               time.Sleep(1 * time.Second)
                                return handle.NewAssociateRanToE2tHandleCreated()
                        }
                })
 
        api.HandleDissociateRanHandler = handle.DissociateRanHandlerFunc(
                func(params handle.DissociateRanParams) middleware.Responder {
-                       err := disassociateRanToE2THandlerImpl(disassranchan, params.DissociateList)
+                       err := disassociateRanToE2THandlerImpl(params.DissociateList)
                        if err != nil {
                                return handle.NewDissociateRanBadRequest()
                        } else {
-                               time.Sleep(1 * time.Second)
                                return handle.NewDissociateRanCreated()
                        }
                })
 
        api.HandleDeleteE2tHandleHandler = handle.DeleteE2tHandleHandlerFunc(
                func(params handle.DeleteE2tHandleParams) middleware.Responder {
-                       err := deleteE2tHandleHandlerImpl(e2tdelchan, params.E2tData)
+                       err := deleteE2tHandleHandlerImpl(params.E2tData)
                        if err != nil {
                                return handle.NewDeleteE2tHandleBadRequest()
                        } else {
-                               time.Sleep(1 * time.Second)
                                return handle.NewDeleteE2tHandleCreated()
                        }
                })
@@ -670,120 +663,14 @@ func retrieveStartupData(xmurl string, nbiif string, fileName string, configfile
 }
 
 func (r *HttpRestful) Initialize(xmurl string, nbiif string, fileName string, configfile string, e2murl string,
-       sdlEngine sdl.Engine, rpeEngine rpe.Engine, triggerSBI chan<- bool, m *sync.Mutex) error {
+       sdlEngine sdl.Engine, rpeEngine rpe.Engine, m *sync.Mutex) error {
        err := r.RetrieveStartupData(xmurl, nbiif, fileName, configfile, e2murl, sdlEngine)
        if err != nil {
                xapp.Logger.Error("Exiting as nbi failed to get the initial startup data from the xapp manager: " + err.Error())
                return err
        }
-
-       datach := make(chan *models.XappCallbackData, 10)
-       subschan := make(chan *models.XappSubscriptionData, 10)
-       subdelchan := make(chan *models.XappSubscriptionData, 10)
-       subupdatechan := make(chan *rtmgr.XappList, 10)
-       e2taddchan := make(chan *models.E2tData, 10)
-       associateranchan := make(chan models.RanE2tMap, 10)
-       disassociateranchan := make(chan models.RanE2tMap, 10)
-       e2tdelchan := make(chan *models.E2tDeleteData, 10)
-       xapp.Logger.Info("Launching Rest Http service")
-       go func() {
-               r.LaunchRest(&nbiif, datach, subschan, subupdatechan, subdelchan, e2taddchan, associateranchan, disassociateranchan, e2tdelchan)
-       }()
-
-       go func() {
-               for {
-                       data, err := r.RecvXappCallbackData(datach)
-                       if err != nil {
-                               xapp.Logger.Error("cannot get data from rest api dute to: " + err.Error())
-                       } else if data != nil {
-                               xapp.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
-                               alldata, err1 := httpGetXApps(xmurl)
-                               if alldata != nil && err1 == nil {
-                                       m.Lock()
-                                       sdlEngine.WriteXApps(fileName, alldata)
-                                       m.Unlock()
-                                       triggerSBI <- true
-                               }
-                       }
-               }
-       }()
-
-       go func() {
-               for {
-                       data := <-subschan
-                       xapp.Logger.Debug("received XApp subscription data")
-                       addSubscription(&rtmgr.Subs, data)
-                       triggerSBI <- true
-               }
-       }()
-
-       go func() {
-               for {
-                       data := <-subdelchan
-                       xapp.Logger.Debug("received XApp subscription delete data")
-                       delSubscription(&rtmgr.Subs, data)
-                       triggerSBI <- true
-               }
-       }()
-
-       go func() {
-               for {
-                       data := <-subupdatechan
-                       xapp.Logger.Debug("received XApp subscription Merge data")
-                       updateSubscription(data)
-                       triggerSBI <- true
-               }
-       }()
-
-       go func() {
-               for {
-
-                       data, meiddata, _ := r.RecvNewE2Tdata(e2taddchan)
-                       if data != nil {
-                               xapp.Logger.Debug("received create New E2T data")
-                               m.Lock()
-                               sdlEngine.WriteNewE2TInstance(fileName, data, meiddata)
-                               m.Unlock()
-                               triggerSBI <- true
-                       }
-               }
-       }()
-
-       go func() {
-               for {
-                       data := <-associateranchan
-                       xapp.Logger.Debug("received associate RAN list to E2T instance mapping from E2 Manager")
-                       m.Lock()
-                       sdlEngine.WriteAssRANToE2TInstance(fileName, data)
-                       m.Unlock()
-                       triggerSBI <- true
-               }
-       }()
-
        go func() {
-               for {
-
-                       data := <-disassociateranchan
-                       xapp.Logger.Debug("received disassociate RANs from E2T instance")
-                       m.Lock()
-                       sdlEngine.WriteDisAssRANFromE2TInstance(fileName, data)
-                       m.Unlock()
-                       triggerSBI <- true
-               }
-       }()
-
-       go func() {
-               for {
-
-                       data := <-e2tdelchan
-                       xapp.Logger.Debug("received Delete E2T data")
-                       if data != nil {
-                               m.Lock()
-                               sdlEngine.WriteDeleteE2TInstance(fileName, data)
-                               m.Unlock()
-                               triggerSBI <- true
-                       }
-               }
+               r.LaunchRest(&nbiif)
        }()
 
        return nil
index 5a9ae8c..bebc4f7 100644 (file)
@@ -43,7 +43,6 @@ import (
        "routing-manager/pkg/sdl"
        "routing-manager/pkg/stub"
        "testing"
-       "time"
        "sync"
        "github.com/go-openapi/swag"
 )
@@ -113,7 +112,6 @@ func TestValidateXappSubscriptionsData(t *testing.T) {
                Port:           &p,
                SubscriptionID: swag.Int32(123456)}
        err = validateXappSubscriptionData(&data1)
-       t.Log(err)
 
        //Validate E2tData
        data2 := models.E2tData{
@@ -121,24 +119,31 @@ func TestValidateXappSubscriptionsData(t *testing.T) {
        }
        /*err = validateE2tData(&data2)*/
 
-       e2tchannel := make(chan *models.E2tData, 10)
-       _ = createNewE2tHandleHandlerImpl(e2tchannel, &data2)
-       defer close(e2tchannel)
+       //e2tchannel := make(chan *models.E2tData, 10)
+       _ = createNewE2tHandleHandlerImpl(&data2)
+       //defer close(e2tchannel)
 
        //test case for provideXappSubscriptionHandleImp
-       datachannel := make(chan *models.XappSubscriptionData, 10)
-       _ = provideXappSubscriptionHandleImpl(datachannel, &data1)
-       defer close(datachannel)
+       //datachannel := make(chan *models.XappSubscriptionData, 10)
+       sdlEngine, _ = sdl.GetSdl("file")
+       _ = provideXappSubscriptionHandleImpl( &data1)
+       //defer close(datachannel)
 
        //test case for deleteXappSubscriptionHandleImpl
-       _ = deleteXappSubscriptionHandleImpl(datachannel, &data1)
+       _ = deleteXappSubscriptionHandleImpl(&data1)
 
        data3 := models.XappSubscriptionData{
                Address:        swag.String("10.55.55.5"),
                Port:           &p,
                SubscriptionID: swag.Int32(123456)}
        //test case for deleteXappSubscriptionHandleImpl
-       _ = deleteXappSubscriptionHandleImpl(datachannel, &data3)
+       _ = deleteXappSubscriptionHandleImpl(&data3)
+       data4 := models.XappSubscriptionData{
+               Address:        swag.String("1.5.5.5"),
+               Port:           &p,
+               SubscriptionID: swag.Int32(1236)}
+       _ = deleteXappSubscriptionHandleImpl(&data4)
+
 }
 
 func TestValidateE2tDataEmpty(t *testing.T) {
@@ -174,6 +179,8 @@ func TestValidateE2tDatavalid(t *testing.T) {
        err := validateE2tData(&data)
        t.Log(err)
 
+       _ = createNewE2tHandleHandlerImpl(&data)
+
 }
 
 func TestValidateE2tDatavalidEndpointPresent(t *testing.T) {
@@ -321,13 +328,12 @@ func TestValidateE2TAddressRANListData(t *testing.T) {
 
 func TestAssociateRanToE2THandlerImpl(t *testing.T) {
 
-       associateranchan := make(chan models.RanE2tMap, 10)
        data := models.RanE2tMap{
                                {
                                        E2TAddress: swag.String("10.101.01.1:8098"),
                        },
        }
-       err := associateRanToE2THandlerImpl(associateranchan, data)
+       err := associateRanToE2THandlerImpl( data)
        if (err != nil ) {
                t.Log(err)
        }
@@ -345,13 +351,11 @@ func TestAssociateRanToE2THandlerImpl(t *testing.T) {
                                        E2TAddress: swag.String("10.101.01.1:8098"),
                        },
        }
-       err = associateRanToE2THandlerImpl(associateranchan, data)
+       err = associateRanToE2THandlerImpl(data)
        if (err != nil ) {
                t.Log(err)
        }
-       data1 := <-associateranchan
 
-       fmt.Println(data1)
 //################ Delete End Point dummy entry  
     delete(rtmgr.Eps, uuid);
 //#####################
@@ -359,14 +363,13 @@ func TestAssociateRanToE2THandlerImpl(t *testing.T) {
 
 func TestDisassociateRanToE2THandlerImpl(t *testing.T) {
 
-       disassranchan  := make(chan models.RanE2tMap, 10)
 
        data := models.RanE2tMap{
                                {
                                        E2TAddress: swag.String("10.101.01.1:8098"),
                        },
        }
-       err := disassociateRanToE2THandlerImpl(disassranchan, data)
+       err := disassociateRanToE2THandlerImpl(data)
        if (err != nil ) {
                t.Log(err)
        }
@@ -383,13 +386,11 @@ func TestDisassociateRanToE2THandlerImpl(t *testing.T) {
                                        E2TAddress: swag.String("10.101.01.1:8098"),
                        },
        }
-       err = disassociateRanToE2THandlerImpl(disassranchan, data)
+       err = disassociateRanToE2THandlerImpl(data)
        if (err != nil ) {
                t.Log(err)
        }
-       data1 := <-disassranchan
 
-       fmt.Println(data1)
 //################ Delete End Point dummy entry  
     delete(rtmgr.Eps, uuid);
 //#####################
@@ -397,11 +398,10 @@ func TestDisassociateRanToE2THandlerImpl(t *testing.T) {
 
 func TestDeleteE2tHandleHandlerImpl(t *testing.T) {
 
-       e2tdelchan := make(chan *models.E2tDeleteData, 10)
        data := models.E2tDeleteData{
                E2TAddress: swag.String(""),
        }
-       err := deleteE2tHandleHandlerImpl(e2tdelchan, &data)
+       err := deleteE2tHandleHandlerImpl(&data)
        if (err != nil ) {
                t.Log(err)
        }
@@ -417,13 +417,10 @@ func TestDeleteE2tHandleHandlerImpl(t *testing.T) {
        data = models.E2tDeleteData{
                E2TAddress: swag.String("10.101.01.1:8098"),
        }
-       err = deleteE2tHandleHandlerImpl(e2tdelchan, &data)
+       err = deleteE2tHandleHandlerImpl(&data)
        if (err != nil ) {
                t.Log(err)
        }
-       data1 := <-e2tdelchan
-
-       fmt.Println(data1)
 //################ Delete End Point dummy entry  
     delete(rtmgr.Eps, uuid);
 //#####################
@@ -466,47 +463,38 @@ func TestHttpInstance(t *testing.T) {
        err := httpinstance.Terminate()
        t.Log(err)
 
-       triggerSBI := make(chan bool)
        createMockPlatformComponents()
        //ts := createMockAppmgrWithData("127.0.0.1:3000", BasicXAppLists, nil)
        //ts.Start()
        //defer ts.Close()
        var m sync.Mutex
-       err = httpinstance.Initialize(XMURL, "httpgetter", "rt.json", "config.json", E2MURL, sdlEngine, rpeEngine, triggerSBI, &m)
+       err = httpinstance.Initialize(XMURL, "httpgetter", "rt.json", "config.json", E2MURL, sdlEngine, rpeEngine, &m)
 }
 
-func TestXappCallbackDataChannelwithdata(t *testing.T) {
+func TestXappCallbackWithData(t *testing.T) {
        data := models.XappCallbackData{
                XApps:   *swag.String("[]"),
                Version: *swag.Int64(1),
                Event:   *swag.String("someevent"),
                ID:      *swag.String("123456")}
-       datach := make(chan *models.XappCallbackData, 1)
-       go func() { _, _ = recvXappCallbackData(datach) }()
-       defer close(datach)
-       datach <- &data
+        _, _ = recvXappCallbackData(&data)
 }
-func TestXappCallbackDataChannelNodata(t *testing.T) {
-       datach := make(chan *models.XappCallbackData, 1)
-       go func() { _, _ = recvXappCallbackData(datach) }()
-       defer close(datach)
+
+func TestXappCallbackNodata(t *testing.T) {
+       //data := *models.XappCallbackData
+        _, _ = recvXappCallbackData(nil)
 }
 
-func TestE2TChannelwithData(t *testing.T) {
-       data2 := models.E2tData{
-               E2TAddress: swag.String("1.2.3.4"),
-               RanNamelist: []string{"ran1","ran2"},
-       }
-       dataChannel := make(chan *models.E2tData, 10)
-       go func() { _, _,_ = recvNewE2Tdata(dataChannel) }()
-       defer close(dataChannel)
-       dataChannel <- &data2
+func TestE2TwithData(t *testing.T) {
+        data2 := models.E2tData{
+                E2TAddress: swag.String("1.2.3.4"),
+                RanNamelist: []string{"ran1","ran2"},
+        }
+         _, _,_ = recvNewE2Tdata(&data2)
 }
 
-func TestE2TChannelwithNoData(t *testing.T) {
-       dataChannel := make(chan *models.E2tData, 10)
-       go func() { _, _ ,_= recvNewE2Tdata(dataChannel) }()
-       defer close(dataChannel)
+func TestE2TwithNoData(t *testing.T) {
+         _, _,_ = recvNewE2Tdata(nil)
 }
 
 func TestProvideXappSubscriptionHandleImpl(t *testing.T) {
@@ -515,12 +503,7 @@ func TestProvideXappSubscriptionHandleImpl(t *testing.T) {
                Address:        swag.String("10.0.0.0"),
                Port:           &p,
                SubscriptionID: swag.Int32(1234)}
-       datachannel := make(chan *models.XappSubscriptionData, 10)
-       go func() { _ = provideXappSubscriptionHandleImpl(datachannel, &data) }()
-       defer close(datachannel)
-       datachannel <- &data
-
-       //subdel test
+        _ = provideXappSubscriptionHandleImpl(&data)
 }
 
 func createMockAppmgrWithData(url string, g []byte, p []byte, t []byte) *httptest.Server {
@@ -579,58 +562,18 @@ func createMockPlatformComponents() {
        _ = ioutil.WriteFile(filename, file, 644)
 }
 
-func TestRecvXappCallbackData(t *testing.T) {
-       data := models.XappCallbackData{
-               XApps:   *swag.String("[]"),
-               Version: *swag.Int64(1),
-               Event:   *swag.String("any"),
-               ID:      *swag.String("123456"),
-       }
-
-       ch := make(chan *models.XappCallbackData)
-       defer close(ch)
-       httpRestful := NewHttpRestful()
-       go func() { ch <- &data }()
-       time.Sleep(1 * time.Second)
-       t.Log(string(len(ch)))
-       xappList, err := httpRestful.RecvXappCallbackData(ch)
-       if err != nil {
-               t.Error("Receive failed: " + err.Error())
-       } else {
-               if xappList == nil {
-                       t.Error("Expected an XApp notification list")
-               } else {
-                       t.Log("whatever")
-               }
-       }
-}
-
 func TestProvideXappHandleHandlerImpl(t *testing.T) {
-       datach := make(chan *models.XappCallbackData, 10)
-       defer close(datach)
        data := models.XappCallbackData{
                XApps:   *swag.String("[]"),
                Version: *swag.Int64(1),
                Event:   *swag.String("someevent"),
                ID:      *swag.String("123456")}
-       var httpRestful, _ = GetNbi("httpRESTful")
-       err := httpRestful.(*HttpRestful).ProvideXappHandleHandlerImpl(datach, &data)
-       if err != nil {
-               t.Error("Error occured: " + err.Error())
-       } else {
-               recv := <-datach
-               if recv == nil {
-                       t.Error("Something gone wrong: " + err.Error())
-               } else {
-                       if recv != &data {
-                               t.Error("Malformed data on channel")
-                       }
-               }
-       }
+       err := provideXappHandleHandlerImpl( &data)
 
        //Empty XappCallbackdata
        data1 := models.XappCallbackData{}
-       err = httpRestful.(*HttpRestful).ProvideXappHandleHandlerImpl(datach, &data1)
+       err = provideXappHandleHandlerImpl(&data1)
+       t.Log(err)
 }
 
 func TestValidateXappCallbackData(t *testing.T) {
@@ -738,3 +681,45 @@ func TestInvalidarguments(t *testing.T) {
        _ = PostSubReq("\n","nbifinterface")
        _ = PostSubReq("xmurl","\n")
 }
+
+func TestInitEngine(t *testing.T) {
+       initRtmgr()
+}
+
+func TestUpdateXappSubscription(t *testing.T) {
+       ep := make(map[string]*rtmgr.Endpoint)
+        ep["dummy"] = &rtmgr.Endpoint{Uuid: "10.0.0.1:0", Name: "E2TERM", XAppType: "app1", Ip: "10.1.1.1", Port: 1234, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: true}
+
+        rtmgr.Eps = ep
+
+
+       p := uint16(1234)
+       xapp := models.XappElement{
+               Address:        swag.String("10.1.1.1"),
+                Port:           &p,
+       }
+
+       var b models.XappList
+       b = append(b,&xapp)
+       _ = updateXappSubscriptionHandleImpl(&b, 10)
+
+       //Test case when subscriptions already exist
+        data := models.XappSubscriptionData{
+                Address:        swag.String("10.0.0.0"),
+                Port:           &p,
+                SubscriptionID: swag.Int32(12345)}
+
+        rtmgr.Subs = *stub.ValidSubscriptions
+
+        subscriptionExists(&data)
+        addSubscription(&rtmgr.Subs, &data)
+       _ = updateXappSubscriptionHandleImpl(&b, 10)
+
+
+}
+
+func TestDumpDebugdata(t *testing.T) {
+       _,_ = dumpDebugData()
+}
+
+
index f538ac4..722289e 100644 (file)
@@ -40,7 +40,7 @@ import (
 type FetchAllXAppsHandler func(string) (*[]rtmgr.XApp, error)
 type RecvXappCallbackDataHandler func(<-chan *models.XappCallbackData) (*[]rtmgr.XApp, error)
 type RecvNewE2TdataHandler func(<-chan *models.E2tData) (*rtmgr.E2TInstance, string, error)
-type LaunchRestHandler func(*string, chan<- *models.XappCallbackData, chan<- *models.XappSubscriptionData, chan<- *rtmgr.XappList, chan<- *models.XappSubscriptionData, chan<- *models.E2tData, chan<- models.RanE2tMap, chan<- models.RanE2tMap, chan<- *models.E2tDeleteData)
+type LaunchRestHandler func(*string)
 type ProvideXappHandleHandlerImpl func(chan<- *models.XappCallbackData, *models.XappCallbackData) error
 type RetrieveStartupDataHandler func(string, string, string, string, string, sdl.Engine) error
 
@@ -53,6 +53,6 @@ type EngineConfig struct {
 }
 
 type Engine interface {
-       Initialize(string, string, string, string, string, sdl.Engine, rpe.Engine, chan<- bool, *sync.Mutex) error
+       Initialize(string, string, string, string, string, sdl.Engine, rpe.Engine, *sync.Mutex) error
        Terminate() error
 }
index 950cd99..fe61356 100644 (file)
@@ -22,7 +22,7 @@
 ==================================================================================
 */
 /*
-       Mnemonic:       nngpub_test.go
+       Mnemonic:       rmrpub_test.go
        Abstract:
        Date:           25 April 2019
 */
index b080bcb..39c2bb1 100644 (file)
@@ -115,7 +115,8 @@ func (r *Rpe) addRoute(messageType string, tx *rtmgr.Endpoint, rx *rtmgr.Endpoin
                if rx != nil {
                        rxList = []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}}
                }
-               messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+               //messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+               messageId := rtmgr.Mtype[messageType]
                route := rtmgr.RouteTableEntry{
                        MessageType: messageId,
                        TxList:      txList,
@@ -143,7 +144,8 @@ func (r *Rpe) addRoute_rx_list(messageType string, tx *rtmgr.Endpoint, rx []rtmg
                }
        }
 
-       messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+       //messageId := strconv.Itoa(xapp.RICMessageTypes[messageType])
+       messageId := rtmgr.Mtype[messageType]
        route := rtmgr.RouteTableEntry{
                MessageType: messageId,
                TxList:      txList,
@@ -250,8 +252,6 @@ func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.
                        sendEp = subManEp
                case "E2MAN":
                        sendEp = e2ManEp
-               //case "UEMAN":
-               //      sendEp = ueManEp
                case "RSM":
                        sendEp = rsmEp
                case "A1MEDIATOR":
@@ -300,11 +300,6 @@ func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable
                xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
                xapp.Logger.Debug("Endpoints: %v", endPointList)
        }
-       /*ueManEp := getEndpointByName(&endPointList, "UEMAN")
-       if ueManEp == nil {
-               xapp.Logger.Error("Platform component not found: %v", "UE Manger")
-               xapp.Logger.Debug("Endpoints: %v", endPointList)
-       }*/
        rsmEp := getEndpointByName(&endPointList, "RSM")
        if rsmEp == nil {
                xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager")
index 06d0f15..b636e12 100644 (file)
@@ -35,6 +35,7 @@ import (
        "github.com/ghodss/yaml"
        "io/ioutil"
        "os"
+       "strings"
 )
 
 var (
@@ -122,12 +123,14 @@ var (
        Eps  Endpoints
        Subs SubscriptionList
        PrsCfg  *PlatformRoutes
+       Mtype MessageTypeList
 )
 
 func GetPlatformComponents(configfile string) (*PlatformComponents, error) {
        xapp.Logger.Debug("Invoked rtmgr.GetPlatformComponents(" + configfile + ")")
        var rcfg ConfigRtmgr
        var rtroutes RtmgrRoutes
+       var mtypes MessageTypeIdentifier
        yamlFile, err := os.Open(configfile)
        if err != nil {
                return nil, errors.New("cannot open the file due to: " + err.Error())
@@ -147,6 +150,16 @@ func GetPlatformComponents(configfile string) (*PlatformComponents, error) {
         }
         PrsCfg = &(rtroutes.Prs)
 
+       err = json.Unmarshal(jsonByteValue,&mtypes)
+        if err != nil {
+               return nil, errors.New("cannot parse data due to: " + err.Error())
+        } else {
+               xapp.Logger.Debug("Messgaetypes = %v", mtypes)
+               for _,m := range mtypes.Mit {
+                       splitstr := strings.Split(m,"=")
+                       Mtype[splitstr[0]] = splitstr[1]
+               }
+       }
        err = json.Unmarshal(jsonByteValue, &rcfg)
        if err != nil {
                return nil, errors.New("cannot parse data due to: " + err.Error())
index 2846173..138c2b5 100644 (file)
@@ -39,6 +39,8 @@ type Endpoints map[string]*Endpoint
 
 type SubscriptionList []Subscription
 
+type MessageTypeList map[string]string
+
 //TODO: uuid is not a real UUID but a string of "ip:port"
 // this should be changed to real UUID later on which should come from xApp Manager // petszila
 type Endpoint struct {
@@ -102,6 +104,12 @@ type ConfigRtmgr struct {
        Pcs PlatformComponents `json:"PlatformComponents"`
 }
 
+
+type MessageTypeIdentifier struct {
+       Mit []string `json:"messagetypes"`
+}
+
+
 type RicComponents struct {
        XApps []XApp
        E2Ts  map [string]E2TInstance
index 74e077b..1f0e0e6 100644 (file)
 ==================================================================================
 */
 /*
-  Mnemonic:    nngpipe.go
-  Abstract: mangos (NNG) Pipeline SBI implementation
+  Mnemonic:    rmrpipe.go
+  Abstract: mangos (RMR) Pipeline SBI implementation
   Date:                12 March 2019
 */
 
 package sbi
 
+/*
+#include <rmr/rmr.h>
+*/
+import "C"
+
 import (
        "bytes"
        "crypto/md5"
@@ -35,15 +40,23 @@ import (
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "routing-manager/pkg/rtmgr"
        "strconv"
-       //"time"
+       "strings"
        "fmt"
 )
 
-type NngPush struct {
+var rmrcallid = 1
+var rmrdynamiccallid = 201
+
+type RmrPush struct {
        Sbi
        rcChan chan *xapp.RMRParams
 }
 
+type EPStatus struct {
+        endpoint string
+        status   bool
+}
+
 type RMRParams struct {
        *xapp.RMRParams
 }
@@ -55,23 +68,23 @@ func (params *RMRParams) String() string {
        return b.String()
 }
 
-func NewNngPush() *NngPush {
-       instance := new(NngPush)
+func NewRmrPush() *RmrPush {
+       instance := new(RmrPush)
        return instance
 }
 
-func (c *NngPush) Initialize(ip string) error {
+func (c *RmrPush) Initialize(ip string) error {
        return nil
 }
 
-func (c *NngPush) Terminate() error {
+func (c *RmrPush) Terminate() error {
        return nil
 }
 
-func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
+func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
 
        xapp.Logger.Debug("Invoked sbi.AddEndpoint")
-       endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
+       endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
        ep.Whid = int(xapp.Rmr.Openwh(endpoint))
        if ep.Whid < 0 {
                return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
@@ -82,7 +95,7 @@ func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
        return nil
 }
 
-func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
        xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
        xapp.Logger.Debug("args: %v", *ep)
 
@@ -90,65 +103,105 @@ func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
        return nil
 }
 
-func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
        c.updateEndpoints(rcs, c)
 }
 
-func (c *NngPush) DistributeAll(policies *[]string) error {
+func (c *RmrPush) DistributeAll(policies *[]string) error {
        xapp.Logger.Debug("Invoked: sbi.DistributeAll")
        xapp.Logger.Debug("args: %v", *policies)
 
-       for _, ep := range rtmgr.Eps {
+       /*for _, ep := range rtmgr.Eps {
                go c.send(ep, policies)
+       }*/
+       channel := make(chan EPStatus)
+
+       if rmrcallid == 200 {
+               rmrcallid = 1
        }
 
+        for _, ep := range rtmgr.Eps {
+                go c.send_sync(ep,  policies, channel, rmrcallid)
+        }
+       rmrcallid++
+
+        count := 0
+        result := make([]EPStatus, len(rtmgr.Eps))
+        for i, _ := range result {
+                result[i] = <-channel
+                if result[i].status == true {
+                        count++
+                } else {
+                        xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
+                }
+        }
+
+        if count < len(rtmgr.Eps) {
+                return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
+        }
+
+
        return nil
 }
 
-func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
-       xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
+        xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
 
-       var policy = []byte{}
-       cumulative_policy := 0
-       count := 0
-       maxrecord := xapp.Config.GetInt("maxrecord")
-       if maxrecord == 0 {
-               maxrecord = 10
-       }
+       ret := c.send_data(ep, policies, call_id)
 
-       for _, pe := range *policies {
-               b := []byte(pe)
-               for j := 0; j < len(b); j++ {
-                       policy = append(policy, b[j])
-               }
-               count++
-               cumulative_policy++
-               if count == maxrecord || cumulative_policy == len(*policies) {
-                       params := &RMRParams{&xapp.RMRParams{}}
-                       params.Mtype = 20
-                       params.PayloadLen = len(policy)
-                       params.Payload = []byte(policy)
-                       params.Mbuf = nil
-                       params.Whid = ep.Whid
-                       xapp.Rmr.SendMsg(params.RMRParams)
-                       count = 0
-                       policy = nil
-                       xapp.Logger.Debug("Sent message with payload len = %d to %s", params.PayloadLen, ep.Uuid)
-               }
-       }
+        channel <- EPStatus{ep.Uuid, ret}
 
-       xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
 }
 
-func (c *NngPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
+func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
+        xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
+        var state int
+        var retstr string
+
+        var policy = []byte{}
+
+        for _, pe := range *policies {
+                b:= []byte(pe)
+                for j:=0; j<len(b); j++{
+                        policy = append(policy,b[j])
+                }
+       }
+        params := &RMRParams{&xapp.RMRParams{}}
+        params.Mtype = 20
+        params.PayloadLen = len(policy)
+        params.Payload =[]byte(policy)
+        params.Mbuf = nil
+        params.Whid = ep.Whid
+        params.Callid = call_id
+        params.Timeout = 200
+        state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
+       routestatus := strings.Split(retstr," ")
+        if state != C.RMR_OK && routestatus[0] == "OK" {
+              xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
+              return false
+        } else {
+               xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
+              return true
+        }
+
+        xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
+        return false
+}
+
+func (c *RmrPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
        return c.createEndpoint(payload, c)
 }
 
-func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
+func (c *RmrPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
        xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
        xapp.Logger.Debug("args: %v", *policies)
 
-       go c.send(ep, policies)
+       if rmrdynamiccallid == 255 {
+                rmrdynamiccallid = 201
+        }
+
+       go c.send_data(ep, policies,rmrdynamiccallid)
+       rmrdynamiccallid++
 
        return nil
 }
index 954dcaa..740f5e8 100644 (file)
@@ -21,7 +21,7 @@
 ==================================================================================
 */
 /*
-       Mnemonic:       nngpush_test.go
+       Mnemonic:       rmrpush_test.go
        Abstract:
        Date:           3 May 2019
 */
@@ -55,7 +55,7 @@ func TestMain(m *testing.M) {
 /*
 Resets the EndpointList according to argumnets
 */
-func resetTestPushDataset(instance NngPush, testdata []rtmgr.Endpoint) {
+func resetTestPushDataset(instance RmrPush, testdata []rtmgr.Endpoint) {
        rtmgr.Eps = make(map[string]*rtmgr.Endpoint)
        for _, endpoint := range testdata {
                ep := endpoint
@@ -65,101 +65,101 @@ func resetTestPushDataset(instance NngPush, testdata []rtmgr.Endpoint) {
 }
 
 /*
-nngpush.Initialize() method is empty, nothing to be tested
+rmrpush.Initialize() method is empty, nothing to be tested
 */
-func TestNngPushInitialize(t *testing.T) {
-       var nngpush = NngPush{}
+func TestRmrPushInitialize(t *testing.T) {
+       var rmrpush = RmrPush{}
 
-       _ = nngpush.Initialize("")
+       _ = rmrpush.Initialize("")
 }
 
 /*
-nngpush.Terminate() method is empty, nothing to be tested
+rmrpush.Terminate() method is empty, nothing to be tested
 */
-func TestNngPushTerminate(t *testing.T) {
-       var nngpush = NngPush{}
+func TestRmrPushTerminate(t *testing.T) {
+       var rmrpush = RmrPush{}
 
-       _ = nngpush.Terminate()
+       _ = rmrpush.Terminate()
 }
 
 /*
-nngpush.UpdateEndpoints() is testd against stub.ValidXApps dataset
+rmrpush.UpdateEndpoints() is testd against stub.ValidXApps dataset
 */
-func TestNngPushUpdateEndpoints(t *testing.T) {
-       var nngpush = NngPush{}
-       resetTestPushDataset(nngpush, stub.ValidEndpoints)
+func TestRmrPushUpdateEndpoints(t *testing.T) {
+       var rmrpush = RmrPush{}
+       resetTestPushDataset(rmrpush, stub.ValidEndpoints)
 
-       nngpush.UpdateEndpoints(&stub.ValidRicComponents)
+       rmrpush.UpdateEndpoints(&stub.ValidRicComponents)
        if rtmgr.Eps == nil {
-               t.Errorf("nngpush.UpdateEndpoints() result was incorrect, got: %v, want: %v.", nil, "rtmgr.Endpoints")
+               t.Errorf("rmrpush.UpdateEndpoints() result was incorrect, got: %v, want: %v.", nil, "rtmgr.Endpoints")
        }
 }
 
 /*
-nngpush.AddEndpoint() is tested for happy path case
+rmrpush.AddEndpoint() is tested for happy path case
 */
-func TestNngPushAddEndpoint(t *testing.T) {
+func TestRmrPushAddEndpoint(t *testing.T) {
 //     var err error
-       var nngpush = NngPush{}
-       resetTestPushDataset(nngpush, stub.ValidEndpoints)
-       _ = nngpush.AddEndpoint(rtmgr.Eps["localhost"])
+       var rmrpush = RmrPush{}
+       resetTestPushDataset(rmrpush, stub.ValidEndpoints)
+       _ = rmrpush.AddEndpoint(rtmgr.Eps["localhost"])
 /*     if err != nil {
-               t.Errorf("nngpush.AddEndpoint() return was incorrect, got: %v, want: %v.", err, "nil")
+               t.Errorf("rmrpush.AddEndpoint() return was incorrect, got: %v, want: %v.", err, "nil")
        }*/
 }
 
 
 /*
-nngpush.DistributeAll() is tested for happy path case
+rmrpush.DistributeAll() is tested for happy path case
 */
-func TestNngPushDistributeAll(t *testing.T) {
+func TestRmrPushDistributeAll(t *testing.T) {
        var err error
-       var nngpush = NngPush{}
-       resetTestPushDataset(nngpush, stub.ValidEndpoints)
+       var rmrpush = RmrPush{}
+       resetTestPushDataset(rmrpush, stub.ValidEndpoints)
 
-       err = nngpush.DistributeAll(stub.ValidPolicies)
+       err = rmrpush.DistributeAll(stub.ValidPolicies)
        if err != nil {
-               t.Errorf("nngpush.DistributeAll(policies) was incorrect, got: %v, want: %v.", err, "nil")
+               t.Errorf("rmrpush.DistributeAll(policies) was incorrect, got: %v, want: %v.", err, "nil")
        }
 }
 
 /*
-nngpush.DistributeToEp() is tested for Sending case
+rmrpush.DistributeToEp() is tested for Sending case
 */
 func TestDistributeToEp(t *testing.T) {
        var err error
-       var nngpush = NngPush{}
-       resetTestPushDataset(nngpush, stub.ValidEndpoints)
+       var rmrpush = RmrPush{}
+       resetTestPushDataset(rmrpush, stub.ValidEndpoints)
 
-       err = nngpush.DistributeToEp(stub.ValidPolicies,rtmgr.Eps["localhost"])
+       err = rmrpush.DistributeToEp(stub.ValidPolicies,rtmgr.Eps["localhost"])
        if err != nil {
-               t.Errorf("nngpush.DistributetoEp(policies) was incorrect, got: %v, want: %v.", err, "nil")
+               t.Errorf("rmrpush.DistributetoEp(policies) was incorrect, got: %v, want: %v.", err, "nil")
        }
 }
 
 func TestDeleteEndpoint(t *testing.T) {
        var err error
-       var nngpush = NngPush{}
-       resetTestPushDataset(nngpush, stub.ValidEndpoints)
+       var rmrpush = RmrPush{}
+       resetTestPushDataset(rmrpush, stub.ValidEndpoints)
 
-       err = nngpush.DeleteEndpoint(rtmgr.Eps["localhost"])
+       err = rmrpush.DeleteEndpoint(rtmgr.Eps["localhost"])
        if err != nil {
-               t.Errorf("nngpush.DeleteEndpoint() was incorrect, got: %v, want: %v.", err, "nil")
+               t.Errorf("rmrpush.DeleteEndpoint() was incorrect, got: %v, want: %v.", err, "nil")
        }
 }
 
 func TestCreateEndpoint(t *testing.T) {
-       var nngpush = NngPush{}
-       resetTestPushDataset(nngpush, stub.ValidEndpoints1)
-        nngpush.CreateEndpoint("192.168.0.1:0")
-        nngpush.CreateEndpoint("localhost:4560")
+       var rmrpush = RmrPush{}
+       resetTestPushDataset(rmrpush, stub.ValidEndpoints1)
+        rmrpush.CreateEndpoint("192.168.0.1:0")
+        rmrpush.CreateEndpoint("localhost:4560")
 }
 /*
 Initialize and send policies
 */
-func TestNngPushInitializeandsendPolicies(t *testing.T) {
-        var nngpush = NngPush{}
-       resetTestPushDataset(nngpush, stub.ValidEndpoints)
+func TestRmrPushInitializeandsendPolicies(t *testing.T) {
+        var rmrpush = RmrPush{}
+       resetTestPushDataset(rmrpush, stub.ValidEndpoints)
         policies := []string{"hello","welcome"}
-       nngpush.send(rtmgr.Eps["localhost"],&policies)
+       rmrpush.send_data(rtmgr.Eps["localhost"],&policies,1)
 }
index be415f3..dca7278 100644 (file)
@@ -39,17 +39,17 @@ import (
        "strings"
 )
 
-const DefaultNngPipelineSocketPrefix = "tcp://"
-const DefaultNngPipelineSocketNumber = 4561
+const DefaultRmrPipelineSocketPrefix = "tcp://"
+const DefaultRmrPipelineSocketNumber = 4561
 const PlatformType = "platform"
 
 var (
        SupportedSbis = []*EngineConfig{
                {
-                       Name:        "nngpush",
+                       Name:        "rmrpush",
                        Version:     "v1",
-                       Protocol:    "nngpipeline",
-                       Instance:    NewNngPush(),
+                       Protocol:    "rmrpipeline",
+                       Instance:    NewRmrPush(),
                        IsAvailable: true,
                },
        }
index 68c7654..aa12d60 100644 (file)
@@ -36,15 +36,15 @@ import (
 
 func TestGetSbi(t *testing.T) {
        var errtype = errors.New("")
-       var sbitype = new(NngPush)
-       var invalids = []string{"nngpus", ""}
+       var sbitype = new(RmrPush)
+       var invalids = []string{"rmrpus", ""}
 
-       sbii, err := GetSbi("nngpush")
+       sbii, err := GetSbi("rmrpush")
        if err != nil {
-               t.Errorf("GetSbi(nngpub) was incorrect, got: %v, want: %v.", reflect.TypeOf(err), nil)
+               t.Errorf("GetSbi(rmrpub) was incorrect, got: %v, want: %v.", reflect.TypeOf(err), nil)
        }
        if reflect.TypeOf(sbii) != reflect.TypeOf(sbitype) {
-               t.Errorf("GetSbi(nngpub) was incorrect, got: %v, want: %v.", reflect.TypeOf(sbii), reflect.TypeOf(sbitype))
+               t.Errorf("GetSbi(rmrpub) was incorrect, got: %v, want: %v.", reflect.TypeOf(sbii), reflect.TypeOf(sbitype))
        }
 
        for _, arg := range invalids {
@@ -58,14 +58,14 @@ func TestGetSbi(t *testing.T) {
 func TestUpdateE2TendPoint(t *testing.T) {
        var err error
        var sbi = Sbi{}
-       sbii, err := GetSbi("nngpush")
+       sbii, err := GetSbi("rmrpush")
 
        var EP = make(map[string]*rtmgr.Endpoint)
        EP["127.0.0.2"] = &rtmgr.Endpoint{Uuid: "127.0.0.2", Name: "E2TERM", XAppType: "app1", Ip: "127.0.0.2", Port: 4562, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: false}
        rtmgr.Eps = EP
 
-       var nngpush = NngPush{}
-       nngpush.AddEndpoint(rtmgr.Eps["127.0.0.2"])
+       var rmrpush = RmrPush{}
+       rmrpush.AddEndpoint(rtmgr.Eps["127.0.0.2"])
 
        var E2map = make(map[string]rtmgr.E2TInstance)
 
@@ -82,13 +82,13 @@ func TestUpdateE2TendPoint(t *testing.T) {
 func TestPruneEndpointList(t *testing.T) {
        var sbi = Sbi{}
        var err error
-       sbii, err := GetSbi("nngpush")
+       sbii, err := GetSbi("rmrpush")
        var EP = make(map[string]*rtmgr.Endpoint)
        EP["127.0.0.2"] = &rtmgr.Endpoint{Uuid: "127.0.0.2", Name: "E2TERM", XAppType: "app1", Ip: "127.0.0.1", Port: 4562, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: false}
        rtmgr.Eps = EP
 
-       var nngpush = NngPush{}
-       nngpush.AddEndpoint(rtmgr.Eps["127.0.0.2"])
+       var rmrpush = RmrPush{}
+       rmrpush.AddEndpoint(rtmgr.Eps["127.0.0.2"])
 
        sbi.pruneEndpointList(sbii)
        t.Log(err)
index c0ab373..232a1dc 100644 (file)
@@ -49,11 +49,11 @@ type Engine interface {
        DistributeToEp(*[]string, *rtmgr.Endpoint) error
 }
 
-type NngSocket interface {
+/*type NngSocket interface {
        Listen(string) error
        Send([]byte) error
        Close() error
        DialOptions(string, map[string]interface{}) error
 }
 
-type CreateNewNngSocketHandler func() (NngSocket, error)
+type CreateNewNngSocketHandler func() (NngSocket, error)*/