-func createNewPushSocket() (NngSocket, error) {
- xapp.Logger.Debug("Invoked: createNewPushSocket()")
- socket, err := push.NewSocket()
- if err != nil {
- return nil, errors.New("can't create new push socket due to:" + err.Error())
- }
- socket.SetPipeEventHook(pipeEventHandler)
- return socket, nil
-}
-
-func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
- xapp.Logger.Debug("Invoked: pipeEventHandler()")
- xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
- for _, ep := range rtmgr.Eps {
- uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
- if uri == pipe.Address() {
- switch event {
- case 1:
- ep.IsReady = true
- xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
- default:
- ep.IsReady = false
- xapp.Logger.Debug("Endpoint " + uri + " has been detached")
- }
- }
- }
-}
-