Release of Routing Manager v0.3.0
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
index 01bde0c..6f2535c 100644 (file)
@@ -29,87 +29,117 @@ import (
        "nanomsg.org/go/mangos/v2"
        "nanomsg.org/go/mangos/v2/protocol/push"
        _ "nanomsg.org/go/mangos/v2/transport/all"
-       "rtmgr"
+       "routing-manager/pkg/rtmgr"
        "strconv"
 )
 
-func openNngPush(ip string) error {
-       return nil
+type NngPush struct {
+       Sbi
+       NewSocket CreateNewNngSocketHandler
 }
 
-func closeNngPush() error {
-       return nil
+func NewNngPush() *NngPush {
+       instance := new(NngPush)
+       instance.NewSocket = createNewPushSocket
+       return instance
 }
 
-func createNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Invoked sbi.createNngPushEndpointSocket")
-       rtmgr.Logger.Debug("args: %v", (*ep))
-       s, err := push.NewSocket()
+func createNewPushSocket() (NngSocket, error) {
+       rtmgr.Logger.Debug("Invoked: createNewPushSocket()")
+       socket, err := push.NewSocket()
        if err != nil {
-               return errors.New("can't open push socket for endpoint: " + ep.Name +" due to:" + err.Error())
+               return nil, errors.New("can't create new push socket due to:" + err.Error())
        }
-       s.SetPipeEventHook(pipeEventHandler)
-       ep.Socket = s
-       dial(ep)
-       return nil
-}
-
-func destroyNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Invoked sbi.destroyNngPushEndpointSocket")
-       rtmgr.Logger.Debug("args: %v", (*ep))
-       if err:= ep.Socket.(mangos.Socket).Close(); err != nil {
-                       return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to:" + err.Error())
-               }
-       return nil
+       socket.SetPipeEventHook(pipeEventHandler)
+       return socket, nil
 }
 
 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
+       rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
        for _, ep := range rtmgr.Eps {
                uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
                if uri == pipe.Address() {
                        switch event {
                        case 1:
                                ep.IsReady = true
-                               rtmgr.Logger.Debug("Endpoint " + uri + " successfully registered")
+                               rtmgr.Logger.Debug("Endpoint " + uri + " successfully attached")
                        default:
                                ep.IsReady = false
-                               rtmgr.Logger.Debug("Endpoint " + uri + " has been deregistered")
+                               rtmgr.Logger.Debug("Endpoint " + uri + " has been detached")
                        }
-               }       
+               }
+       }
+}
+
+func (c *NngPush) Initialize(ip string) error {
+       return nil
+}
+
+func (c *NngPush) Terminate() error {
+       return nil
+}
+
+func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
+       var err error
+       var socket NngSocket
+       rtmgr.Logger.Debug("Invoked sbi.AddEndpoint")
+       rtmgr.Logger.Debug("args: %v", (*ep))
+       socket, err = c.NewSocket()
+       if err != nil {
+               return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
+       }
+       ep.Socket = socket
+       err = c.dial(ep)
+       if err != nil {
+               return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
        }
+       return nil
+}
+
+func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+       rtmgr.Logger.Debug("Invoked sbi. DeleteEndpoint")
+       rtmgr.Logger.Debug("args: %v", (*ep))
+       if err:= ep.Socket.(NngSocket).Close(); err != nil {
+                       return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
+               }
+       return nil
+}
+
+func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+       c.updateEndpoints(rcs, c)
 }
 
 /*
 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
 */
-func dial(ep *rtmgr.Endpoint) {
+func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
        rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
        uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
        options := make(map[string]interface{})
        options[mangos.OptionDialAsynch] = true
-       if err := ep.Socket.(mangos.Socket).DialOptions(uri, options); err != nil {
-               rtmgr.Logger.Error("can't dial on push socket to " + uri + " due to:" + err.Error())
+       if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
+               return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
        }
+       return nil
 }
 
-func pushAll(policies *[]string) error {
-       rtmgr.Logger.Debug("Invoked: sbi.pushAll")
+func (c *NngPush) DistributeAll(policies *[]string) error {
+       rtmgr.Logger.Debug("Invoked: sbi.DistributeAll")
        rtmgr.Logger.Debug("args: %v", (*policies))
        for _, ep := range rtmgr.Eps {
                if ep.IsReady {
-                       go send(ep, policies)
+                       go c.send(ep, policies)
                } else {
-                       rtmgr.Logger.Warn("Endpoint " + ep.Uuid + "is not ready")
+                       rtmgr.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
                }
        }
        return nil
 }
 
-func send(ep *rtmgr.Endpoint, policies *[]string) {
-       rtmgr.Logger.Debug("Invoked: sbi.pushAll")
+func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
        rtmgr.Logger.Debug("Push policy to endpoint: "+ ep.Uuid)
        for _, pe := range *policies {
-               if err := ep.Socket.(mangos.Socket).Send([]byte(pe)); err != nil {
+               if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
                        rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
                }
        }