Release rtmgr v0.6.1
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
index 01bde0c..74e077b 100644 (file)
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+
 ==================================================================================
 */
 /*
 ==================================================================================
 */
 /*
 package sbi
 
 import (
 package sbi
 
 import (
+       "bytes"
+       "crypto/md5"
        "errors"
        "errors"
-       "nanomsg.org/go/mangos/v2"
-       "nanomsg.org/go/mangos/v2/protocol/push"
-       _ "nanomsg.org/go/mangos/v2/transport/all"
-       "rtmgr"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+       "routing-manager/pkg/rtmgr"
        "strconv"
        "strconv"
+       //"time"
+       "fmt"
 )
 
 )
 
-func openNngPush(ip string) error {
+type NngPush struct {
+       Sbi
+       rcChan chan *xapp.RMRParams
+}
+
+type RMRParams struct {
+       *xapp.RMRParams
+}
+
+func (params *RMRParams) String() string {
+       var b bytes.Buffer
+       sum := md5.Sum(params.Payload)
+       fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
+       return b.String()
+}
+
+func NewNngPush() *NngPush {
+       instance := new(NngPush)
+       return instance
+}
+
+func (c *NngPush) Initialize(ip string) error {
        return nil
 }
 
        return nil
 }
 
-func closeNngPush() error {
+func (c *NngPush) Terminate() error {
        return nil
 }
 
        return nil
 }
 
-func createNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Invoked sbi.createNngPushEndpointSocket")
-       rtmgr.Logger.Debug("args: %v", (*ep))
-       s, err := push.NewSocket()
-       if err != nil {
-               return errors.New("can't open push socket for endpoint: " + ep.Name +" due to:" + err.Error())
+func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
+
+       xapp.Logger.Debug("Invoked sbi.AddEndpoint")
+       endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
+       ep.Whid = int(xapp.Rmr.Openwh(endpoint))
+       if ep.Whid < 0 {
+               return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
+       } else {
+               xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
        }
        }
-       s.SetPipeEventHook(pipeEventHandler)
-       ep.Socket = s
-       dial(ep)
+
        return nil
 }
 
        return nil
 }
 
-func destroyNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Invoked sbi.destroyNngPushEndpointSocket")
-       rtmgr.Logger.Debug("args: %v", (*ep))
-       if err:= ep.Socket.(mangos.Socket).Close(); err != nil {
-                       return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to:" + err.Error())
-               }
+func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+       xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
+       xapp.Logger.Debug("args: %v", *ep)
+
+       xapp.Rmr.Closewh(ep.Whid)
        return nil
 }
 
        return nil
 }
 
-func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
-       for _, ep := range rtmgr.Eps {
-               uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
-               if uri == pipe.Address() {
-                       switch event {
-                       case 1:
-                               ep.IsReady = true
-                               rtmgr.Logger.Debug("Endpoint " + uri + " successfully registered")
-                       default:
-                               ep.IsReady = false
-                               rtmgr.Logger.Debug("Endpoint " + uri + " has been deregistered")
-                       }
-               }       
-       }
+func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+       c.updateEndpoints(rcs, c)
 }
 
 }
 
-/*
-NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
-*/
-func dial(ep *rtmgr.Endpoint) {
-       rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
-       uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
-       options := make(map[string]interface{})
-       options[mangos.OptionDialAsynch] = true
-       if err := ep.Socket.(mangos.Socket).DialOptions(uri, options); err != nil {
-               rtmgr.Logger.Error("can't dial on push socket to " + uri + " due to:" + err.Error())
-       }
-}
+func (c *NngPush) DistributeAll(policies *[]string) error {
+       xapp.Logger.Debug("Invoked: sbi.DistributeAll")
+       xapp.Logger.Debug("args: %v", *policies)
 
 
-func pushAll(policies *[]string) error {
-       rtmgr.Logger.Debug("Invoked: sbi.pushAll")
-       rtmgr.Logger.Debug("args: %v", (*policies))
        for _, ep := range rtmgr.Eps {
        for _, ep := range rtmgr.Eps {
-               if ep.IsReady {
-                       go send(ep, policies)
-               } else {
-                       rtmgr.Logger.Warn("Endpoint " + ep.Uuid + "is not ready")
-               }
+               go c.send(ep, policies)
        }
        }
+
        return nil
 }
 
        return nil
 }
 
-func send(ep *rtmgr.Endpoint, policies *[]string) {
-       rtmgr.Logger.Debug("Invoked: sbi.pushAll")
-       rtmgr.Logger.Debug("Push policy to endpoint: "+ ep.Uuid)
+func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
+       xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+
+       var policy = []byte{}
+       cumulative_policy := 0
+       count := 0
+       maxrecord := xapp.Config.GetInt("maxrecord")
+       if maxrecord == 0 {
+               maxrecord = 10
+       }
+
        for _, pe := range *policies {
        for _, pe := range *policies {
-               if err := ep.Socket.(mangos.Socket).Send([]byte(pe)); err != nil {
-                       rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
+               b := []byte(pe)
+               for j := 0; j < len(b); j++ {
+                       policy = append(policy, b[j])
+               }
+               count++
+               cumulative_policy++
+               if count == maxrecord || cumulative_policy == len(*policies) {
+                       params := &RMRParams{&xapp.RMRParams{}}
+                       params.Mtype = 20
+                       params.PayloadLen = len(policy)
+                       params.Payload = []byte(policy)
+                       params.Mbuf = nil
+                       params.Whid = ep.Whid
+                       xapp.Rmr.SendMsg(params.RMRParams)
+                       count = 0
+                       policy = nil
+                       xapp.Logger.Debug("Sent message with payload len = %d to %s", params.PayloadLen, ep.Uuid)
                }
        }
                }
        }
-       rtmgr.Logger.Info("NNG PUSH to ednpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len((*policies))) + ")")
+
+       xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
+}
+
+func (c *NngPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
+       return c.createEndpoint(payload, c)
+}
+
+func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
+       xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
+       xapp.Logger.Debug("args: %v", *policies)
+
+       go c.send(ep, policies)
+
+       return nil
 }
 }