func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
for _, ep := range rtmgr.Eps {
- uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
+ uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
if uri == pipe.Address() {
switch event {
case 1:
var err error
var socket NngSocket
rtmgr.Logger.Debug("Invoked sbi.AddEndpoint")
- rtmgr.Logger.Debug("args: %v", (*ep))
+ rtmgr.Logger.Debug("args: %v", *ep)
socket, err = c.NewSocket()
if err != nil {
return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
rtmgr.Logger.Debug("Invoked sbi. DeleteEndpoint")
- rtmgr.Logger.Debug("args: %v", (*ep))
- if err:= ep.Socket.(NngSocket).Close(); err != nil {
- return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
- }
+ rtmgr.Logger.Debug("args: %v", *ep)
+ if err := ep.Socket.(NngSocket).Close(); err != nil {
+ return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
+ }
return nil
}
*/
func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
- uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
+ uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
options := make(map[string]interface{})
options[mangos.OptionDialAsynch] = true
if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
func (c *NngPush) DistributeAll(policies *[]string) error {
rtmgr.Logger.Debug("Invoked: sbi.DistributeAll")
- rtmgr.Logger.Debug("args: %v", (*policies))
+ rtmgr.Logger.Debug("args: %v", *policies)
for _, ep := range rtmgr.Eps {
if ep.IsReady {
go c.send(ep, policies)
}
func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
- rtmgr.Logger.Debug("Push policy to endpoint: "+ ep.Uuid)
+ rtmgr.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
for _, pe := range *policies {
if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
}
}
- rtmgr.Logger.Info("NNG PUSH to ednpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len((*policies))) + ")")
+ rtmgr.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
}