getAllSubscriptions API (RM -> SM) during restart of routing manager handled
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
index 1f062d3..b270abf 100644 (file)
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+
 ==================================================================================
 */
 /*
 ==================================================================================
 */
 /*
 
 package sbi
 
 
 package sbi
 
+/*
+#include <time.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <rmr/rmr.h>
+#include <rmr/RIC_message_types.h>
+
+
+#cgo CFLAGS: -I../
+#cgo LDFLAGS: -lrmr_nng -lnng
+*/
+import "C"
+
 import (
        "errors"
 import (
        "errors"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "nanomsg.org/go/mangos/v2"
        "nanomsg.org/go/mangos/v2/protocol/push"
        _ "nanomsg.org/go/mangos/v2/transport/all"
        "routing-manager/pkg/rtmgr"
        "strconv"
        "nanomsg.org/go/mangos/v2"
        "nanomsg.org/go/mangos/v2/protocol/push"
        _ "nanomsg.org/go/mangos/v2/transport/all"
        "routing-manager/pkg/rtmgr"
        "strconv"
+       "time"
 )
 
 type NngPush struct {
        Sbi
        NewSocket CreateNewNngSocketHandler
 )
 
 type NngPush struct {
        Sbi
        NewSocket CreateNewNngSocketHandler
+       rcChan    chan *xapp.RMRParams
 }
 
 func NewNngPush() *NngPush {
 }
 
 func NewNngPush() *NngPush {
@@ -45,7 +66,7 @@ func NewNngPush() *NngPush {
 }
 
 func createNewPushSocket() (NngSocket, error) {
 }
 
 func createNewPushSocket() (NngSocket, error) {
-       rtmgr.Logger.Debug("Invoked: createNewPushSocket()")
+       xapp.Logger.Debug("Invoked: createNewPushSocket()")
        socket, err := push.NewSocket()
        if err != nil {
                return nil, errors.New("can't create new push socket due to:" + err.Error())
        socket, err := push.NewSocket()
        if err != nil {
                return nil, errors.New("can't create new push socket due to:" + err.Error())
@@ -55,17 +76,18 @@ func createNewPushSocket() (NngSocket, error) {
 }
 
 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
 }
 
 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
-       rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
+       xapp.Logger.Debug("Invoked: pipeEventHandler()")
+       xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
        for _, ep := range rtmgr.Eps {
                uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
                if uri == pipe.Address() {
                        switch event {
                        case 1:
                                ep.IsReady = true
        for _, ep := range rtmgr.Eps {
                uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
                if uri == pipe.Address() {
                        switch event {
                        case 1:
                                ep.IsReady = true
-                               rtmgr.Logger.Debug("Endpoint " + uri + " successfully attached")
+                               xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
                        default:
                                ep.IsReady = false
                        default:
                                ep.IsReady = false
-                               rtmgr.Logger.Debug("Endpoint " + uri + " has been detached")
+                               xapp.Logger.Debug("Endpoint " + uri + " has been detached")
                        }
                }
        }
                        }
                }
        }
@@ -82,8 +104,8 @@ func (c *NngPush) Terminate() error {
 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
        var err error
        var socket NngSocket
 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
        var err error
        var socket NngSocket
-       rtmgr.Logger.Debug("Invoked sbi.AddEndpoint")
-       rtmgr.Logger.Debug("args: %v", *ep)
+       xapp.Logger.Debug("Invoked sbi.AddEndpoint")
+       xapp.Logger.Debug("args: %v", *ep)
        socket, err = c.NewSocket()
        if err != nil {
                return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
        socket, err = c.NewSocket()
        if err != nil {
                return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
@@ -97,8 +119,8 @@ func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
 }
 
 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
 }
 
 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Invoked sbi. DeleteEndpoint")
-       rtmgr.Logger.Debug("args: %v", *ep)
+       xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
+       xapp.Logger.Debug("args: %v", *ep)
        if err := ep.Socket.(NngSocket).Close(); err != nil {
                return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
        }
        if err := ep.Socket.(NngSocket).Close(); err != nil {
                return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
        }
@@ -113,7 +135,7 @@ func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
 */
 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
 */
 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
+       xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
        uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
        options := make(map[string]interface{})
        options[mangos.OptionDialAsynch] = true
        uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
        options := make(map[string]interface{})
        options[mangos.OptionDialAsynch] = true
@@ -124,24 +146,30 @@ func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
 }
 
 func (c *NngPush) DistributeAll(policies *[]string) error {
 }
 
 func (c *NngPush) DistributeAll(policies *[]string) error {
-       rtmgr.Logger.Debug("Invoked: sbi.DistributeAll")
-       rtmgr.Logger.Debug("args: %v", *policies)
+       xapp.Logger.Debug("Invoked: sbi.DistributeAll")
+       xapp.Logger.Debug("args: %v", *policies)
        for _, ep := range rtmgr.Eps {
        for _, ep := range rtmgr.Eps {
-               if ep.IsReady {
-                       go c.send(ep, policies)
-               } else {
-                       rtmgr.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
+               i := 1
+               for i < 5 {
+                       if ep.IsReady {
+                               go c.send(ep, policies)
+                               break
+                       } else {
+                               xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i))
+                               time.Sleep(10 * time.Millisecond)
+                               i++
+                       }
                }
        }
        return nil
 }
 
 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
                }
        }
        return nil
 }
 
 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
-       rtmgr.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+       xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
        for _, pe := range *policies {
                if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
        for _, pe := range *policies {
                if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
-                       rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
+                       xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
                }
        }
                }
        }
-       rtmgr.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
+       xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
 }
 }