Release of Routing Manager v0.3.0
[ric-plt/rtmgr.git] / pkg / sbi / nngpub.go
index c48a40f..ae96aa4 100644 (file)
@@ -17,7 +17,7 @@
 ==================================================================================
 */
 /*
-  Mnemonic:    nngpub.go
+  Mnemonic:    NngPub.go
   Abstract:    mangos (NNG) Pub/Sub SBI implementation
   Date:                12 March 2019
 */
@@ -26,52 +26,88 @@ package sbi
 
 import (
        "errors"
-       "nanomsg.org/go/mangos/v2"
        "nanomsg.org/go/mangos/v2/protocol/pub"
        _ "nanomsg.org/go/mangos/v2/transport/all"
-       "rtmgr"
+       "routing-manager/pkg/rtmgr"
        "strconv"
 )
 
-var socket mangos.Socket
+type NngPub struct {
+       Sbi
+       socket NngSocket
+       NewSocket CreateNewNngSocketHandler
+}
 
-func createNngPubEndpointSocket(ep *rtmgr.Endpoint) error {
-       return nil
+func NewNngPub() *NngPub {
+       instance := new(NngPub)
+       instance.NewSocket = createNewPubSocket
+       return instance
 }
 
-func destroyNngPubEndpointSocket(ep *rtmgr.Endpoint) error {
-       return nil
+func createNewPubSocket() (NngSocket, error) {
+       rtmgr.Logger.Debug("Invoked createNewPubSocket()")
+       s, err := pub.NewSocket()
+       if err != nil {
+               return nil, errors.New("can't create new pub socket due to: " + err.Error())
+       }
+       return s, nil
 }
 
-/*
-Creates the NNG publication channel
-*/
-func openNngPub(ip string) error {
+func (c *NngPub) Initialize(ip string) error {
+       rtmgr.Logger.Debug("Invoked sbi.Initialize("+ ip +")")
        var err error
-       if socket, err = pub.NewSocket(); err != nil {
-               return errors.New("can't get new pub socket due to:" + err.Error())
+       c.socket, err = c.NewSocket()
+       if err != nil {
+               return errors.New("create socket error due to: " + err.Error())
+       }
+       if err = c.listen(ip); err != nil {
+               return errors.New("can't listen on socket due to: " + err.Error())
        }
+       return nil
+}
+
+func (c *NngPub) Terminate() error {
+       rtmgr.Logger.Debug("Invoked sbi.Terminate()")
+       return c.closeSocket()
+}
+
+func (c *NngPub) AddEndpoint(ep *rtmgr.Endpoint) error {
+       return nil
+}
+
+func (c *NngPub) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+       return nil
+}
+
+func (c *NngPub) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+       c.updateEndpoints(rcs, c)
+}
+
+func (c *NngPub) listen(ip string) error {
+       rtmgr.Logger.Debug("Start listening on: " + ip)
        uri := DEFAULT_NNG_PUBSUB_SOCKET_PREFIX + ip + ":" + strconv.Itoa(DEFAULT_NNG_PUBSUB_SOCKET_NUMBER)
        rtmgr.Logger.Info("publishing on: " + uri)
-       if err = socket.Listen(uri); err != nil {
-               return errors.New("can't publish on socket " + uri + " due to:" + err.Error())
+       if err := c.socket.(NngSocket).Listen(uri); err != nil {
+               return errors.New("can't publish on socket " + uri + " due to: " + err.Error())
        }
        return nil
 }
 
-func closeNngPub() error {
-       if err := socket.Close(); err != nil {
-               return errors.New("can't close socket due to:" + err.Error())
+func (c *NngPub) closeSocket() error {
+       rtmgr.Logger.Debug("Close NngPub Socket")
+       if err := c.socket.(NngSocket).Close(); err != nil {
+               return errors.New("can't close socket due to: " + err.Error())
        }
        return nil
 }
 
-func publishAll(policies *[]string) error {
+func (c *NngPub) DistributeAll(policies *[]string) error {
+       rtmgr.Logger.Debug("Invoked: sbi.DistributeAll(), args: %v",(*policies))
        for _, pe := range *policies {
-               if err := socket.Send([]byte(pe)); err != nil {
+               if err := c.socket.(NngSocket).Send([]byte(pe)); err != nil {
                        return errors.New("Unable to send policy entry due to: " + err.Error())
                }
        }
-       rtmgr.Logger.Info("NNG PUB: OK (# of Entries:" + strconv.Itoa(len((*policies))) + ")")
+       rtmgr.Logger.Info("NNG PUB: OK (# of Entries: " + strconv.Itoa(len((*policies))) + ")")
        return nil
 }