==================================================================================
*/
/*
- Mnemonic: nngpub.go
+ Mnemonic: NngPub.go
Abstract: mangos (NNG) Pub/Sub SBI implementation
Date: 12 March 2019
*/
import (
"errors"
- "nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/pub"
_ "nanomsg.org/go/mangos/v2/transport/all"
- "rtmgr"
+ "routing-manager/pkg/rtmgr"
"strconv"
)
-var socket mangos.Socket
+type NngPub struct {
+ Sbi
+ socket NngSocket
+ NewSocket CreateNewNngSocketHandler
+}
-func createNngPubEndpointSocket(ep *rtmgr.Endpoint) error {
- return nil
+func NewNngPub() *NngPub {
+ instance := new(NngPub)
+ instance.NewSocket = createNewPubSocket
+ return instance
}
-func destroyNngPubEndpointSocket(ep *rtmgr.Endpoint) error {
- return nil
+func createNewPubSocket() (NngSocket, error) {
+ rtmgr.Logger.Debug("Invoked createNewPubSocket()")
+ s, err := pub.NewSocket()
+ if err != nil {
+ return nil, errors.New("can't create new pub socket due to: " + err.Error())
+ }
+ return s, nil
}
-/*
-Creates the NNG publication channel
-*/
-func openNngPub(ip string) error {
+func (c *NngPub) Initialize(ip string) error {
+ rtmgr.Logger.Debug("Invoked sbi.Initialize("+ ip +")")
var err error
- if socket, err = pub.NewSocket(); err != nil {
- return errors.New("can't get new pub socket due to:" + err.Error())
+ c.socket, err = c.NewSocket()
+ if err != nil {
+ return errors.New("create socket error due to: " + err.Error())
+ }
+ if err = c.listen(ip); err != nil {
+ return errors.New("can't listen on socket due to: " + err.Error())
}
+ return nil
+}
+
+func (c *NngPub) Terminate() error {
+ rtmgr.Logger.Debug("Invoked sbi.Terminate()")
+ return c.closeSocket()
+}
+
+func (c *NngPub) AddEndpoint(ep *rtmgr.Endpoint) error {
+ return nil
+}
+
+func (c *NngPub) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+ return nil
+}
+
+func (c *NngPub) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+ c.updateEndpoints(rcs, c)
+}
+
+func (c *NngPub) listen(ip string) error {
+ rtmgr.Logger.Debug("Start listening on: " + ip)
uri := DEFAULT_NNG_PUBSUB_SOCKET_PREFIX + ip + ":" + strconv.Itoa(DEFAULT_NNG_PUBSUB_SOCKET_NUMBER)
rtmgr.Logger.Info("publishing on: " + uri)
- if err = socket.Listen(uri); err != nil {
- return errors.New("can't publish on socket " + uri + " due to:" + err.Error())
+ if err := c.socket.(NngSocket).Listen(uri); err != nil {
+ return errors.New("can't publish on socket " + uri + " due to: " + err.Error())
}
return nil
}
-func closeNngPub() error {
- if err := socket.Close(); err != nil {
- return errors.New("can't close socket due to:" + err.Error())
+func (c *NngPub) closeSocket() error {
+ rtmgr.Logger.Debug("Close NngPub Socket")
+ if err := c.socket.(NngSocket).Close(); err != nil {
+ return errors.New("can't close socket due to: " + err.Error())
}
return nil
}
-func publishAll(policies *[]string) error {
+func (c *NngPub) DistributeAll(policies *[]string) error {
+ rtmgr.Logger.Debug("Invoked: sbi.DistributeAll(), args: %v",(*policies))
for _, pe := range *policies {
- if err := socket.Send([]byte(pe)); err != nil {
+ if err := c.socket.(NngSocket).Send([]byte(pe)); err != nil {
return errors.New("Unable to send policy entry due to: " + err.Error())
}
}
- rtmgr.Logger.Info("NNG PUB: OK (# of Entries:" + strconv.Itoa(len((*policies))) + ")")
+ rtmgr.Logger.Info("NNG PUB: OK (# of Entries: " + strconv.Itoa(len((*policies))) + ")")
return nil
}