Removal of go mangoes and using RMR nng
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
index 01bde0c..4451299 100644 (file)
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+
 ==================================================================================
 */
 /*
 ==================================================================================
 */
 /*
 
 package sbi
 
 
 package sbi
 
+/*
+#include <time.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <rmr/rmr.h>
+#include <rmr/RIC_message_types.h>
+
+
+#cgo CFLAGS: -I../
+#cgo LDFLAGS: -lrmr_nng -lnng
+*/
+import "C"
+
 import (
 import (
+       "bytes"
+       "crypto/md5"
        "errors"
        "errors"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "nanomsg.org/go/mangos/v2"
        "nanomsg.org/go/mangos/v2/protocol/push"
        _ "nanomsg.org/go/mangos/v2/transport/all"
        "nanomsg.org/go/mangos/v2"
        "nanomsg.org/go/mangos/v2/protocol/push"
        _ "nanomsg.org/go/mangos/v2/transport/all"
-       "rtmgr"
+       "routing-manager/pkg/rtmgr"
        "strconv"
        "strconv"
+       "time"
+       "fmt"
 )
 
 )
 
-func openNngPush(ip string) error {
-       return nil
+type NngPush struct {
+       Sbi
+       NewSocket CreateNewNngSocketHandler
+       rcChan    chan *xapp.RMRParams
 }
 
 }
 
-func closeNngPush() error {
-       return nil
+type RMRParams struct {
+        *xapp.RMRParams
 }
 
 }
 
-func createNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Invoked sbi.createNngPushEndpointSocket")
-       rtmgr.Logger.Debug("args: %v", (*ep))
-       s, err := push.NewSocket()
-       if err != nil {
-               return errors.New("can't open push socket for endpoint: " + ep.Name +" due to:" + err.Error())
-       }
-       s.SetPipeEventHook(pipeEventHandler)
-       ep.Socket = s
-       dial(ep)
-       return nil
+
+func (params *RMRParams) String() string {
+        var b bytes.Buffer
+        sum := md5.Sum(params.Payload)
+        fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
+        return b.String()
 }
 
 }
 
-func destroyNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Invoked sbi.destroyNngPushEndpointSocket")
-       rtmgr.Logger.Debug("args: %v", (*ep))
-       if err:= ep.Socket.(mangos.Socket).Close(); err != nil {
-                       return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to:" + err.Error())
-               }
-       return nil
+func NewNngPush() *NngPush {
+       instance := new(NngPush)
+       instance.NewSocket = createNewPushSocket
+       return instance
+}
+
+func createNewPushSocket() (NngSocket, error) {
+       xapp.Logger.Debug("Invoked: createNewPushSocket()")
+       socket, err := push.NewSocket()
+       if err != nil {
+               return nil, errors.New("can't create new push socket due to:" + err.Error())
+       }
+       socket.SetPipeEventHook(pipeEventHandler)
+       return socket, nil
 }
 
 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
 }
 
 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
+       xapp.Logger.Debug("Invoked: pipeEventHandler()")
+       xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
        for _, ep := range rtmgr.Eps {
        for _, ep := range rtmgr.Eps {
-               uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
+               uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
                if uri == pipe.Address() {
                        switch event {
                        case 1:
                                ep.IsReady = true
                if uri == pipe.Address() {
                        switch event {
                        case 1:
                                ep.IsReady = true
-                               rtmgr.Logger.Debug("Endpoint " + uri + " successfully registered")
+                               xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
                        default:
                                ep.IsReady = false
                        default:
                                ep.IsReady = false
-                               rtmgr.Logger.Debug("Endpoint " + uri + " has been deregistered")
+                               xapp.Logger.Debug("Endpoint " + uri + " has been detached")
                        }
                        }
-               }       
+               }
+       }
+}
+
+func (c *NngPush) Initialize(ip string) error {
+       return nil
+}
+
+func (c *NngPush) Terminate() error {
+       return nil
+}
+
+func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
+
+       xapp.Logger.Debug("Invoked sbi.AddEndpoint")
+       xapp.Logger.Debug("args: %v", *ep)
+       endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
+       ep.Whid = int(xapp.Rmr.Openwh(endpoint))
+       if ep.Whid < 0   {
+               return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
+       }else {
+               xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint)
        }
        }
+
+       return nil
+}
+
+func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+       xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
+       xapp.Logger.Debug("args: %v", *ep)
+
+       xapp.Rmr.Closewh(ep.Whid)
+       return nil
+}
+
+func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+       c.updateEndpoints(rcs, c)
 }
 
 /*
 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
 */
 }
 
 /*
 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
 */
-func dial(ep *rtmgr.Endpoint) {
-       rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
-       uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
+func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
+       xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
+       uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
        options := make(map[string]interface{})
        options[mangos.OptionDialAsynch] = true
        options := make(map[string]interface{})
        options[mangos.OptionDialAsynch] = true
-       if err := ep.Socket.(mangos.Socket).DialOptions(uri, options); err != nil {
-               rtmgr.Logger.Error("can't dial on push socket to " + uri + " due to:" + err.Error())
+       if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
+               return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
        }
        }
+       return nil
 }
 
 }
 
-func pushAll(policies *[]string) error {
-       rtmgr.Logger.Debug("Invoked: sbi.pushAll")
-       rtmgr.Logger.Debug("args: %v", (*policies))
+func (c *NngPush) DistributeAll(policies *[]string) error {
+       xapp.Logger.Debug("Invoked: sbi.DistributeAll")
+       xapp.Logger.Debug("args: %v", *policies)
+
        for _, ep := range rtmgr.Eps {
        for _, ep := range rtmgr.Eps {
-               if ep.IsReady {
-                       go send(ep, policies)
-               } else {
-                       rtmgr.Logger.Warn("Endpoint " + ep.Uuid + "is not ready")
-               }
+               go c.send(ep, policies)
        }
        }
+
        return nil
 }
 
        return nil
 }
 
-func send(ep *rtmgr.Endpoint, policies *[]string) {
-       rtmgr.Logger.Debug("Invoked: sbi.pushAll")
-       rtmgr.Logger.Debug("Push policy to endpoint: "+ ep.Uuid)
+func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
+       xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+
        for _, pe := range *policies {
        for _, pe := range *policies {
-               if err := ep.Socket.(mangos.Socket).Send([]byte(pe)); err != nil {
-                       rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
-               }
+               params := &RMRParams{&xapp.RMRParams{}}
+               params.Mtype = 20
+               params.PayloadLen = len([]byte(pe))
+               params.Payload =[]byte(pe)
+               params.Mbuf = nil
+               params.Whid = ep.Whid
+               time.Sleep(1 * time.Millisecond)
+               xapp.Rmr.SendMsg(params.RMRParams)
        }
        }
-       rtmgr.Logger.Info("NNG PUSH to ednpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len((*policies))) + ")")
+       xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
+}
+
+func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){
+       return c.createEndpoint(payload, c)
 }
 }
+
+func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
+       xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
+       xapp.Logger.Debug("args: %v", *policies)
+
+       go c.send(ep, policies)
+
+       return nil
+}
+