Add E2AP package that supports asn1 encoding/decoding function for E2AP. 80/4880/5
authorHeewon Park <h_w.park@samsung.com>
Thu, 22 Oct 2020 08:36:04 +0000 (17:36 +0900)
committerHeewon Park <h_w.park@samsung.com>
Mon, 14 Jun 2021 02:21:25 +0000 (11:21 +0900)
   - indication
   - subscription request/response
   - control request

Issue-ID : RIC-664

Signed-off-by: Heewon Park <h_w.park@samsung.com>
Change-Id: Id1bb533dd4cf16006f73c8e6b928cca65a832e4b

Dockerfile-Unit-Test
e2ap-version.yaml [new file with mode: 0644]
ricxappframe/e2ap/__init__.py [new file with mode: 0644]
ricxappframe/e2ap/asn1.py [new file with mode: 0644]
ricxappframe/e2ap/asn1clib/__init__.py [new file with mode: 0644]
ricxappframe/e2ap/asn1clib/asn1clib.py [new file with mode: 0644]
ricxappframe/e2ap/asn1clib/types.py [new file with mode: 0644]
tests/test_e2ap.py [new file with mode: 0644]

index ee754ce..575f4ea 100644 (file)
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
-FROM python:3.8-alpine
+FROM python:3.8-slim as stretch
 
 # sdl uses hiredis which needs gcc
-RUN apk update && apk add gcc musl-dev
+RUN apt-get update && \
+    apt-get install -y \
+        wget gcc musl-dev
 
-# copy rmr libraries from builder image in lieu of an Alpine package
-COPY --from=nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-alpine3-rmr:4.1.2 /usr/local/lib64/librmr* /usr/local/lib64/
+ARG rmr_version=4.1.2
+ARG e2ap_version=1.1.0
+
+# download rmr and e2ap libraries from package cloud
+RUN wget -nv --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr_${rmr_version}_amd64.deb/download.deb
+RUN wget -nv --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr-dev_${rmr_version}_amd64.deb/download.deb
+
+RUN wget -nv --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/riclibe2ap_${e2ap_version}_amd64.deb/download.deb
+RUN wget -nv --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/riclibe2ap-dev_${e2ap_version}_amd64.deb/download.deb
+
+RUN dpkg -i rmr_${rmr_version}_amd64.deb
+RUN dpkg -i rmr-dev_${rmr_version}_amd64.deb
+
+RUN dpkg -i riclibe2ap_${e2ap_version}_amd64.deb
+RUN dpkg -i riclibe2ap-dev_${e2ap_version}_amd64.deb
+
+RUN ls /usr/local/lib/
+
+FROM python:3.8-slim
+
+ARG rmr_version=4.1.2
+ARG e2ap_version=1.1.0
+
+COPY --from=stretch /usr/local/lib/librmr_si.so.${rmr_version} /usr/local/lib/librmr_si.so
+COPY --from=stretch /usr/local/lib/libriclibe2ap.so.${e2ap_version} /usr/local/lib/libriclibe2ap.so
 
 # Upgrade pip, install tox
 RUN pip install --upgrade pip && pip install tox
diff --git a/e2ap-version.yaml b/e2ap-version.yaml
new file mode 100644 (file)
index 0000000..14d9805
--- /dev/null
@@ -0,0 +1,3 @@
+# CI script installs asn1 for E2AP from PackageCloud using this version
+---
+version: 1.1.0
diff --git a/ricxappframe/e2ap/__init__.py b/ricxappframe/e2ap/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/ricxappframe/e2ap/asn1.py b/ricxappframe/e2ap/asn1.py
new file mode 100644 (file)
index 0000000..d7afe72
--- /dev/null
@@ -0,0 +1,465 @@
+# *******************************************************************************
+#  * Copyright 2020 Samsung Electronics All Rights Reserved.
+#  *
+#  * Licensed under the Apache License, Version 2.0 (the "License");
+#  * you may not use this file except in compliance with the License.
+#  * You may obtain a copy of the License at
+#  *
+#  * http://www.apache.org/licenses/LICENSE-2.0
+#  *
+#  * Unless required by applicable law or agreed to in writing, software
+#  * distributed under the License is distributed on an "AS IS" BASIS,
+#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  * See the License for the specific language governing permissions and
+#  * limitations under the License.
+#  *
+#  *******************************************************************************
+from sys import getsizeof
+from typing import List, Tuple
+from ctypes import POINTER, ARRAY
+from ctypes import c_ulong, c_void_p, c_long, c_size_t, c_int, c_ssize_t, c_uint8
+from ricxappframe.e2ap.asn1clib.asn1clib import asn1_c_lib
+from ricxappframe.e2ap.asn1clib.types import indication_msg_t, subResp_msg_t, ric_action_definition_t, ric_subsequent_action_t
+
+
+def _wrap_asn1_function(funcname, restype, argtypes):
+    """
+    Simplify wrapping ctypes functions.
+
+    Parameters
+    ----------
+    funcname: str
+        Name of library method
+    restype: class
+        Name of ctypes class; e.g., c_char_p
+    argtypes: list
+        List of ctypes classes; e.g., [ c_char_p, int ]
+
+    Returns
+    -------
+    _FuncPointer:
+        Pointer to C library function
+    """
+    func = asn1_c_lib.__getattr__(funcname)
+    func.restype = restype
+    func.argtypes = argtypes
+    return func
+
+
+_asn1_decode_indicationMsg = _wrap_asn1_function(
+    'e2ap_decode_ric_indication_message', POINTER(indication_msg_t), [c_void_p, c_ulong])
+_asn1_free_indicationMsg = _wrap_asn1_function(
+    'e2ap_free_decoded_ric_indication_message', c_void_p, [POINTER(indication_msg_t)])
+
+
+class IndicationMsg:
+    """
+    A class of E2AP's RICIndicationMessage
+    """
+    __slots__ = ('__request_id', '__request_sequence_number', '__function_id', '__action_id',
+                 '__indication_sequence_number', '__indication_type', '__indication_header', '__indication_message',
+                 '__call_process_id')
+    __request_id: int
+    __request_sequence_number: int
+    __function_id: int
+    __action_id: int
+    __indication_sequence_number: int
+    __indication_type: int
+    __indication_header: bytes
+    __indication_message: bytes
+    __call_process_id: bytes
+
+    def __init__(self):
+        return
+
+    @property
+    def request_id(self):
+        return self.__request_id
+
+    @property
+    def request_sequence_number(self):
+        return self.__request_sequence_number
+
+    @property
+    def function_id(self):
+        return self.__function_id
+
+    @property
+    def action_id(self):
+        return self.__action_id
+
+    @property
+    def indication_sequence_number(self):
+        return self.__indication_sequence_number
+
+    @property
+    def indication_type(self):
+        return self.__indication_type
+
+    @property
+    def indication_header(self):
+        return self.__indication_header
+
+    @property
+    def indication_message(self):
+        return self.__indication_message
+
+    @property
+    def call_process_id(self):
+        return self.__call_process_id
+
+    def decode(self, payload: c_void_p):
+        """
+        Function that sets fields of IndicationMsg class
+        through msg payload (bytes) of RICIndication type.
+
+        Raise Exception when payload is not RICIndication.
+
+        Parameters
+        ----------
+        payload: c_void_p
+            RICIndication type payload received via rmr
+
+        Returns
+        -------
+        """
+        indication: indication_msg_t = _asn1_decode_indicationMsg(
+            payload, getsizeof(payload))
+
+        if indication is None:
+            raise Exception("Payload is not matched with RICIndication")
+        indication.contents.request_id = 1
+        self.__request_id = indication.contents.request_id
+        self.__request_sequence_number = indication.contents.request_sequence_number
+        self.__function_id = indication.contents.function_id
+        self.__action_id = indication.contents.action_id
+        self.__indication_sequence_number = indication.contents.indication_sequence_number
+        self.__indication_type = indication.contents.indication_type
+        self.__indication_header = bytes(
+            indication.contents.indication_header[:indication.contents.indication_header_length])
+        self.__indication_message = bytes(
+            indication.contents.indication_message[:indication.contents.indication_message_length])
+        self.__call_process_id = bytes(
+            indication.contents.call_process_id[:indication.contents.call_process_id_length])
+
+        # _asn1_free_indicationMsg(indication)
+        return
+
+
+class CauseItem:
+    __slots__ = ('__cause_type', '__cause_id')
+    __cause_type: int
+    __cause_id: int
+
+    def __init__(self, causeType: int, causeID: int):
+        self.__cause_type = causeType
+        self.__cause_id = causeID
+        return
+
+    @property
+    def cause_type(self):
+        return self.__cause_type
+
+    @property
+    def cause_id(self):
+        return self.__cause_id
+
+
+class ActionAdmittedList:
+    __slots__ = ('__request_id', '__count')
+    __request_id: List[int]
+    __count: int
+
+    def __init__(self, request_id: List[int], count: int):
+        self.__request_id = request_id
+        self.__count = count
+        return
+
+    @property
+    def request_id(self):
+        return self.__request_id
+
+    @property
+    def count(self):
+        return self.__count
+
+
+class ActionNotAdmittedList:
+    __slots__ = ('__request_id', '__cause', '__count')
+    __request_id: List[int]
+    __cause: List[CauseItem]
+    __count: int
+
+    def __init__(self, request_id: List[int], cause: List[CauseItem], count: int):
+        self.__request_id = request_id
+        self.__cause = cause
+        self.__count = count
+        return
+
+    @property
+    def request_id(self):
+        return self.__request_id
+
+    @property
+    def cause(self):
+        return self.__cause
+
+    @property
+    def count(self):
+        return self.__count
+
+
+_asn1_decode_subRespMsg = _wrap_asn1_function(
+    'e2ap_decode_ric_subscription_response_message', POINTER(subResp_msg_t), [c_void_p, c_ulong])
+
+
+class SubResponseMsg:
+    """
+    A class of E2AP's RICsubscriptionResponseMessage
+    """
+    __slots__ = ('__request_id', '__request_sequence_number', '__function_id',
+                 '__action_admitted_list', '__action_not_admitted_list')
+    __request_id: int
+    __request_sequence_number: int
+    __function_id: int
+    __action_admitted_list: ActionAdmittedList
+    __action_not_admitted_list: ActionNotAdmittedList
+
+    @property
+    def request_id(self):
+        return self.__request_id
+
+    @property
+    def request_sequence_number(self):
+        return self.__request_sequence_number
+
+    @property
+    def function_id(self):
+        return self.__function_id
+
+    @property
+    def action_admitted_list(self):
+        return self.__action_admitted_list
+
+    @property
+    def action_not_admitted_list(self):
+        return self.__action_not_admitted_list
+
+    def __init__(self):
+        return
+
+    def decode(self, payload: c_void_p):
+        """
+        Function that sets fields of SubRespMsg class
+        through msg payload (bytes) of RICsubscriptionResponseMessage type.
+
+        Raise Exception when payload is not RICsubscriptionResponseMessage.
+
+        Parameters
+        ----------
+        payload: c_void_p
+            RICsubscriptionResponseMessage type payload received via rmr
+
+        Returns
+        -------
+        """
+        subResp: subResp_msg_t = _asn1_decode_subRespMsg(
+            payload, getsizeof(payload))
+
+        if subResp is None:
+            raise Exception(
+                "Payload is not matched with RICsubscriptionResponseMessage")
+
+        self.__request_id = subResp.contents.request_id
+        self.__request_sequence_number = subResp.contents.request_sequence_number
+        self.__function_id = subResp.contents.function_id
+        self.__action_admitted_list = ActionAdmittedList(subResp.contents.action_admitted_list.request_id,
+                                                         subResp.contents.action_admitted_list.count)
+        causeList = [CauseItem(item.cause_type, item.cause_id)
+                     for item in subResp.contents.action_not_admitted_list.cause]
+        self.__action_not_admitted_list = ActionNotAdmittedList(subResp.contents.action_not_admitted_list.request_id, causeList,
+                                                                subResp.contents.action_not_admitted_list.count)
+        return
+
+
+class ActionDefinition:
+    """
+    A class that mirrored E2AP's RICactionDefinition with python
+    """
+    __slots__ = ('action_definition', 'size')
+
+    action_definition: bytes
+    size: int
+
+    def __init__(self):
+        self.action_definition = []
+        self.size = 0
+        return
+
+
+class SubsequentAction:
+    """
+    A class that mirrored E2AP's RICSubsequentAction with python
+    """
+    __slots__ = ('is_valid', 'subsequent_action_type', 'time_to_wait')
+
+    is_valid: int
+    subsequent_action_type: int
+    time_to_wait: int
+
+    def __init__(self):
+        self.is_valid = 0
+        self.subsequent_action_type = 0
+        self.time_to_wait = 0
+        return
+
+
+_asn1_encode_subReqMsg = _wrap_asn1_function('e2ap_encode_ric_subscription_request_message', c_ssize_t, [
+                                             c_void_p, c_size_t, c_long, c_long, c_long, c_void_p, c_size_t, c_size_t, POINTER(c_long), POINTER(c_long), POINTER(ric_action_definition_t), POINTER(ric_subsequent_action_t)])
+
+
+class SubRequestMsg:
+    """
+    A class that provides a function to make payload of e2ap RICSubscriptionRequestMessage.
+    """
+
+    def __init__(self):
+        return
+
+    def encode(self, requestor_id: int, request_sequence_number: int,
+               ran_function_id: int, event_trigger_definition: bytes, action_ids: List[int],
+               action_types: List[int], action_definitions: List[ActionDefinition],
+               sub_sequent_actions: List[SubsequentAction]) -> Tuple[int, bytes]:
+        """
+        Function that creates and returns a payload
+        according to e2ap's RICSubscriptionRequestMessage.
+
+        Raise Exception when the payload of RICSubscriptionRequestMessage cannot be created.
+
+        Parameters
+        ----------
+        requestor_id: int
+        request_sequence_number: int
+        ran_function_id: int
+        event_trigger_definition: bytes
+        action_ids: List[int]
+        action_types: List[int]
+        action_definitions: List[ActionDefinition]
+        sub_sequent_actions: List[SubsequentAction]
+
+        Returns
+        Tuple[int, bytes]
+          int :
+            RICSubscriptionRequestMessage type payload length
+          bytes :
+            RICSubscriptionRequestMessage type payload
+        -------
+        """
+        action_count = len(action_ids)
+        action_id_array = ARRAY(c_long, action_count)()
+        for idx in range(action_count):
+            action_id_array[idx] = c_long(action_ids[idx])
+
+        action_type_count = len(action_types)
+        action_type_array = ARRAY(c_long, action_type_count)()
+        for idx in range(action_type_count):
+            action_type_array[idx] = c_long(action_types[idx])
+
+        event_definition_count = len(event_trigger_definition)
+        event_trigger_definition_array = ARRAY(
+            c_uint8, event_definition_count)()
+        for idx in range(event_definition_count):
+            event_trigger_definition_array[idx] = c_uint8(
+                event_trigger_definition[idx])
+
+        action_definition_count = len(action_definitions)
+        acttion_definition_array = ARRAY(
+            ric_action_definition_t, action_definition_count)()
+        for idx in range(action_definition_count):
+            action_definition_buffer = ARRAY(
+                c_uint8, action_definitions[idx].size)()
+            for buf_idx in range(action_definitions[idx].size):
+                action_definition_buffer[buf_idx] = c_uint8(
+                    action_definitions[idx].action_definition[buf_idx])
+            acttion_definition_array[idx].action_definition = action_definition_buffer
+            acttion_definition_array[idx].size = c_int(
+                action_definitions[idx].size)
+
+        subsequent_action_count = len(sub_sequent_actions)
+        subsequent_action_array = ARRAY(
+            ric_subsequent_action_t, subsequent_action_count)()
+        for idx in range(subsequent_action_count):
+            subsequent_action_array[idx].is_valid = sub_sequent_actions[idx].is_valid
+            subsequent_action_array[idx].subsequent_action_type = sub_sequent_actions[idx].subsequent_action_type
+            subsequent_action_array[idx].time_to_wait = sub_sequent_actions[idx].time_to_wait
+
+        buf = ARRAY(c_uint8, 1024)()
+        size: int = _asn1_encode_subReqMsg(buf, c_size_t(1024), c_long(requestor_id), c_long(request_sequence_number),
+                                           c_long(ran_function_id), event_trigger_definition_array, c_size_t(
+                                               event_definition_count), c_size_t(action_count), action_id_array,
+                                           action_type_array, acttion_definition_array, subsequent_action_array)
+        if size < 0:
+            raise Exception("Could not create payload.")
+
+        return size, bytes(buf)
+
+
+_asn1_encode_controlReqMsg = _wrap_asn1_function('e2ap_encode_ric_control_request_message', c_ssize_t, [
+                                                 c_void_p, c_size_t, c_long, c_long, c_long, c_void_p, c_size_t, c_void_p, c_size_t, c_void_p, c_size_t, c_long])
+
+
+class ControlRequestMsg:
+    """
+    A class that provides a function to make payload of e2ap RICControlRequestMessage.
+    """
+
+    def __init__(self):
+        return
+
+    def encode(self, requestor_id: int, request_sequence_number: int,
+               ran_function_id: int, call_process_id: bytes,
+               control_header: bytes, control_message: bytes,
+               control_ack_request: int) -> Tuple[int, bytes]:
+        """
+        Function that creates and returns a payload
+        according to e2ap's RICControlRequestMessage.
+
+        Raise Exception when the payload of RICControlRequestMessage cannot be created.
+
+        Parameters
+        ----------
+        requestor_id: int
+        request_sequence_number: int
+        ran_function_id: int
+        call_process_id: bytes
+        control_header: bytes
+        control_message: bytes
+        control_ack_request: int
+
+        Returns
+        Tuple[int, bytes]
+          int :
+            RICControlRequestMessage type payload length
+          bytes :
+            RICControlRequestMessage type payload
+        -------
+        """
+        call_process_id_buffer = ARRAY(c_uint8, len(call_process_id))()
+        for idx in range(len(call_process_id)):
+            call_process_id_buffer[idx] = c_uint8(call_process_id[idx])
+
+        call_header_buffer = ARRAY(c_uint8, len(control_header))()
+        for idx in range(len(control_header)):
+            call_header_buffer[idx] = c_uint8(control_header[idx])
+
+        call_message_buffer = ARRAY(c_uint8, len(control_message))()
+        for idx in range(len(control_message)):
+            call_message_buffer[idx] = c_uint8(control_message[idx])
+
+        buf = ARRAY(c_uint8, 1024)()
+        size: int = _asn1_encode_controlReqMsg(buf, c_size_t(1024), c_long(requestor_id), c_long(request_sequence_number), c_long(ran_function_id), call_process_id_buffer, c_size_t(
+            len(call_process_id_buffer)), call_header_buffer, c_size_t(len(call_header_buffer)), call_message_buffer, c_size_t(len(call_message_buffer)), c_long(control_ack_request))
+        if size < 0:
+            raise Exception("Could not create payload.")
+
+        return size, bytes(buf)
diff --git a/ricxappframe/e2ap/asn1clib/__init__.py b/ricxappframe/e2ap/asn1clib/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/ricxappframe/e2ap/asn1clib/asn1clib.py b/ricxappframe/e2ap/asn1clib/asn1clib.py
new file mode 100644 (file)
index 0000000..b945e8b
--- /dev/null
@@ -0,0 +1,22 @@
+# *******************************************************************************
+#  * Copyright 2020 Samsung Electronics All Rights Reserved.
+#  *
+#  * Licensed under the Apache License, Version 2.0 (the "License");
+#  * you may not use this file except in compliance with the License.
+#  * You may obtain a copy of the License at
+#  *
+#  * http://www.apache.org/licenses/LICENSE-2.0
+#  *
+#  * Unless required by applicable law or agreed to in writing, software
+#  * distributed under the License is distributed on an "AS IS" BASIS,
+#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  * See the License for the specific language governing permissions and
+#  * limitations under the License.
+#  *
+#  *******************************************************************************
+import ctypes
+
+# https://docs.python.org/3.7/library/ctypes.html
+# https://stackoverflow.com/questions/2327344/ctypes-loading-a-c-shared-library-that-has-dependencies/30845750#30845750
+# make sure you do a set -x LD_LIBRARY_PATH /usr/local/lib/;
+asn1_c_lib = ctypes.CDLL("libriclibe2ap.so", mode=ctypes.RTLD_GLOBAL)
diff --git a/ricxappframe/e2ap/asn1clib/types.py b/ricxappframe/e2ap/asn1clib/types.py
new file mode 100644 (file)
index 0000000..4b4a379
--- /dev/null
@@ -0,0 +1,174 @@
+# *******************************************************************************
+#  * Copyright 2020 Samsung Electronics All Rights Reserved.
+#  *
+#  * Licensed under the Apache License, Version 2.0 (the "License");
+#  * you may not use this file except in compliance with the License.
+#  * You may obtain a copy of the License at
+#  *
+#  * http://www.apache.org/licenses/LICENSE-2.0
+#  *
+#  * Unless required by applicable law or agreed to in writing, software
+#  * distributed under the License is distributed on an "AS IS" BASIS,
+#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  * See the License for the specific language governing permissions and
+#  * limitations under the License.
+#  *
+#  *******************************************************************************
+from ctypes import POINTER, Structure
+from ctypes import c_long, c_size_t, c_int, c_uint8
+
+
+class indication_msg_t(Structure):
+    """
+    A class that mirrored E2AP's RICIndicationMessage with python
+
+    c-type struct of RICIndicationMessage
+    ---------------------------------------
+    typedef struct RICindicationMessage {
+        long requestorID;
+        long requestSequenceNumber;
+        long ranfunctionID;
+        long actionID;
+        long indicationSN;
+        long indicationType;
+        uint8_t *indicationHeader;
+        size_t indicationHeaderSize;
+        uint8_t *indicationMessage;
+        size_t indicationMessageSize;
+        uint8_t *callProcessID;
+        size_t callProcessIDSize;
+    } RICindicationMsg;
+    ---------------------------------------
+    """
+    _fields_ = [
+        ("request_id", c_long),
+        ("request_sequence_number", c_long),
+        ("function_id", c_long),
+        ("action_id", c_long),
+        ("indication_sequence_number", c_long),
+        ("indication_type", c_long),
+        ("indication_header", POINTER(c_uint8)),
+        ("indication_header_length", c_size_t),
+        ("indication_message", POINTER(c_uint8)),
+        ("indication_message_length", c_size_t),
+        ("call_process_id", POINTER(c_uint8)),
+        ("call_process_id_length", c_size_t),
+    ]
+
+
+class causeItem_msg_t(Structure):
+    """
+    A class that mirrored E2AP's RICcauseItem with python
+
+    c-type struct of RICcauseItem
+    -----------------------------
+    typedef struct RICcauseItem {
+        int ricCauseType;
+        long ricCauseID;
+    } RICcauseItem;
+    -----------------------------
+    """
+    _fields_ = [
+        ("cause_type", c_int),
+        ("cause_id", c_long),
+    ]
+
+
+class actionAdmittedList_msg_t(Structure):
+    """
+    A class that mirrored E2AP's RICactionAdmittedList with python
+
+    c-type struct of RICactionAdmittedList
+    --------------------------------------
+    typedef struct RICactionAdmittedList {
+        long ricActionID[16];
+        int count;
+    } RICactionAdmittedList;
+    --------------------------------------
+    """
+    _fields_ = [
+        ("request_id", c_long * 16),
+        ("count", c_int),
+    ]
+
+
+class actionNotAdmittedList_msg_t(Structure):
+    """
+    A class that mirrored E2AP's RICactionNotAdmittedList with python
+
+    c-type struct of RICactionNotAdmittedList
+    -------------------------------------------------------
+    typedef struct RICactionNotAdmittedList {
+        long ricActionID[16];
+        RICcauseItem ricCause[16];
+        int count;
+    } RICactionNotAdmittedList;
+    -------------------------------------------------------
+    """
+    _fields_ = [
+        ("request_id", c_long * 16),
+        ("cause", causeItem_msg_t * 16),
+        ("count", c_int),
+    ]
+
+
+class subResp_msg_t(Structure):
+    """
+    A class that mirrored E2AP's subscriptionResponseMessage with python
+
+    c-type struct of subscriptionResponseMessage
+    -------------------------------------------------------
+    typedef struct RICsubscriptionResponseMessage {
+        long requestorID;
+        long requestSequenceNumber;
+        long ranfunctionID;
+        RICactionAdmittedList ricActionAdmittedList;
+        RICactionNotAdmittedList ricActionNotAdmittedList;
+    } RICsubscriptionResponseMsg;
+    -------------------------------------------------------
+    """
+    _fields_ = [
+        ("request_id", c_long),
+        ("request_sequence_number", c_long),
+        ("function_id", c_long),
+        ("action_admitted_list", actionAdmittedList_msg_t),
+        ("action_not_admitted_list", actionNotAdmittedList_msg_t),
+    ]
+
+
+class ric_action_definition_t(Structure):
+    """
+    A class that mirrored E2AP's RICactionDefinition with python
+
+    c-type struct of RICactionDefinition
+    -------------------------------------------------------
+    typedef struct RICactionDefinition {
+        uint8_t *actionDefinition;
+        int size;
+    } RICactionDefinition;
+    -------------------------------------------------------
+    """
+    _fields_ = [
+        ("action_definition", POINTER(c_uint8)),
+        ("size", c_int),
+    ]
+
+
+class ric_subsequent_action_t(Structure):
+    """
+    A class that mirrored E2AP's RICSubsequentAction with python
+
+    c-type struct of RICSubsequentAction
+    -------------------------------------------------------
+    typedef struct RICSubsequentAction {
+        int isValid;
+        long subsequentActionType;
+        long timeToWait;
+    } RICSubsequentAction;
+    -------------------------------------------------------
+    """
+    _fields_ = [
+        ("is_valid", c_int),
+        ("subsequent_action_type", c_long),
+        ("time_to_wait", c_long),
+    ]
diff --git a/tests/test_e2ap.py b/tests/test_e2ap.py
new file mode 100644 (file)
index 0000000..99f6cd8
--- /dev/null
@@ -0,0 +1,340 @@
+# *******************************************************************************
+#  * Copyright 2020 Samsung Electronics All Rights Reserved.
+#  *
+#  * Licensed under the Apache License, Version 2.0 (the "License");
+#  * you may not use this file except in compliance with the License.
+#  * You may obtain a copy of the License at
+#  *
+#  * http://www.apache.org/licenses/LICENSE-2.0
+#  *
+#  * Unless required by applicable law or agreed to in writing, software
+#  * distributed under the License is distributed on an "AS IS" BASIS,
+#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  * See the License for the specific language governing permissions and
+#  * limitations under the License.
+#  *
+#  *******************************************************************************
+from ricxappframe.e2ap.asn1 import IndicationMsg, SubResponseMsg, SubRequestMsg, ControlRequestMsg, ActionDefinition, SubsequentAction, ARRAY, c_uint8
+
+"""
+fake class for c-type Structure
+"""
+
+
+class indication_msg_type:
+    def __init__(self):
+        self.contents = _indication_contents()
+
+
+class _indication_contents:
+    def __init__(self):
+        self.request_id = 0
+        self.request_sequence_number = 0
+        self.function_id = 0
+        self.action_id = 0
+        self.indication_sequence_number = 0
+        self.indication_type = 0
+        self.indication_header = ARRAY(c_uint8, 1)()
+        self.indication_header_length = 1
+        self.indication_message = ARRAY(c_uint8, 1)()
+        self.indication_message_length = 1
+        self.call_process_id = ARRAY(c_uint8, 1)()
+        self.call_process_id_length = 1
+
+
+class actionAdmittedList_msg_type:
+    def __init__(self):
+        self.request_id = []
+        self.count = 0
+
+
+class causeItem_msg_type:
+    def __init__(self):
+        self.cause_type = 0
+        self.cause_id = 0
+
+
+class actionNotAdmittedList_msg_type:
+    def __init__(self):
+        self.request_id = []
+        self.cause = []
+        self.count = 0
+
+
+class subResp_msg_type:
+    def __init__(self):
+        self.contents = _subResp_contents()
+
+
+class _subResp_contents:
+
+    def __init__(self):
+        self.request_id = 0
+        self.request_sequence_number = 0
+        self.function_id = 0
+        self.action_admitted_list = actionAdmittedList_msg_type()
+        self.action_not_admitted_list = actionNotAdmittedList_msg_type()
+
+
+def test_call_decode_indication_and_clib_return_none_expect_error_raise(monkeypatch):
+    '''
+    test the decode of IndicationMsg class with invalid payload from rmr
+    '''
+    def mock_decode_return_none(payload: bytes, size):
+        return None
+
+    monkeypatch.setattr("ricxappframe.e2ap.asn1._asn1_decode_indicationMsg",
+                        mock_decode_return_none)
+
+    indication = IndicationMsg()
+
+    try:
+        indication.decode(bytes(0))
+        assert False
+    except BaseException:
+        assert True
+
+
+def test_call_decode_indication_expect_success(monkeypatch):
+    '''
+    test the decode of IndicationMsg class
+    '''
+    def mock_decode_return_valid_indication(payload: bytes, size: int):
+        indication_msg = indication_msg_type()
+        indication_msg.contents.request_id = 1
+        indication_msg.contents.request_sequence_number = 1
+        indication_msg.contents.function_id = 1
+        indication_msg.contents.action_id = 1
+        indication_msg.contents.indication_sequence_number = 1
+        indication_msg.contents.indication_type = 1
+
+        indication_header = ARRAY(c_uint8, 1)()
+        indication_message = ARRAY(c_uint8, 1)()
+        call_process_id = ARRAY(c_uint8, 1)()
+
+        indication_msg.contents.indication_header = indication_header
+        indication_msg.contents.indication_message = indication_message
+        indication_msg.contents.call_process_id = call_process_id
+        return indication_msg
+
+    monkeypatch.setattr("ricxappframe.e2ap.asn1._asn1_decode_indicationMsg",
+                        mock_decode_return_valid_indication)
+
+    indication = IndicationMsg()
+    try:
+        indication.decode(bytes(0))
+        assert indication.request_id == 1
+        assert indication.request_sequence_number == 1
+        assert indication.indication_type == 1
+        assert indication.function_id == 1
+        assert indication.action_id == 1
+        assert indication.indication_sequence_number == 1
+        assert indication.indication_header == bytes(b'\x00')
+        assert indication.indication_message == bytes(b'\x00')
+        assert indication.call_process_id == bytes(b'\x00')
+    except BaseException:
+        assert False
+
+
+def test_call_decode_sub_response_and_clib_return_none_expect_error_raise(monkeypatch):
+    '''
+    test the decode of SubResponseMsg class with invalid payload from rmr
+    '''
+    def mock_decode_return_none(payload: bytes, size):
+        return None
+
+    monkeypatch.setattr("ricxappframe.e2ap.asn1._asn1_decode_subRespMsg",
+                        mock_decode_return_none)
+
+    sub_response = SubResponseMsg()
+    try:
+        sub_response.decode(bytes(0))
+        assert False
+    except BaseException:
+        assert True
+
+
+def test_call_decode_sub_response_expect_success(monkeypatch):
+    '''
+    test the decode of SubResponseMsg class
+    '''
+    def mock_decode_return_valid_sub_response(payload: bytes, size: int):
+        subResp_msg = subResp_msg_type()
+        subResp_msg.contents.request_id = 1
+        subResp_msg.contents.request_sequence_number = 1
+        subResp_msg.contents.function_id = 1
+
+        action_admitted_list_msg = actionAdmittedList_msg_type()
+        action_admitted_list_msg.request_id = [1]
+        action_admitted_list_msg.count = 1
+
+        casue_item_msg = causeItem_msg_type()
+        casue_item_msg.cause_id = 1
+        casue_item_msg.cause_type = 1
+
+        action_not_admitted_list_msg = actionNotAdmittedList_msg_type()
+        action_not_admitted_list_msg.request_id = [1]
+        action_not_admitted_list_msg.count = 1
+        action_not_admitted_list_msg.cause = [casue_item_msg]
+
+        subResp_msg.contents.action_admitted_list = action_admitted_list_msg
+        subResp_msg.contents.action_not_admitted_list = action_not_admitted_list_msg
+
+        return subResp_msg
+
+    monkeypatch.setattr("ricxappframe.e2ap.asn1._asn1_decode_subRespMsg",
+                        mock_decode_return_valid_sub_response)
+
+    sub_response = SubResponseMsg()
+    try:
+        sub_response.decode(bytes(0))
+        assert sub_response.request_id == 1
+        assert sub_response.request_sequence_number == 1
+        assert sub_response.function_id == 1
+        assert sub_response.action_admitted_list.request_id[0] == 1
+        assert sub_response.action_admitted_list.count == 1
+        assert sub_response.action_not_admitted_list.request_id[0] == 1
+        assert sub_response.action_not_admitted_list.count == 1
+        assert sub_response.action_not_admitted_list.cause[0].cause_id == 1
+        assert sub_response.action_not_admitted_list.cause[0].cause_type == 1
+    except BaseException:
+        assert False
+
+
+def test_call_encode_sub_request_and_clib_return_error_expect_error_raise(monkeypatch):
+    '''
+    test the encode of SubRequestMsg class with invalid param
+    '''
+    def mock_encode_return_error(buf, buf_size, requestor_id, request_sequence_number,
+                                 ran_function_id, event_trigger_definition_array, event_definition_count,
+                                 action_count, action_id_array, action_type_array, acttion_definition_array,
+                                 subsequent_action_array):
+        return -1
+
+    monkeypatch.setattr("ricxappframe.e2ap.asn1._asn1_encode_subReqMsg",
+                        mock_encode_return_error)
+
+    sub_request = SubRequestMsg()
+    try:
+        sub_request.encode(1, 1, 1, bytes([1]), [1], [1], [], [])
+        assert False
+    except BaseException:
+        assert True
+
+
+def test_call_encode_sub_request_expect_success(monkeypatch):
+    '''
+    test the encode of SubRequestMsg class
+    '''
+    def mock_encode_return_success(buf, buf_size, requestor_id, request_sequence_number,
+                                   ran_function_id, event_trigger_definition_array, event_definition_count,
+                                   action_count, action_id_array, action_type_array, acttion_definition_array,
+                                   subsequent_action_array):
+        assert buf_size.value == 1024
+        assert requestor_id.value == 1
+        assert request_sequence_number.value == 1
+        assert ran_function_id.value == 1
+        assert event_trigger_definition_array[0] == 1
+        assert event_definition_count.value == 1
+        assert action_count.value == 1
+        assert action_type_array[0] == 1
+        assert acttion_definition_array[0].action_definition[0] == 1
+        assert acttion_definition_array[0].size == 1
+        assert subsequent_action_array[0].is_valid == 1
+        assert subsequent_action_array[0].subsequent_action_type == 1
+        assert subsequent_action_array[0].time_to_wait == 1
+        return 1
+
+    monkeypatch.setattr("ricxappframe.e2ap.asn1._asn1_encode_subReqMsg",
+                        mock_encode_return_success)
+
+    action_definitions = list()
+
+    action_definition = ActionDefinition()
+    action_definition.action_definition = bytes([1])
+    action_definition.size = len(action_definition.action_definition)
+
+    action_definitions.append(action_definition)
+
+    subsequent_actions = list()
+
+    subsequent_action = SubsequentAction()
+    subsequent_action.is_valid = 1
+    subsequent_action.subsequent_action_type = 1
+    subsequent_action.time_to_wait = 1
+
+    subsequent_actions.append(subsequent_action)
+    sub_request = SubRequestMsg()
+    try:
+        sub_request.encode(1, 1, 1, bytes([1]), [1], [1],
+                           action_definitions, subsequent_actions)
+    except BaseException:
+        assert False
+
+
+def test_call_encode_control_request_and_clib_return_error_expect_error_raise(monkeypatch):
+    '''
+    test the encode of ControlRequestMsg class with invalid param
+    '''
+    def mock_encode_return_error(buf, buf_size, requestor_id, request_sequence_number,
+                                 ran_function_id, event_trigger_definition_array, event_definition_count,
+                                 action_count, action_id_array, action_type_array, acttion_definition_array,
+                                 subsequent_action_array):
+        return -1
+
+    monkeypatch.setattr("ricxappframe.e2ap.asn1._asn1_encode_controlReqMsg",
+                        mock_encode_return_error)
+
+    control_request = ControlRequestMsg()
+    try:
+        control_request.encode(1, 1, 1, bytes([1]), bytes([1]), bytes([1]), 1)
+        assert False
+    except BaseException:
+        assert True
+
+
+def test_call_encode_control_request_expect_success(monkeypatch):
+    '''
+    test the encode of ControlRequestMsg class
+    '''
+    def mock_encode_return_success(buf, buf_size, requestor_id, request_sequence_number,
+                                   ran_function_id, call_process_id_buffer, call_process_id_buffer_count,
+                                   call_header_buffer, call_header_buffer_count, call_message_buffer, call_message_buffer_count,
+                                   control_ack_request):
+        assert buf_size.value == 1024
+        assert requestor_id.value == 1
+        assert request_sequence_number.value == 1
+        assert ran_function_id.value == 1
+        assert call_process_id_buffer[0] == 1
+        assert call_process_id_buffer_count.value == 1
+        assert call_header_buffer[0] == 1
+        assert call_header_buffer_count.value == 1
+        assert call_message_buffer[0] == 1
+        assert call_message_buffer_count.value == 1
+        assert control_ack_request.value == 1
+        return 1
+
+    monkeypatch.setattr("ricxappframe.e2ap.asn1._asn1_encode_controlReqMsg",
+                        mock_encode_return_success)
+
+    action_definitions = list()
+
+    action_definition = ActionDefinition()
+    action_definition.action_definition = bytes([1])
+    action_definition.size = len(action_definition.action_definition)
+
+    action_definitions.append(action_definition)
+
+    subsequent_actions = list()
+
+    subsequent_action = SubsequentAction()
+    subsequent_action.is_valid = 1
+    subsequent_action.subsequent_action_type = 1
+    subsequent_action.time_to_wait = 1
+
+    subsequent_actions.append(subsequent_action)
+    control_request = ControlRequestMsg()
+    try:
+        control_request.encode(1, 1, 1, bytes([1]), bytes([1]), bytes([1]), 1)
+    except BaseException:
+        assert False