Adding RMR Wormhole support 67/2667/2 ad0a271 v0.0.29 v0.0.30
authorwahidw <abdulwahid.w@nokia.com>
Wed, 4 Mar 2020 09:54:15 +0000 (09:54 +0000)
committerwahidw <abdulwahid.w@nokia.com>
Wed, 4 Mar 2020 10:15:20 +0000 (10:15 +0000)
Change-Id: I6e632110fff39e01ca754e0756aa060ab9cb3348
Signed-off-by: wahidw <abdulwahid.w@nokia.com>
ci/Dockerfile
config/config-file.yaml
pkg/xapp/mtypes.go
pkg/xapp/rmr.go
pkg/xapp/xapp_test.go

index 3372520..fadb85a 100755 (executable)
@@ -18,7 +18,6 @@
 #----------------------------------------------------------
 
 FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:4-u18.04-nng as xapp-base
-
 RUN apt-get update -y \
     &&apt-get install -y \
     apt-utils \
@@ -36,7 +35,7 @@ RUN apt-get update -y \
 RUN curl -s https://packagecloud.io/install/repositories/o-ran-sc/master/script.deb.sh | bash
 
 # RMR
-ARG RMRVERSION=1.13.1
+ARG RMRVERSION=3.2.6
 #RUN apt-get install -y rmr=${RMRVERSION} rmr-dev=${RMRVERSION}
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb
index 7678c5a..42236ec 100755 (executable)
@@ -22,6 +22,7 @@
   "maxSize": 2072
   "numWorkers": 1
   "maxRetryOnFailure": 5
+  "threadType": 0
 "subscription":
     "host": "localhost:8088"
     "timeout": 2
index 5577823..856d7ad 100644 (file)
@@ -107,6 +107,9 @@ var RICMessageTypes = map[string]int{
        "RIC_E2_MANAGER_HC_RESPONSE":          C.RIC_E2_MANAGER_HC_RESPONSE,
        "RIC_CONTROL_XAPP_CONFIG_REQUEST":     C.RIC_CONTROL_XAPP_CONFIG_REQUEST,
        "RIC_CONTROL_XAPP_CONFIG_RESPONSE":    C.RIC_CONTROL_XAPP_CONFIG_RESPONSE,
+       "RMRRM_TABLE_DATA":                    C.RMRRM_TABLE_DATA,
+       "RMRRM_REQ_TABLE":                     C.RMRRM_REQ_TABLE,
+       "RMRRM_TABLE_STATE":                   C.RMRRM_TABLE_STATE,
 }
 
 //-----------------------------------------------------------------------------
@@ -192,6 +195,9 @@ const (
        RIC_E2_MANAGER_HC_RESPONSE          = C.RIC_E2_MANAGER_HC_RESPONSE
        RIC_CONTROL_XAPP_CONFIG_REQUEST     = C.RIC_CONTROL_XAPP_CONFIG_REQUEST
        RIC_CONTROL_XAPP_CONFIG_RESPONSE    = C.RIC_CONTROL_XAPP_CONFIG_RESPONSE
+       RMRRM_TABLE_DATA                    = C.RMRRM_TABLE_DATA
+       RMRRM_REQ_TABLE                     = C.RMRRM_REQ_TABLE
+       RMRRM_TABLE_STATE                   = C.RMRRM_TABLE_STATE
 )
 
 //-----------------------------------------------------------------------------
@@ -277,4 +283,7 @@ var RicMessageTypeToName = map[int]string{
        RIC_E2_MANAGER_HC_RESPONSE:          "RIC_E2_MANAGER_HC_RESPONSE",
        RIC_CONTROL_XAPP_CONFIG_REQUEST:     "RIC_CONTROL_XAPP_CONFIG_REQUEST",
        RIC_CONTROL_XAPP_CONFIG_RESPONSE:    "RIC_CONTROL_XAPP_CONFIG_RESPONSE",
+       RMRRM_TABLE_DATA:                    "RMRRM_TABLE_DATA",
+       RMRRM_REQ_TABLE:                     "RMRRM_REQ_TABLE",
+       RMRRM_TABLE_STATE:                   "RMRRM_TABLE_STATE",
 }
index 49a28d9..6707ab9 100755 (executable)
@@ -81,15 +81,19 @@ type RMRParams struct {
        SubId      int
        Src        string
        Mbuf       *C.rmr_mbuf_t
+       Whid       int
        status     int
 }
 
-func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDesc string) *RMRClient {
+func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, threadType int, statDesc string) *RMRClient {
        p := C.CString(protPort)
        m := C.int(maxSize)
+       c := C.int(threadType)
        defer C.free(unsafe.Pointer(p))
 
-       ctx := C.rmr_init(p, m, C.int(0))
+       //ctx := C.rmr_init(p, m, C.int(0))
+       //ctx := C.rmr_init(p, m, C.RMRFL_NOTHREAD)
+       ctx := C.rmr_init(p, m, c)
        if ctx == nil {
                Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
        }
@@ -104,7 +108,7 @@ func NewRMRClientWithParams(protPort string, maxSize int, numWorkers int, statDe
 }
 
 func NewRMRClient() *RMRClient {
-       return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), "RMR")
+       return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.numWorkers"), viper.GetInt("rmr.threadType"), "RMR")
 }
 
 func (m *RMRClient) Start(c MessageConsumer) {
@@ -262,24 +266,28 @@ func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
        }
        C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
 
-       params.status = m.SendBuf(txBuffer, isRts)
+       params.status = m.SendBuf(txBuffer, isRts, params.Whid)
        if params.status == int(C.RMR_OK) {
                return true
        }
        return false
 }
 
-func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) int {
+func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
        var (
                currBuffer  *C.rmr_mbuf_t
                counterName string = "Transmitted"
        )
 
        txBuffer.state = 0
-       if isRts {
-               currBuffer = C.rmr_rts_msg(m.context, txBuffer)
+       if whid != 0 {
+               currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
        } else {
-               currBuffer = C.rmr_send_msg(m.context, txBuffer)
+               if isRts {
+                       currBuffer = C.rmr_rts_msg(m.context, txBuffer)
+               } else {
+                       currBuffer = C.rmr_send_msg(m.context, txBuffer)
+               }
        }
 
        if currBuffer == nil {
@@ -294,10 +302,14 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) int {
        }
 
        for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
-               if isRts {
-                       currBuffer = C.rmr_rts_msg(m.context, currBuffer)
+               if whid != 0 {
+                       currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
                } else {
-                       currBuffer = C.rmr_send_msg(m.context, currBuffer)
+                       if isRts {
+                               currBuffer = C.rmr_rts_msg(m.context, txBuffer)
+                       } else {
+                               currBuffer = C.rmr_send_msg(m.context, txBuffer)
+                       }
                }
        }
 
@@ -312,6 +324,23 @@ func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) int {
        return int(currBuffer.state)
 }
 
+func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
+       return m.Wh_open(target)
+}
+
+func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
+       endpoint := C.CString(target)
+       return C.rmr_wh_open(m.context, endpoint)
+}
+
+func (m *RMRClient) Closewh(whid int) {
+       m.Wh_close(C.rmr_whid_t(whid))
+}
+
+func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
+       C.rmr_wh_close(m.context, whid)
+}
+
 func (m *RMRClient) IsRetryError(params *RMRParams) bool {
        if params.status == int(C.RMR_ERR_RETRY) {
                return true
index f0e3070..e36d1cb 100755 (executable)
@@ -143,6 +143,52 @@ func TestMessagesReceivedSuccessfully(t *testing.T) {
        }
 }
 
+func TestMessagesReceivedSuccessfullyUsingWh(t *testing.T) {
+       time.Sleep(time.Duration(5) * time.Second)
+       whid := Rmr.Openwh("localhost:4560")
+       time.Sleep(time.Duration(1) * time.Second)
+       for i := 0; i < 100; i++ {
+               params := &RMRParams{}
+               params.Mtype = 10004
+               params.SubId = -1
+               params.Payload = []byte{1, 2, 3, 4, 5, 6}
+               params.Meid = &RMRMeid{PlmnID: "1234", EnbID: "7788", RanName: "RanName-1234"}
+               params.Xid = "TestXID"
+               params.Whid = int(whid)
+               Rmr.SendMsg(params)
+       }
+
+       // Allow time to process the messages
+       time.Sleep(time.Duration(2) * time.Second)
+
+       waitForSdl := viper.GetBool("db.waitForSdl")
+       stats := getMetrics(t)
+       if !strings.Contains(stats, "ricxapp_RMR_Transmitted 200") {
+               t.Errorf("Error: ricxapp_RMR_Transmitted value incorrect: %v", stats)
+       }
+
+       if !strings.Contains(stats, "ricxapp_RMR_Received 200") {
+               t.Errorf("Error: ricxapp_RMR_Received value incorrect: %v", stats)
+       }
+
+       if !strings.Contains(stats, "ricxapp_RMR_TransmitError 0") {
+               t.Errorf("Error: ricxapp_RMR_TransmitError value incorrect")
+       }
+
+       if !strings.Contains(stats, "ricxapp_RMR_ReceiveError 0") {
+               t.Errorf("Error: ricxapp_RMR_ReceiveError value incorrect")
+       }
+
+       if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_Stored 200") {
+               t.Errorf("Error: ricxapp_SDL_Stored value incorrect")
+       }
+
+       if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_StoreError 0") {
+               t.Errorf("Error: ricxapp_SDL_StoreError value incorrect")
+       }
+       Rmr.Closewh(int(whid))
+}
+
 func TestSubscribeChannels(t *testing.T) {
        if !viper.GetBool("db.waitForSdl") {
                return