Fixed integration and subscription routes related issues for R3
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
index 6f2535c..4f56753 100644 (file)
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+
 ==================================================================================
 */
 /*
@@ -56,8 +60,9 @@ func createNewPushSocket() (NngSocket, error) {
 
 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
        rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
+       rtmgr.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
        for _, ep := range rtmgr.Eps {
-               uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
+               uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
                if uri == pipe.Address() {
                        switch event {
                        case 1:
@@ -83,7 +88,7 @@ func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
        var err error
        var socket NngSocket
        rtmgr.Logger.Debug("Invoked sbi.AddEndpoint")
-       rtmgr.Logger.Debug("args: %v", (*ep))
+       rtmgr.Logger.Debug("args: %v", *ep)
        socket, err = c.NewSocket()
        if err != nil {
                return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
@@ -98,10 +103,10 @@ func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
 
 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
        rtmgr.Logger.Debug("Invoked sbi. DeleteEndpoint")
-       rtmgr.Logger.Debug("args: %v", (*ep))
-       if err:= ep.Socket.(NngSocket).Close(); err != nil {
-                       return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
-               }
+       rtmgr.Logger.Debug("args: %v", *ep)
+       if err := ep.Socket.(NngSocket).Close(); err != nil {
+               return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
+       }
        return nil
 }
 
@@ -114,7 +119,7 @@ NOTE: Asynchronous dial starts a goroutine which keep maintains the connection t
 */
 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
        rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
-       uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
+       uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
        options := make(map[string]interface{})
        options[mangos.OptionDialAsynch] = true
        if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
@@ -125,7 +130,7 @@ func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
 
 func (c *NngPush) DistributeAll(policies *[]string) error {
        rtmgr.Logger.Debug("Invoked: sbi.DistributeAll")
-       rtmgr.Logger.Debug("args: %v", (*policies))
+       rtmgr.Logger.Debug("args: %v", *policies)
        for _, ep := range rtmgr.Eps {
                if ep.IsReady {
                        go c.send(ep, policies)
@@ -137,11 +142,11 @@ func (c *NngPush) DistributeAll(policies *[]string) error {
 }
 
 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
-       rtmgr.Logger.Debug("Push policy to endpoint: "+ ep.Uuid)
+       rtmgr.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
        for _, pe := range *policies {
                if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
                        rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
                }
        }
-       rtmgr.Logger.Info("NNG PUSH to ednpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len((*policies))) + ")")
+       rtmgr.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
 }