Subscription manager v0.10.0 19/1019/1
authorkalnagy <kalman.nagy@nokia.com>
Thu, 26 Sep 2019 14:28:25 +0000 (16:28 +0200)
committerkalnagy <kalman.nagy@nokia.com>
Thu, 26 Sep 2019 14:28:25 +0000 (16:28 +0200)
Contains v0.8.0-v0.10.0
Handle RICsubscriptionDeleteResponse message
Communicating RICsubscriptionDeleteResponse to routing manager
Updated transaction handling
Tracking Mbuf in transaction table

Change-Id: I0d4964b7bd717941a0e50ede3e9a878590079141
Signed-off-by: kalnagy <kalman.nagy@nokia.com>
16 files changed:
Dockerfile
RELNOTES
api/routing_manager.yaml
container-tag.yaml
e2ap/wrapper.c
e2ap/wrapper.h
pkg/control/client.go
pkg/control/control.go
pkg/control/registry.go
pkg/control/tracker.go
pkg/control/types.go
test/e2t/container/Dockerfile
test/rco/container/Dockerfile
test/rco/manifests/rco-dep.yaml
test/rco/rco.go
tmp/rmr.go [new file with mode: 0644]

index d1f9737..4ae45f0 100644 (file)
 #
 FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:1-u18.04-nng1.1.1 as submgrbuild
 
-COPY . /opt/submgr
+WORKDIR /tmp
 
 # Install RMr shared library
-RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_1.6.0_amd64.deb/download.deb && dpkg -i rmr_1.6.0_amd64.deb
+RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_1.6.0_amd64.deb/download.deb && dpkg -i rmr_1.6.0_amd64.deb && rm -rf rmr_1.6.0_amd64.deb
 # Install RMr development header files
-RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_1.6.0_amd64.deb/download.deb && dpkg -i rmr-dev_1.6.0_amd64.deb
+RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_1.6.0_amd64.deb/download.deb && dpkg -i rmr-dev_1.6.0_amd64.deb && rm -rf rmr-dev_1.6.0_amd64.deb
 
 # "PULLING LOG and COMPILING LOG"
 RUN git clone "https://gerrit.o-ran-sc.org/r/com/log" /opt/log && cd /opt/log && \
  ./autogen.sh && ./configure && make install && ldconfig
 
-# "COMPILING E2AP Wrapper"
-RUN cd /opt/submgr/e2ap && \
- gcc -c -fPIC -Iheaders/ lib/*.c wrapper.c && \
- gcc *.o -shared -o libwrapper.so && \
- cp libwrapper.so /usr/local/lib/ && \
- cp wrapper.h headers/*.h /usr/local/include/ && \
- ldconfig
-
 # "Installing Swagger"
 RUN cd /usr/local/go/bin \
     && wget --quiet https://github.com/go-swagger/go-swagger/releases/download/v0.19.0/swagger_linux_amd64 \
     && mv swagger_linux_amd64 swagger \
     && chmod +x swagger
 
+
+WORKDIR /opt/submgr
+COPY e2ap e2ap
+
+# "COMPILING E2AP Wrapper"
+RUN cd e2ap && \
+    gcc -c -fPIC -Iheaders/ lib/*.c wrapper.c && \
+    gcc *.o -shared -o libwrapper.so && \
+    cp libwrapper.so /usr/local/lib/ && \
+    cp wrapper.h headers/*.h /usr/local/include/ && \
+    ldconfig
+
+COPY api api
+
 # "Getting and generating routing managers api client"
-RUN cd /opt/submgr \
-    && git clone "https://gerrit.o-ran-sc.org/r/ric-plt/rtmgr" \
+RUN git clone "https://gerrit.o-ran-sc.org/r/ric-plt/rtmgr" \
     && cp rtmgr/api/routing_manager.yaml api/ \
     && rm -rf rtmgr
 
-RUN cd /opt/submgr \
-    && mkdir -p /root/go \
-    && /usr/local/go/bin/swagger generate client -f api/routing_manager.yaml -t pkg/ -m rtmgr_models -c rtmgr_client
+RUN mkdir pkg
+
+COPY go.mod go.mod
+
+RUN mkdir -p /root/go && \
+    /usr/local/go/bin/swagger generate client -f api/routing_manager.yaml -t pkg/ -m rtmgr_models -c rtmgr_client
+
+
+COPY pkg pkg
+COPY cmd cmd
+
+RUN /usr/local/go/bin/go mod tidy
+
+RUN git clone -b v0.0.8 "https://gerrit.o-ran-sc.org/r/ric-plt/xapp-frame" /tmp/xapp-frame
+COPY tmp/rmr.go /tmp/xapp-frame/pkg/xapp/rmr.go
+
+RUN /usr/local/go/bin/go mod edit -replace "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame"="/tmp/xapp-frame"
 
 # "COMPILING Subscription manager"
-RUN mkdir -p /opt/bin && cd /opt/submgr && \
- /usr/local/go/bin/go get && \
-  /usr/local/go/bin/go build -o /opt/bin/submgr ./cmd/submgr.go && \
+RUN mkdir -p /opt/bin && \
+  /usr/local/go/bin/go build -o /opt/bin/submgr cmd/submgr.go && \
      mkdir -p /opt/build/container/usr/local
 
+COPY config config
+
 FROM ubuntu:18.04
 
 COPY --from=submgrbuild /opt/bin/submgr /opt/submgr/config/submgr.yaml /
@@ -72,3 +92,4 @@ COPY --from=submgrbuild /usr/local/lib /usr/local/lib
 RUN ldconfig
 
 RUN chmod 755 /run_submgr.sh
+CMD /run_submgr.sh
index 83b6eca..72ef0b3 100644 (file)
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,13 @@
+### v0.10.0
+* Tracking Mbuf in transaction table
+
+### v0.9.0
+* Communicating RICsubscriptionDeleteResponse to routing manager
+* Updated transaction handling
+
+### v0.8.0
+* Handle RICsubscriptionDeleteResponse message
+
 ### v0.7.2
 * Correction for E2AP PDU handling
 * Correction for Transaction Table handling
index 8fabfb4..7bfbfea 100644 (file)
 #
 #
 #   Abstract:   Routing Manager's RESTful API definition
-#   Date:      29 March 2019
+#   Date:       28 August 2019
 #
 swagger: "2.0"
 info:
   title: Routing Manager
   description: "This is the Swagger/OpenAPI 2.0 definition of Routing Manager's Northbound API."
-  version: "0.3.0"
+  version: "0.4.0"
   license:
     name: "Apache 2.0"
     url: "http://www.apache.org/licenses/LICENSE-2.0.html"
@@ -126,8 +126,58 @@ paths:
           description: "Invalid data"
         201:
           description: "Xapp Subscription data received"
+    delete:
+      tags:
+      - "handle"
+      summary: "API for deleting an xapp subscription"
+      description: "By performing the delete operation on xapp-subscription-handle resource, the API caller will be able to update routing manager about the deletion of an xapp's subscription"
+      operationId: "delete_xapp_subscription_handle"
+      consumes:
+      - "application/json"
+      parameters:
+      - in: "body"
+        name: "xapp-subscription-data"
+        description: "xApp related subscription data"
+        required: true
+        schema:
+          $ref: "#/definitions/xapp-subscription-data"
+      responses:
+        204:
+          description: "Content not found"
+        200:
+          description: "Xapp Subscription deleted"
+  /handles/xapp-subscription-handle/{subscription_id}:
+    put:
+      tags:
+      - "handle"
+      summary: "API for updating the subscriber xApp list"
+      description: "By performing a PUT method on a xapp-subscription-handle/{subscription_id} resource, the API caller is able to update the Routing manager about the list of subscriber xApps related to the subscription denoted by the {subsription_id}."
+      operationId: "update_xapp_subscription_handle"
+      consumes:
+      - "application/json"
+#      - "application/yaml"
+      produces:
+      - "application/json"
+#      - "application/yaml"
+      parameters:
+        - in: path
+          name: subscription_id
+          required: true
+          type: integer
+          format: "uint16"
+          description: "Subscription ID"
+        - in: body
+          name: xapp-list
+          description: "xApp list"
+          required: true
           schema:
-            $ref: "#/definitions/xapp-subscription-data-response"
+           $ref: "#/definitions/xapp-list"
+      responses:
+        400:
+          description: "Invalid data"
+        201:
+          description: "Xapp list received"
+  
 definitions:
   health-status:
     type: "object"
@@ -141,8 +191,7 @@ definitions:
     type: "object"
     properties:
       id:
-        type: "integer"
-        format: "int64"
+        type: "string"
       event:
         type: "string"
       version:
@@ -166,16 +215,27 @@ definitions:
         maximum: 65535
       subscription_id: #subscription sequence number
         type: "integer"
-        format: "int32" 
-  xapp-subscription-data-response:
+        format: "int32"
+  xapp-list:
+    type: "array"
+    items:
+      $ref: '#/definitions/xapp-element'
+  xapp-element:
     type: "object"
     required:
-      - "location"
+      - "address"
+      - "port"
     properties:
-      location:
-        type: "string"
+      address:
+        type: "string" #This is the xapp instance hostname or ip address
+      port: #xapp instance port address
+        type: "integer"
+        format: "uint16"
+        minimum: 0
+        maximum: 65535
 
 externalDocs:
   description: "Routing Manager"
   url: "http://placeholder"
 
+
index 540b2fe..68891cd 100644 (file)
@@ -2,4 +2,4 @@
 # By default this file is in the docker build directory,
 # but the location can configured in the JJB template.
 ---
-tag: 0.7.2
+tag: 0.10.0
index 015693d..f31cd94 100644 (file)
@@ -24,41 +24,22 @@ E2AP_PDU_t* decode_E2AP_PDU(const void* buffer, size_t buf_size)
     }
 }
 
-ssize_t encode_RIC_subscription_request(RICsubscriptionRequest_t* pdu, void* buffer, size_t buf_size)
-{
-    asn_enc_rval_t encode_result;
-    encode_result = aper_encode_to_buffer(&asn_DEF_RICsubscriptionRequest, NULL, pdu, buffer, buf_size);
-    if(encode_result.encoded == -1) {
-        return -1;
-    }
-    return encode_result.encoded;
-}
-
-RICsubscriptionRequest_t* decode_RIC_subscription_request(const void *buffer, size_t buf_size)
-{
-    asn_dec_rval_t decode_result;
-    RICsubscriptionRequest_t *pdu = 0;
-    decode_result = aper_decode_complete(NULL, &asn_DEF_RICsubscriptionRequest, (void **)&pdu, buffer, buf_size);
-    if(decode_result.code == RC_OK) {
-        return pdu;
-    } else {
-        ASN_STRUCT_FREE(asn_DEF_RICsubscriptionRequest, pdu);
-        return 0;
-    }
-}
-
 long e2ap_get_ric_subscription_request_sequence_number(void *buffer, size_t buf_size)
 {
     E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
     if ( pdu != NULL && pdu->present == E2AP_PDU_PR_initiatingMessage)
     {
         InitiatingMessageE2_t* initiatingMessage = pdu->choice.initiatingMessage;
-        RICsubscriptionRequest_t *ric_subscription_request = &(initiatingMessage->value.choice.RICsubscriptionRequest);
-        for (int i = 0; i < ric_subscription_request->protocolIEs.list.count; ++i )
+        if ( initiatingMessage->procedureCode == ProcedureCode_id_ricSubscription
+            && initiatingMessage->value.present == InitiatingMessageE2__value_PR_RICsubscriptionRequest)
         {
-            if ( ric_subscription_request->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionRequest_t *ric_subscription_request = &(initiatingMessage->value.choice.RICsubscriptionRequest);
+            for (int i = 0; i < ric_subscription_request->protocolIEs.list.count; ++i )
             {
-                return ric_subscription_request->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                if ( ric_subscription_request->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    return ric_subscription_request->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                }
             }
         }
     }
@@ -71,13 +52,17 @@ ssize_t  e2ap_set_ric_subscription_request_sequence_number(void *buffer, size_t
     if ( pdu != NULL && pdu->present == E2AP_PDU_PR_initiatingMessage)
     {
         InitiatingMessageE2_t* initiatingMessage = pdu->choice.initiatingMessage;
-        RICsubscriptionRequest_t *ric_subscription_request = &(initiatingMessage->value.choice.RICsubscriptionRequest);
-        for (int i = 0; i < ric_subscription_request->protocolIEs.list.count; ++i )
+        if ( initiatingMessage->procedureCode == ProcedureCode_id_ricSubscription
+            && initiatingMessage->value.present == InitiatingMessageE2__value_PR_RICsubscriptionRequest)
         {
-            if ( ric_subscription_request->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionRequest_t *ricSubscriptionRequest = &initiatingMessage->value.choice.RICsubscriptionRequest;
+            for (int i = 0; i < ricSubscriptionRequest->protocolIEs.list.count; ++i )
             {
-                ric_subscription_request->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
-                return encode_E2AP_PDU(pdu, buffer, buf_size);
+                if ( ricSubscriptionRequest->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    ricSubscriptionRequest->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
+                    return encode_E2AP_PDU(pdu, buffer, buf_size);
+                }
             }
         }
     }
@@ -85,40 +70,22 @@ ssize_t  e2ap_set_ric_subscription_request_sequence_number(void *buffer, size_t
 }
 
 /* RICsubscriptionResponse */
-ssize_t encode_RIC_subscription_response(RICsubscriptionResponse_t* pdu, void* buffer, size_t buf_size)
-{
-    asn_enc_rval_t encode_result;
-    encode_result = aper_encode_to_buffer(&asn_DEF_RICsubscriptionResponse, NULL, pdu, buffer, buf_size);
-    if(encode_result.encoded == -1) {
-        return -1;
-    }
-    return encode_result.encoded;
-}
-
-RICsubscriptionResponse_t* decode_RIC_subscription_response(const void *buffer, size_t buf_size)
-{
-    asn_dec_rval_t decode_result;
-    RICsubscriptionResponse_t *pdu = 0;
-    decode_result = aper_decode_complete(NULL, &asn_DEF_RICsubscriptionResponse, (void **)&pdu, buffer, buf_size);
-    if(decode_result.code == RC_OK) {
-        fprintf(stdout, "decoded bytes: %ld\n", decode_result.consumed);
-        return pdu;
-    } else {
-        ASN_STRUCT_FREE(asn_DEF_RICsubscriptionResponse, pdu);
-        return 0;
-    }
-}
-
 long e2ap_get_ric_subscription_response_sequence_number(void *buffer, size_t buf_size)
 {
-    RICsubscriptionResponse_t *pdu = decode_RIC_subscription_response(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_successfulOutcome )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        SuccessfulOutcomeE2_t* successfulOutcome = pdu->choice.successfulOutcome;
+        if ( successfulOutcome->procedureCode == ProcedureCode_id_ricSubscription
+            && successfulOutcome->value.present == SuccessfulOutcomeE2__value_PR_RICsubscriptionResponse)
         {
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionResponse_t *ricSubscriptionResponse = &successfulOutcome->value.choice.RICsubscriptionResponse;
+            for (int i = 0; i < ricSubscriptionResponse->protocolIEs.list.count; ++i )
             {
-                return pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                if ( ricSubscriptionResponse->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    return ricSubscriptionResponse->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                }
             }
         }
     }
@@ -127,15 +94,21 @@ long e2ap_get_ric_subscription_response_sequence_number(void *buffer, size_t buf
 
 ssize_t  e2ap_set_ric_subscription_response_sequence_number(void *buffer, size_t buf_size, long sequence_number)
 {
-    RICsubscriptionResponse_t *pdu = decode_RIC_subscription_response(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_successfulOutcome )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        SuccessfulOutcomeE2_t* successfulOutcome = pdu->choice.successfulOutcome;
+        if ( successfulOutcome->procedureCode == ProcedureCode_id_ricSubscription
+            && successfulOutcome->value.present == SuccessfulOutcomeE2__value_PR_RICsubscriptionResponse)
         {
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionResponse_t *ricSubscriptionResponse = &successfulOutcome->value.choice.RICsubscriptionResponse;
+            for (int i = 0; i < ricSubscriptionResponse->protocolIEs.list.count; ++i )
             {
-                pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
-                return encode_RIC_subscription_response(pdu, buffer, buf_size);
+                if ( ricSubscriptionResponse->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    ricSubscriptionResponse->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
+                    return encode_E2AP_PDU(pdu, buffer, buf_size);
+                }
             }
         }
     }
@@ -143,41 +116,22 @@ ssize_t  e2ap_set_ric_subscription_response_sequence_number(void *buffer, size_t
 }
 
 /* RICsubscriptionDeleteRequest */
-ssize_t encode_RIC_subscription_delete_request(RICsubscriptionDeleteRequest_t* pdu, void* buffer, size_t buf_size)
-{
-    asn_enc_rval_t encode_result;
-    encode_result = aper_encode_to_buffer(&asn_DEF_RICsubscriptionDeleteRequest, NULL, pdu, buffer, buf_size);
-    if(encode_result.encoded == -1) {
-        return -1;
-    }
-    return encode_result.encoded;
-}
-
-RICsubscriptionDeleteRequest_t* decode_RIC_subscription_delete_request(const void *buffer, size_t buf_size)
-{
-    asn_dec_rval_t decode_result;
-    RICsubscriptionDeleteRequest_t *pdu = 0;
-    decode_result = aper_decode_complete(NULL, &asn_DEF_RICsubscriptionDeleteRequest, (void **)&pdu, buffer, buf_size);
-    if(decode_result.code == RC_OK) {
-        fprintf(stdout, "decoded bytes: %ld\n", decode_result.consumed);
-        return pdu;
-    } else {
-        ASN_STRUCT_FREE(asn_DEF_RICsubscriptionDeleteRequest, pdu);
-        return 0;
-    }
-}
-
 long e2ap_get_ric_subscription_delete_request_sequence_number(void *buffer, size_t buf_size)
 {
-    RICsubscriptionDeleteRequest_t *pdu = decode_RIC_subscription_delete_request(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_initiatingMessage )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        InitiatingMessageE2_t* initiatingMessage = pdu->choice.initiatingMessage;
+        if ( initiatingMessage->procedureCode == ProcedureCode_id_ricSubscriptionDelete
+            && initiatingMessage->value.present == InitiatingMessageE2__value_PR_RICsubscriptionDeleteRequest )
         {
-            /* TODO */
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionDeleteRequest_t *subscriptionDeleteRequest = &initiatingMessage->value.choice.RICsubscriptionDeleteRequest;
+            for (int i = 0; i < subscriptionDeleteRequest->protocolIEs.list.count; ++i )
             {
-                return pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                if ( subscriptionDeleteRequest->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    return subscriptionDeleteRequest->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                }
             }
         }
     }
@@ -186,16 +140,21 @@ long e2ap_get_ric_subscription_delete_request_sequence_number(void *buffer, size
 
 ssize_t  e2ap_set_ric_subscription_delete_request_sequence_number(void *buffer, size_t buf_size, long sequence_number)
 {
-    RICsubscriptionDeleteRequest_t *pdu = decode_RIC_subscription_delete_request(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_initiatingMessage )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        InitiatingMessageE2_t* initiatingMessage = pdu->choice.initiatingMessage;
+        if ( initiatingMessage->procedureCode == ProcedureCode_id_ricSubscriptionDelete
+            && initiatingMessage->value.present == InitiatingMessageE2__value_PR_RICsubscriptionDeleteRequest )
         {
-            /* TODO */
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionDeleteRequest_t* subscriptionDeleteRequest = &initiatingMessage->value.choice.RICsubscriptionDeleteRequest;
+            for (int i = 0; i < subscriptionDeleteRequest->protocolIEs.list.count; ++i )
             {
-                pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
-                return encode_RIC_subscription_delete_request(pdu, buffer, buf_size);
+                if ( subscriptionDeleteRequest->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    subscriptionDeleteRequest->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
+                    return encode_E2AP_PDU(pdu, buffer, buf_size);
+                }
             }
         }
     }
@@ -203,41 +162,22 @@ ssize_t  e2ap_set_ric_subscription_delete_request_sequence_number(void *buffer,
 }
 
 /* RICsubscriptionDeleteResponse */
-ssize_t encode_RIC_subscription_delete_response(RICsubscriptionDeleteResponse_t* pdu, void* buffer, size_t buf_size)
-{
-    asn_enc_rval_t encode_result;
-    encode_result = aper_encode_to_buffer(&asn_DEF_RICsubscriptionDeleteResponse, NULL, pdu, buffer, buf_size);
-    if(encode_result.encoded == -1) {
-        return -1;
-    }
-    return encode_result.encoded;
-}
-
-RICsubscriptionDeleteResponse_t* decode_RIC_subscription_delete_response(const void *buffer, size_t buf_size)
-{
-    asn_dec_rval_t decode_result;
-    RICsubscriptionDeleteResponse_t *pdu = 0;
-    decode_result = aper_decode_complete(NULL, &asn_DEF_RICsubscriptionDeleteResponse, (void **)&pdu, buffer, buf_size);
-    if(decode_result.code == RC_OK) {
-        fprintf(stdout, "decoded bytes: %ld\n", decode_result.consumed);
-        return pdu;
-    } else {
-        ASN_STRUCT_FREE(asn_DEF_RICsubscriptionDeleteResponse, pdu);
-        return 0;
-    }
-}
-
 long e2ap_get_ric_subscription_delete_response_sequence_number(void *buffer, size_t buf_size)
 {
-    RICsubscriptionDeleteResponse_t *pdu = decode_RIC_subscription_delete_response(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_successfulOutcome )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        SuccessfulOutcomeE2_t* successfulOutcome = pdu->choice.successfulOutcome;
+        if ( successfulOutcome->procedureCode == ProcedureCode_id_ricSubscriptionDelete
+            && successfulOutcome->value.present == SuccessfulOutcomeE2__value_PR_RICsubscriptionDeleteResponse )
         {
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionDeleteResponse_t* subscriptionDeleteResponse = &successfulOutcome->value.choice.RICsubscriptionDeleteResponse;
+            for (int i = 0; i < subscriptionDeleteResponse->protocolIEs.list.count; ++i )
             {
-                /* TODO */
-                return pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                if ( subscriptionDeleteResponse->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    return subscriptionDeleteResponse->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                }
             }
         }
     }
@@ -246,16 +186,21 @@ long e2ap_get_ric_subscription_delete_response_sequence_number(void *buffer, siz
 
 ssize_t  e2ap_set_ric_subscription_delete_response_sequence_number(void *buffer, size_t buf_size, long sequence_number)
 {
-    RICsubscriptionDeleteResponse_t *pdu = decode_RIC_subscription_delete_response(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_successfulOutcome )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        SuccessfulOutcomeE2_t* successfulOutcome = pdu->choice.successfulOutcome;
+        if ( successfulOutcome->procedureCode == ProcedureCode_id_ricSubscriptionDelete
+            && successfulOutcome->value.present == SuccessfulOutcomeE2__value_PR_RICsubscriptionDeleteResponse )
         {
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionDeleteResponse_t* subscriptionDeleteResponse;
+            for (int i = 0; i < subscriptionDeleteResponse->protocolIEs.list.count; ++i )
             {
-                /* todo */
-                pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
-                return encode_RIC_subscription_delete_response(pdu, buffer, buf_size);
+                if ( subscriptionDeleteResponse->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    subscriptionDeleteResponse->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
+                    return encode_E2AP_PDU(pdu, buffer, buf_size);
+                }
             }
         }
     }
index 026e676..9bc1549 100644 (file)
@@ -7,38 +7,26 @@
 #include "RICsubscriptionDeleteResponse.h"
 #include "E2AP-PDU.h"
 #include "InitiatingMessageE2.h"
+#include "SuccessfulOutcomeE2.h"
 #include "ProtocolIE-Container.h"
 #include "ProtocolIE-Field.h"
 
 size_t encode_E2AP_PDU(E2AP_PDU_t* pdu, void* buffer, size_t buf_size);
 E2AP_PDU_t* decode_E2AP_PDU(const void* buffer, size_t buf_size);
 
-/* RICsubscriptionRequest */
-ssize_t encode_RIC_subscription_request(RICsubscriptionRequest_t* pdu, void* buffer, size_t buf_size);
-RICsubscriptionRequest_t* decode_RIC_subscription_request(const void *buffer, size_t buf_size);
-
 long e2ap_get_ric_subscription_request_sequence_number(void *buffer, size_t buf_size);
 ssize_t  e2ap_set_ric_subscription_request_sequence_number(void *buffer, size_t buf_size, long sequence_number);
 RICsubscription_t* e2ap_get_ric_subscription_request_ric_subscription(void *buffer, size_t buffer_size);
 
 /* RICsubscriptionResponse */
-ssize_t encode_RIC_subscription_response(RICsubscriptionResponse_t* pdu, void* buffer, size_t buf_size);
-RICsubscriptionResponse_t* decode_RIC_subscription_response(const void *buffer, size_t buf_size);
-
 long e2ap_get_ric_subscription_response_sequence_number(void *buffer, size_t buf_size);
 ssize_t  e2ap_set_ric_subscription_response_sequence_number(void *buffer, size_t buf_size, long sequence_number);
 
 /* RICsubscriptionDeleteRequest */
-ssize_t encode_RIC_subscription_delete_request(RICsubscriptionDeleteRequest_t* pdu, void* buffer, size_t buf_size);
-RICsubscriptionDeleteRequest_t* decode_RIC_subscription_delete_request(const void *buffer, size_t buf_size);
-
 long e2ap_get_ric_subscription_delete_request_sequence_number(void *buffer, size_t buf_size);
 ssize_t  e2ap_set_ric_subscription_delete_request_sequence_number(void *buffer, size_t buf_size, long sequence_number);
 
 /* RICsubscriptionDeleteResponse */
-ssize_t encode_RIC_subscription_delete_response(RICsubscriptionDeleteResponse_t* pdu, void* buffer, size_t buf_size);
-RICsubscriptionDeleteResponse_t* decode_RIC_subscription_delete_response(const void *buffer, size_t buf_size);
-
 long e2ap_get_ric_subscription_delete_response_sequence_number(void *buffer, size_t buf_size);
 ssize_t  e2ap_set_ric_subscription_delete_response_sequence_number(void *buffer, size_t buf_size, long sequence_number);
 
index 598c7ef..1c07ad4 100644 (file)
@@ -32,6 +32,7 @@ import (
 type RtmgrClient struct {
        rtClient         *rtmgrclient.RoutingManager
        xappHandleParams *rtmgrhandle.ProvideXappSubscriptionHandleParams
+       xappDeleteParams *rtmgrhandle.DeleteXappSubscriptionHandleParams
 }
 
 func (rc *RtmgrClient) SubscriptionRequestUpdate() error {
@@ -54,6 +55,15 @@ func (rc *RtmgrClient) SubscriptionRequestUpdate() error {
                        xapp.Logger.Info("Succesfully updated routing manager about the subscription: %d", subID)
                        return nil
                }
+       case DELETE:
+               _, _, deleteErr := rc.rtClient.Handle.DeleteXappSubscriptionHandle(rc.xappDeleteParams.WithXappSubscriptionData(&xappSubReq))
+               if deleteErr != nil && !(strings.Contains(deleteErr.Error(), "status 200"))  {
+                       xapp.Logger.Error("Deleting subscription id = %d  in routing manager, failed with error: %v", subID, deleteErr)
+                       return deleteErr
+               } else {
+                       xapp.Logger.Info("Succesfully deleted subscription: %d in routing manager.", subID)
+                       return nil
+               }
        default:
                return nil
        }
index f6cd771..ec6419e 100644 (file)
@@ -53,17 +53,6 @@ type RMRMeid struct {
        EnbID  string
 }
 
-type RMRParams struct {
-       Mtype           int
-       Payload         []byte
-       PayloadLen      int
-       Meid            *RMRMeid
-       Xid             string
-       SubId           int
-       Src             string
-       Mbuf            *C.rmr_mbuf_t
-}
-
 var SEEDSN uint16
 var SubscriptionReqChan = make(chan subRouteInfo, 10)
 
@@ -98,7 +87,8 @@ func NewControl() Control {
        transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
        client := rtmgrclient.New(transport, strfmt.Default)
        handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
-       rtmgrClient := RtmgrClient{client, handle}
+       delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+       rtmgrClient := RtmgrClient{client, handle, delete_handle}
 
        return Control{new(E2ap), registry, &rtmgrClient, tracker}
 }
@@ -115,6 +105,8 @@ func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
                err = c.handleSubscriptionResponse(rp)
        case C.RIC_SUB_DEL_REQ:
                err = c.handleSubscriptionDeleteRequest(rp)
+       case C.RIC_SUB_DEL_RESP:
+               err = c.handleSubscriptionDeleteResponse(rp)
        default:
                err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
        }
@@ -128,6 +120,13 @@ func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
        return
 }
 
+func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
+       if !xapp.Rmr.Send(params, true) {
+               err = errors.New("rmr.Send() failed")
+       }
+       return
+}
+
 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
        payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
        if err != nil {
@@ -153,7 +152,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error)
 
        /* Create transatcion records for every subscription request */
        xact_key := Transaction_key{new_sub_id, CREATE}
-       xact_value := Transaction{*src_addr, *src_port, params.Payload}
+       xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
        err = c.tracker.Track_transaction(xact_key, xact_value)
        if err != nil {
                xapp.Logger.Error("Failed to create a transaction record due to %v", err)
@@ -170,6 +169,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error)
 
        xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
        c.rmrSend(params)
+       xapp.Logger.Info("--- Debugging transaction table = %v", c.tracker.transaction_table)
        return
 }
 
@@ -186,7 +186,14 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error)
        }
        c.registry.setSubscriptionToConfirmed(payload_seq_num)
        xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
-       c.rmrSend(params)
+       transaction, err := c.tracker.complete_transaction(payload_seq_num, DELETE)
+       if err != nil {
+               xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+               return
+       }
+       xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
+       params.Mbuf = transaction.Mbuf
+       c.rmrReplyToSender(params)
        return
 }
 
@@ -221,8 +228,47 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err e
        xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
        if c.registry.IsValidSequenceNumber(payload_seq_num) {
                c.registry.deleteSubscription(payload_seq_num)
+               trackErr := c.trackDeleteTransaction(params, payload_seq_num)
+               if trackErr != nil {
+                       xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+                       return trackErr
+               }
        }
        xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
        c.rmrSend(params)
        return
 }
+
+func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payload_seq_num uint16) (err error) {
+       src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
+       xact_key := Transaction_key{payload_seq_num, DELETE}
+       xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
+       err = c.tracker.Track_transaction(xact_key, xact_value)
+       return
+}
+
+func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
+       payload_seq_num, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
+       if err != nil {
+               err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
+               return
+       }
+    var transaction , _= c.tracker.Retrive_transaction(payload_seq_num, DELETE)
+       sub_route_action := subRouteInfo{DELETE, transaction.Xapp_instance_address, transaction.Xapp_port, payload_seq_num }
+       go c.rtmgrClient.SubscriptionRequestUpdate()
+       SubscriptionReqChan <- sub_route_action
+
+       xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
+       if c.registry.releaseSequenceNumber(payload_seq_num) {
+               transaction, err = c.tracker.complete_transaction(payload_seq_num, DELETE)
+               if err != nil {
+                       xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+                       return
+               }
+               xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
+               //params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
+               params.Mbuf = transaction.Mbuf
+               c.rmrReplyToSender(params)
+       }
+       return
+}
index c349921..2b04f38 100644 (file)
@@ -71,6 +71,11 @@ func (r *Registry) deleteSubscription(sn uint16) {
 }
 
 //This function releases the given id as unused in the register
-//func (r *Registry) releaseSequenceNumber(sn uint16) {
-//     delete(r.register, sn)
-//}
\ No newline at end of file
+func (r *Registry) releaseSequenceNumber(sn uint16) bool {
+       if r.register[sn] {
+               return false
+       } else {
+               delete(r.register, sn)
+               return true
+       }
+}
\ No newline at end of file
index fdbbeaf..e08f8db 100644 (file)
@@ -21,7 +21,6 @@ package control
 
 import (
        "fmt"
-//     "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
 )
 
 /*
@@ -49,6 +48,21 @@ func (t *Tracker) Track_transaction(key Transaction_key, xact Transaction) error
        return nil
 }
 
+/*
+Retreives the transaction table entry for the given request.
+Returns error in case the transaction cannot be found.
+*/
+func (t *Tracker) Update_transaction(SubID uint16, trans_type Action, xact Transaction) error{
+       key := Transaction_key{SubID, trans_type}
+       if _, ok := t.transaction_table[key]; ok {
+               // TODO: Implement merge related check here. If the key is same but the value is different.
+               err := fmt.Errorf("Transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.trans_type )
+               return err
+       }
+       t.transaction_table[key] = xact
+       return nil
+}
+
 /*
 Retreives the transaction table entry for the given request.
 Returns error in case the transaction cannot be found.
@@ -67,14 +81,13 @@ func (t *Tracker) Retrive_transaction(subID uint16, act Action) (Transaction, er
 Deletes the transaction table entry for the given request and returns the deleted xapp's address and port for reference.
 Returns error in case the transaction cannot be found.
 */
-func (t *Tracker) complete_transaction(subID uint16, act Action) (string, uint16, error){
+func (t *Tracker) complete_transaction(subID uint16, act Action) (Transaction, error){
        key := Transaction_key{subID, act}
-       var empty_address string
-       var empty_port uint16
+       var empty_transaction Transaction
        if xact, ok := t.transaction_table[key]; ok {
                delete(t.transaction_table, key)
-               return xact.Xapp_instance_address, xact.Xapp_port, nil
+               return xact, nil
        }
        err := fmt.Errorf("Tranaction record for Subscription ID %d and action %s does not exist", subID, act)
-       return empty_address, empty_port, err
+       return empty_transaction, err
 }
index 1a2c92f..d12233c 100644 (file)
 
 package control
 
+import (
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+)
+
 type RmrDatagram struct {
        MessageType    int
        SubscriptionId uint16
@@ -44,4 +48,5 @@ type Transaction struct {
        Xapp_instance_address string
        Xapp_port             uint16
        Ric_sub_req           []byte
+       Mbuf                  *xapp.RMRMbuf
 }
index 0f726e7..7b9b3da 100644 (file)
@@ -77,4 +77,4 @@ COPY --from=submgrbuild /usr/local/lib /usr/local/lib
 
 RUN ldconfig
 
-RUN chmod 755 /run_e2t.sh
+RUN chmod 755 /run_e2t.sh
\ No newline at end of file
index 328c85a..557afd2 100644 (file)
@@ -76,4 +76,4 @@ COPY --from=submgrbuild /usr/local/lib /usr/local/lib
 
 RUN ldconfig
 RUN chmod 755 /run_rco.sh
-RUN chmod 755 /rco
+RUN chmod 755 /rco
\ No newline at end of file
index 25926b8..871c3a8 100644 (file)
@@ -37,7 +37,7 @@ spec:
     spec:
       containers:
       - name: rco
-        image: rco:builder
+        image: jenkins:5000/rco:test
         command: ["/run_rco.sh"]
         env:
         - name: DBAAS_SERVICE_HOST
index 6ba36d0..21062eb 100644 (file)
@@ -59,7 +59,11 @@ func init() {
        if SEEDSN == 0 || SEEDSN > 65535 {
                SEEDSN = 12345
        }
-       DELETESEEDSN = SEEDSN
+       DELETESEEDSN = uint16(viper.GetInt("delete_seed_sn"))
+       if DELETESEEDSN == 0 || DELETESEEDSN > 65535 {
+               DELETESEEDSN = SEEDSN
+       }
+
        xapp.Logger.Info("Initial SEQUENCE NUMBER: %v", SEEDSN)
 }
 
@@ -77,9 +81,7 @@ func (r *Rco) GenerateDeletePayload(sub_id uint16) (payload []byte, err error) {
        if err != nil {
                return make([]byte, 0), errors.New("Unable to decode data provided in RCO_DELETE RAWDATA environment variable")
        }
-       xapp.Logger.Info("SetSubscriptionDeleteRequestSequenceNumber1")
        payload, err = r.SetSubscriptionDeleteRequestSequenceNumber(skeleton, sub_id)
-       xapp.Logger.Info("SetSubscriptionDeleteRequestSequenceNumber2")
        return
 }
 
@@ -106,6 +108,7 @@ func (r *Rco) SendRequests() (err error) {
        for {
                time.Sleep(2 * time.Second)
                c <- submgr.RmrDatagram{12010, SEEDSN, message}
+               SEEDSN++
                time.Sleep(2 * time.Second)
                c <- submgr.RmrDatagram{12020, DELETESEEDSN, deletemessage}
                DELETESEEDSN++
@@ -118,7 +121,7 @@ func (r *Rco) Run() {
                message := <-c
                payload_seq_num, err := r.GetSubscriptionRequestSequenceNumber(message.Payload)
                if err != nil {
-                       xapp.Logger.Debug("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())     
+                       xapp.Logger.Debug("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
                }
                params.SubId = int(message.SubscriptionId)
                params.Mtype = message.MessageType
diff --git a/tmp/rmr.go b/tmp/rmr.go
new file mode 100644 (file)
index 0000000..49cdf41
--- /dev/null
@@ -0,0 +1,284 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+/*
+#include <time.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <rmr/rmr.h>
+#include <rmr/RIC_message_types.h>
+
+void write_bytes_array(unsigned char *dst, void *data, int len) {
+    memcpy((void *)dst, (void *)data, len);
+}
+
+#cgo CFLAGS: -I../
+#cgo LDFLAGS: -lrmr_nng -lnng
+*/
+import "C"
+
+import (
+       "github.com/spf13/viper"
+       "strconv"
+       "strings"
+       "time"
+       "unsafe"
+)
+
+type RMRMbuf C.rmr_mbuf_t
+
+var RMRCounterOpts = []CounterOpts{
+       {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
+       {Name: "Received", Help: "The total number of received RMR messages"},
+       {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
+       {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
+}
+
+type RMRParams struct {
+       Mtype           int
+       Payload         []byte
+       PayloadLen      int
+       Meid            *RMRMeid
+       Xid             string
+       SubId           int
+       Src             string
+       Mbuf            *RMRMbuf
+}
+
+func NewRMRClient() *RMRClient {
+       p := C.CString(viper.GetString("rmr.protPort"))
+       m := C.int(viper.GetInt("rmr.maxSize"))
+       defer C.free(unsafe.Pointer(p))
+
+       ctx := C.rmr_init(p, m, C.int(0))
+       if ctx == nil {
+               Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
+       }
+
+       return &RMRClient{
+               context:   ctx,
+               consumers: make([]MessageConsumer, 0),
+               stat:      Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
+       }
+}
+
+func (m *RMRClient) Start(c MessageConsumer) {
+       if c != nil {
+               m.consumers = append(m.consumers, c)
+       }
+
+       for {
+               Logger.Info("rmrClient: Waiting for RMR to be ready ...")
+
+               if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
+                       break
+               }
+               time.Sleep(10 * time.Second)
+       }
+       m.wg.Add(viper.GetInt("rmr.numWorkers"))
+
+       if m.readyCb != nil {
+               go m.readyCb(m.readyCbParams)
+       }
+
+       for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
+               go m.Worker("worker-"+strconv.Itoa(w), 0)
+       }
+       m.Wait()
+}
+
+func (m *RMRClient) Worker(taskName string, msgSize int) {
+       p := viper.GetString("rmr.protPort")
+       Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
+
+       defer m.wg.Done()
+       for {
+               rxBuffer := (*RMRMbuf)(C.rmr_rcv_msg(m.context, nil))
+               if rxBuffer == nil {
+                       m.UpdateStatCounter("ReceiveError")
+                       continue
+               }
+               m.UpdateStatCounter("Received")
+
+               go m.parseMessage(rxBuffer)
+       }
+}
+
+func (m *RMRClient) parseMessage(rxBuffer *RMRMbuf) {
+       if len(m.consumers) == 0 {
+               Logger.Info("rmrClient: No message handlers defined, message discarded!")
+               return
+       }
+
+       params := &RMRParams{}
+       params.Mbuf = rxBuffer
+       params.Mtype = int(rxBuffer.mtype)
+       params.SubId = int(rxBuffer.sub_id)
+       params.Meid = &RMRMeid{}
+
+       meidBuf := make([]byte, int(C.RMR_MAX_MEID))
+       if meidCstr := C.rmr_get_meid((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
+               params.Meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
+               params.Meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
+       }
+
+       xidBuf := make([]byte, int(C.RMR_MAX_XID))
+       if xidCstr := C.rmr_get_xact((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
+               params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
+       }
+
+       srcBuf := make([]byte, int(C.RMR_MAX_SRC))
+       if srcStr := C.rmr_get_src((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
+               params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
+       }
+
+       for _, c := range m.consumers {
+               cptr := unsafe.Pointer(rxBuffer.payload)
+               params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
+               params.PayloadLen = int(rxBuffer.len)
+
+               err := c.Consume(params)
+               if err != nil {
+                       Logger.Warn("rmrClient: Consumer returned error: %v", err)
+               }
+       }
+}
+
+func (m *RMRClient) Allocate() *RMRMbuf {
+       buf := C.rmr_alloc_msg(m.context, 0)
+       if buf == nil {
+               Logger.Error("rmrClient: Allocating message buffer failed!")
+       }
+
+       return (*RMRMbuf)(buf)
+}
+
+func (m *RMRClient) SendMsg(params *RMRParams) bool {
+       return m.Send(params, false)
+}
+
+func (m *RMRClient) SendRts(params *RMRParams) bool {
+       return m.Send(params, true)
+}
+
+func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
+       buf := params.Mbuf
+       if buf == nil {
+               buf = m.Allocate()
+       }
+
+       buf.mtype = C.int(params.Mtype)
+       buf.sub_id = C.int(params.SubId)
+       buf.len = C.int(len(params.Payload))
+       datap := C.CBytes(params.Payload)
+       defer C.free(datap)
+
+       if params != nil {
+               if params.Meid != nil {
+                       b := make([]byte, int(C.RMR_MAX_MEID))
+                       copy(b, []byte(params.Meid.PlmnID))
+                       copy(b[16:], []byte(params.Meid.EnbID))
+                       C.rmr_bytes2meid((*C.rmr_mbuf_t)(buf), (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
+               }
+               xidLen := len(params.Xid)
+               if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
+                       b := make([]byte, int(C.RMR_MAX_MEID))
+                       copy(b, []byte(params.Xid))
+                       C.rmr_bytes2xact((*C.rmr_mbuf_t)(buf), (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
+               }
+       }
+       C.write_bytes_array(buf.payload, datap, buf.len)
+
+       return m.SendBuf(buf, isRts)
+}
+
+func (m *RMRClient) SendBuf(txBuffer *RMRMbuf, isRts bool) bool {
+       for i := 0; i < 10; i++ {
+               txBuffer.state = 0
+               if isRts {
+                       txBuffer = (*RMRMbuf)(C.rmr_rts_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
+               } else {
+                       txBuffer = (*RMRMbuf)(C.rmr_send_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
+               }
+
+               if txBuffer == nil {
+                       break
+               } else if txBuffer.state != C.RMR_OK {
+                       if txBuffer.state != C.RMR_ERR_RETRY {
+                               time.Sleep(100 * time.Microsecond)
+                               m.UpdateStatCounter("TransmitError")
+                       }
+                       for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
+                               txBuffer = (*RMRMbuf)(C.rmr_send_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
+                       }
+               }
+
+               if txBuffer.state == C.RMR_OK {
+                       m.UpdateStatCounter("Transmitted")
+                       return true
+               }
+       }
+       m.UpdateStatCounter("TransmitError")
+       return false
+}
+
+func (m *RMRClient) UpdateStatCounter(name string) {
+       m.mux.Lock()
+       m.stat[name].Inc()
+       m.mux.Unlock()
+}
+
+func (m *RMRClient) RegisterMetrics() {
+       m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
+}
+
+func (m *RMRClient) Wait() {
+       m.wg.Wait()
+}
+
+func (m *RMRClient) IsReady() bool {
+       return m.ready != 0
+}
+
+func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
+       m.readyCb = cb
+       m.readyCbParams = params
+}
+
+func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
+       id, ok := RICMessageTypes[name]
+       return id, ok
+}
+
+func (m *RMRClient) GetRicMessageName(id int) (s string) {
+       for k, v := range RICMessageTypes {
+               if id == v {
+                       return k
+               }
+       }
+       return
+}
+
+// To be removed ...
+func (m *RMRClient) GetStat() (r RMRStatistics) {
+       return
+}