Added unit test cases for rtmgr package
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
index 01bde0c..1f0e0e6 100644 (file)
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+
 ==================================================================================
 */
 /*
 ==================================================================================
 */
 /*
-  Mnemonic:    nngpipe.go
-  Abstract: mangos (NNG) Pipeline SBI implementation
+  Mnemonic:    rmrpipe.go
+  Abstract: mangos (RMR) Pipeline SBI implementation
   Date:                12 March 2019
 */
 
 package sbi
 
   Date:                12 March 2019
 */
 
 package sbi
 
+/*
+#include <rmr/rmr.h>
+*/
+import "C"
+
 import (
 import (
+       "bytes"
+       "crypto/md5"
        "errors"
        "errors"
-       "nanomsg.org/go/mangos/v2"
-       "nanomsg.org/go/mangos/v2/protocol/push"
-       _ "nanomsg.org/go/mangos/v2/transport/all"
-       "rtmgr"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+       "routing-manager/pkg/rtmgr"
        "strconv"
        "strconv"
+       "strings"
+       "fmt"
 )
 
 )
 
-func openNngPush(ip string) error {
+var rmrcallid = 1
+var rmrdynamiccallid = 201
+
+type RmrPush struct {
+       Sbi
+       rcChan chan *xapp.RMRParams
+}
+
+type EPStatus struct {
+        endpoint string
+        status   bool
+}
+
+type RMRParams struct {
+       *xapp.RMRParams
+}
+
+func (params *RMRParams) String() string {
+       var b bytes.Buffer
+       sum := md5.Sum(params.Payload)
+       fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
+       return b.String()
+}
+
+func NewRmrPush() *RmrPush {
+       instance := new(RmrPush)
+       return instance
+}
+
+func (c *RmrPush) Initialize(ip string) error {
        return nil
 }
 
        return nil
 }
 
-func closeNngPush() error {
+func (c *RmrPush) Terminate() error {
        return nil
 }
 
        return nil
 }
 
-func createNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Invoked sbi.createNngPushEndpointSocket")
-       rtmgr.Logger.Debug("args: %v", (*ep))
-       s, err := push.NewSocket()
-       if err != nil {
-               return errors.New("can't open push socket for endpoint: " + ep.Name +" due to:" + err.Error())
+func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
+
+       xapp.Logger.Debug("Invoked sbi.AddEndpoint")
+       endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
+       ep.Whid = int(xapp.Rmr.Openwh(endpoint))
+       if ep.Whid < 0 {
+               return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
+       } else {
+               xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
        }
        }
-       s.SetPipeEventHook(pipeEventHandler)
-       ep.Socket = s
-       dial(ep)
+
        return nil
 }
 
        return nil
 }
 
-func destroyNngPushEndpointSocket(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Invoked sbi.destroyNngPushEndpointSocket")
-       rtmgr.Logger.Debug("args: %v", (*ep))
-       if err:= ep.Socket.(mangos.Socket).Close(); err != nil {
-                       return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to:" + err.Error())
-               }
+func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+       xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
+       xapp.Logger.Debug("args: %v", *ep)
+
+       xapp.Rmr.Closewh(ep.Whid)
        return nil
 }
 
        return nil
 }
 
-func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
-       for _, ep := range rtmgr.Eps {
-               uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
-               if uri == pipe.Address() {
-                       switch event {
-                       case 1:
-                               ep.IsReady = true
-                               rtmgr.Logger.Debug("Endpoint " + uri + " successfully registered")
-                       default:
-                               ep.IsReady = false
-                               rtmgr.Logger.Debug("Endpoint " + uri + " has been deregistered")
-                       }
-               }       
-       }
+func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+       c.updateEndpoints(rcs, c)
 }
 
 }
 
-/*
-NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
-*/
-func dial(ep *rtmgr.Endpoint) {
-       rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
-       uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
-       options := make(map[string]interface{})
-       options[mangos.OptionDialAsynch] = true
-       if err := ep.Socket.(mangos.Socket).DialOptions(uri, options); err != nil {
-               rtmgr.Logger.Error("can't dial on push socket to " + uri + " due to:" + err.Error())
-       }
-}
+func (c *RmrPush) DistributeAll(policies *[]string) error {
+       xapp.Logger.Debug("Invoked: sbi.DistributeAll")
+       xapp.Logger.Debug("args: %v", *policies)
+
+       /*for _, ep := range rtmgr.Eps {
+               go c.send(ep, policies)
+       }*/
+       channel := make(chan EPStatus)
 
 
-func pushAll(policies *[]string) error {
-       rtmgr.Logger.Debug("Invoked: sbi.pushAll")
-       rtmgr.Logger.Debug("args: %v", (*policies))
-       for _, ep := range rtmgr.Eps {
-               if ep.IsReady {
-                       go send(ep, policies)
-               } else {
-                       rtmgr.Logger.Warn("Endpoint " + ep.Uuid + "is not ready")
-               }
+       if rmrcallid == 200 {
+               rmrcallid = 1
        }
        }
+
+        for _, ep := range rtmgr.Eps {
+                go c.send_sync(ep,  policies, channel, rmrcallid)
+        }
+       rmrcallid++
+
+        count := 0
+        result := make([]EPStatus, len(rtmgr.Eps))
+        for i, _ := range result {
+                result[i] = <-channel
+                if result[i].status == true {
+                        count++
+                } else {
+                        xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
+                }
+        }
+
+        if count < len(rtmgr.Eps) {
+                return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
+        }
+
+
        return nil
 }
 
        return nil
 }
 
-func send(ep *rtmgr.Endpoint, policies *[]string) {
-       rtmgr.Logger.Debug("Invoked: sbi.pushAll")
-       rtmgr.Logger.Debug("Push policy to endpoint: "+ ep.Uuid)
-       for _, pe := range *policies {
-               if err := ep.Socket.(mangos.Socket).Send([]byte(pe)); err != nil {
-                       rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
-               }
+func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
+        xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+
+       ret := c.send_data(ep, policies, call_id)
+
+        channel <- EPStatus{ep.Uuid, ret}
+
+}
+
+func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
+        xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
+        var state int
+        var retstr string
+
+        var policy = []byte{}
+
+        for _, pe := range *policies {
+                b:= []byte(pe)
+                for j:=0; j<len(b); j++{
+                        policy = append(policy,b[j])
+                }
        }
        }
-       rtmgr.Logger.Info("NNG PUSH to ednpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len((*policies))) + ")")
+        params := &RMRParams{&xapp.RMRParams{}}
+        params.Mtype = 20
+        params.PayloadLen = len(policy)
+        params.Payload =[]byte(policy)
+        params.Mbuf = nil
+        params.Whid = ep.Whid
+        params.Callid = call_id
+        params.Timeout = 200
+        state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
+       routestatus := strings.Split(retstr," ")
+        if state != C.RMR_OK && routestatus[0] == "OK" {
+              xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
+              return false
+        } else {
+               xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
+              return true
+        }
+
+        xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
+        return false
+}
+
+func (c *RmrPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
+       return c.createEndpoint(payload, c)
+}
+
+func (c *RmrPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
+       xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
+       xapp.Logger.Debug("args: %v", *policies)
+
+       if rmrdynamiccallid == 255 {
+                rmrdynamiccallid = 201
+        }
+
+       go c.send_data(ep, policies,rmrdynamiccallid)
+       rmrdynamiccallid++
+
+       return nil
 }
 }