Added config and logger module from xapp-fwk. Added Routes related to A1Mediator...
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
index 4f56753..8438064 100644 (file)
@@ -30,6 +30,7 @@ package sbi
 
 import (
        "errors"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "nanomsg.org/go/mangos/v2"
        "nanomsg.org/go/mangos/v2/protocol/push"
        _ "nanomsg.org/go/mangos/v2/transport/all"
@@ -49,7 +50,7 @@ func NewNngPush() *NngPush {
 }
 
 func createNewPushSocket() (NngSocket, error) {
-       rtmgr.Logger.Debug("Invoked: createNewPushSocket()")
+       xapp.Logger.Debug("Invoked: createNewPushSocket()")
        socket, err := push.NewSocket()
        if err != nil {
                return nil, errors.New("can't create new push socket due to:" + err.Error())
@@ -59,18 +60,18 @@ func createNewPushSocket() (NngSocket, error) {
 }
 
 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
-       rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
-       rtmgr.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
+       xapp.Logger.Debug("Invoked: pipeEventHandler()")
+       xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
        for _, ep := range rtmgr.Eps {
                uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
                if uri == pipe.Address() {
                        switch event {
                        case 1:
                                ep.IsReady = true
-                               rtmgr.Logger.Debug("Endpoint " + uri + " successfully attached")
+                               xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
                        default:
                                ep.IsReady = false
-                               rtmgr.Logger.Debug("Endpoint " + uri + " has been detached")
+                               xapp.Logger.Debug("Endpoint " + uri + " has been detached")
                        }
                }
        }
@@ -87,8 +88,8 @@ func (c *NngPush) Terminate() error {
 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
        var err error
        var socket NngSocket
-       rtmgr.Logger.Debug("Invoked sbi.AddEndpoint")
-       rtmgr.Logger.Debug("args: %v", *ep)
+       xapp.Logger.Debug("Invoked sbi.AddEndpoint")
+       xapp.Logger.Debug("args: %v", *ep)
        socket, err = c.NewSocket()
        if err != nil {
                return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
@@ -102,8 +103,8 @@ func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
 }
 
 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Invoked sbi. DeleteEndpoint")
-       rtmgr.Logger.Debug("args: %v", *ep)
+       xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
+       xapp.Logger.Debug("args: %v", *ep)
        if err := ep.Socket.(NngSocket).Close(); err != nil {
                return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
        }
@@ -118,7 +119,7 @@ func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
 */
 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
+       xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
        uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
        options := make(map[string]interface{})
        options[mangos.OptionDialAsynch] = true
@@ -129,24 +130,24 @@ func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
 }
 
 func (c *NngPush) DistributeAll(policies *[]string) error {
-       rtmgr.Logger.Debug("Invoked: sbi.DistributeAll")
-       rtmgr.Logger.Debug("args: %v", *policies)
+       xapp.Logger.Debug("Invoked: sbi.DistributeAll")
+       xapp.Logger.Debug("args: %v", *policies)
        for _, ep := range rtmgr.Eps {
                if ep.IsReady {
                        go c.send(ep, policies)
                } else {
-                       rtmgr.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
+                       xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
                }
        }
        return nil
 }
 
 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
-       rtmgr.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+       xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
        for _, pe := range *policies {
                if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
-                       rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
+                       xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
                }
        }
-       rtmgr.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
+       xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
 }