Add E2AP package that supports asn1 encoding/decoding function for E2AP.
[ric-plt/xapp-frame-py.git] / ricxappframe / e2ap / asn1.py
diff --git a/ricxappframe/e2ap/asn1.py b/ricxappframe/e2ap/asn1.py
new file mode 100644 (file)
index 0000000..d7afe72
--- /dev/null
@@ -0,0 +1,465 @@
+# *******************************************************************************
+#  * Copyright 2020 Samsung Electronics All Rights Reserved.
+#  *
+#  * Licensed under the Apache License, Version 2.0 (the "License");
+#  * you may not use this file except in compliance with the License.
+#  * You may obtain a copy of the License at
+#  *
+#  * http://www.apache.org/licenses/LICENSE-2.0
+#  *
+#  * Unless required by applicable law or agreed to in writing, software
+#  * distributed under the License is distributed on an "AS IS" BASIS,
+#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  * See the License for the specific language governing permissions and
+#  * limitations under the License.
+#  *
+#  *******************************************************************************
+from sys import getsizeof
+from typing import List, Tuple
+from ctypes import POINTER, ARRAY
+from ctypes import c_ulong, c_void_p, c_long, c_size_t, c_int, c_ssize_t, c_uint8
+from ricxappframe.e2ap.asn1clib.asn1clib import asn1_c_lib
+from ricxappframe.e2ap.asn1clib.types import indication_msg_t, subResp_msg_t, ric_action_definition_t, ric_subsequent_action_t
+
+
+def _wrap_asn1_function(funcname, restype, argtypes):
+    """
+    Simplify wrapping ctypes functions.
+
+    Parameters
+    ----------
+    funcname: str
+        Name of library method
+    restype: class
+        Name of ctypes class; e.g., c_char_p
+    argtypes: list
+        List of ctypes classes; e.g., [ c_char_p, int ]
+
+    Returns
+    -------
+    _FuncPointer:
+        Pointer to C library function
+    """
+    func = asn1_c_lib.__getattr__(funcname)
+    func.restype = restype
+    func.argtypes = argtypes
+    return func
+
+
+_asn1_decode_indicationMsg = _wrap_asn1_function(
+    'e2ap_decode_ric_indication_message', POINTER(indication_msg_t), [c_void_p, c_ulong])
+_asn1_free_indicationMsg = _wrap_asn1_function(
+    'e2ap_free_decoded_ric_indication_message', c_void_p, [POINTER(indication_msg_t)])
+
+
+class IndicationMsg:
+    """
+    A class of E2AP's RICIndicationMessage
+    """
+    __slots__ = ('__request_id', '__request_sequence_number', '__function_id', '__action_id',
+                 '__indication_sequence_number', '__indication_type', '__indication_header', '__indication_message',
+                 '__call_process_id')
+    __request_id: int
+    __request_sequence_number: int
+    __function_id: int
+    __action_id: int
+    __indication_sequence_number: int
+    __indication_type: int
+    __indication_header: bytes
+    __indication_message: bytes
+    __call_process_id: bytes
+
+    def __init__(self):
+        return
+
+    @property
+    def request_id(self):
+        return self.__request_id
+
+    @property
+    def request_sequence_number(self):
+        return self.__request_sequence_number
+
+    @property
+    def function_id(self):
+        return self.__function_id
+
+    @property
+    def action_id(self):
+        return self.__action_id
+
+    @property
+    def indication_sequence_number(self):
+        return self.__indication_sequence_number
+
+    @property
+    def indication_type(self):
+        return self.__indication_type
+
+    @property
+    def indication_header(self):
+        return self.__indication_header
+
+    @property
+    def indication_message(self):
+        return self.__indication_message
+
+    @property
+    def call_process_id(self):
+        return self.__call_process_id
+
+    def decode(self, payload: c_void_p):
+        """
+        Function that sets fields of IndicationMsg class
+        through msg payload (bytes) of RICIndication type.
+
+        Raise Exception when payload is not RICIndication.
+
+        Parameters
+        ----------
+        payload: c_void_p
+            RICIndication type payload received via rmr
+
+        Returns
+        -------
+        """
+        indication: indication_msg_t = _asn1_decode_indicationMsg(
+            payload, getsizeof(payload))
+
+        if indication is None:
+            raise Exception("Payload is not matched with RICIndication")
+        indication.contents.request_id = 1
+        self.__request_id = indication.contents.request_id
+        self.__request_sequence_number = indication.contents.request_sequence_number
+        self.__function_id = indication.contents.function_id
+        self.__action_id = indication.contents.action_id
+        self.__indication_sequence_number = indication.contents.indication_sequence_number
+        self.__indication_type = indication.contents.indication_type
+        self.__indication_header = bytes(
+            indication.contents.indication_header[:indication.contents.indication_header_length])
+        self.__indication_message = bytes(
+            indication.contents.indication_message[:indication.contents.indication_message_length])
+        self.__call_process_id = bytes(
+            indication.contents.call_process_id[:indication.contents.call_process_id_length])
+
+        # _asn1_free_indicationMsg(indication)
+        return
+
+
+class CauseItem:
+    __slots__ = ('__cause_type', '__cause_id')
+    __cause_type: int
+    __cause_id: int
+
+    def __init__(self, causeType: int, causeID: int):
+        self.__cause_type = causeType
+        self.__cause_id = causeID
+        return
+
+    @property
+    def cause_type(self):
+        return self.__cause_type
+
+    @property
+    def cause_id(self):
+        return self.__cause_id
+
+
+class ActionAdmittedList:
+    __slots__ = ('__request_id', '__count')
+    __request_id: List[int]
+    __count: int
+
+    def __init__(self, request_id: List[int], count: int):
+        self.__request_id = request_id
+        self.__count = count
+        return
+
+    @property
+    def request_id(self):
+        return self.__request_id
+
+    @property
+    def count(self):
+        return self.__count
+
+
+class ActionNotAdmittedList:
+    __slots__ = ('__request_id', '__cause', '__count')
+    __request_id: List[int]
+    __cause: List[CauseItem]
+    __count: int
+
+    def __init__(self, request_id: List[int], cause: List[CauseItem], count: int):
+        self.__request_id = request_id
+        self.__cause = cause
+        self.__count = count
+        return
+
+    @property
+    def request_id(self):
+        return self.__request_id
+
+    @property
+    def cause(self):
+        return self.__cause
+
+    @property
+    def count(self):
+        return self.__count
+
+
+_asn1_decode_subRespMsg = _wrap_asn1_function(
+    'e2ap_decode_ric_subscription_response_message', POINTER(subResp_msg_t), [c_void_p, c_ulong])
+
+
+class SubResponseMsg:
+    """
+    A class of E2AP's RICsubscriptionResponseMessage
+    """
+    __slots__ = ('__request_id', '__request_sequence_number', '__function_id',
+                 '__action_admitted_list', '__action_not_admitted_list')
+    __request_id: int
+    __request_sequence_number: int
+    __function_id: int
+    __action_admitted_list: ActionAdmittedList
+    __action_not_admitted_list: ActionNotAdmittedList
+
+    @property
+    def request_id(self):
+        return self.__request_id
+
+    @property
+    def request_sequence_number(self):
+        return self.__request_sequence_number
+
+    @property
+    def function_id(self):
+        return self.__function_id
+
+    @property
+    def action_admitted_list(self):
+        return self.__action_admitted_list
+
+    @property
+    def action_not_admitted_list(self):
+        return self.__action_not_admitted_list
+
+    def __init__(self):
+        return
+
+    def decode(self, payload: c_void_p):
+        """
+        Function that sets fields of SubRespMsg class
+        through msg payload (bytes) of RICsubscriptionResponseMessage type.
+
+        Raise Exception when payload is not RICsubscriptionResponseMessage.
+
+        Parameters
+        ----------
+        payload: c_void_p
+            RICsubscriptionResponseMessage type payload received via rmr
+
+        Returns
+        -------
+        """
+        subResp: subResp_msg_t = _asn1_decode_subRespMsg(
+            payload, getsizeof(payload))
+
+        if subResp is None:
+            raise Exception(
+                "Payload is not matched with RICsubscriptionResponseMessage")
+
+        self.__request_id = subResp.contents.request_id
+        self.__request_sequence_number = subResp.contents.request_sequence_number
+        self.__function_id = subResp.contents.function_id
+        self.__action_admitted_list = ActionAdmittedList(subResp.contents.action_admitted_list.request_id,
+                                                         subResp.contents.action_admitted_list.count)
+        causeList = [CauseItem(item.cause_type, item.cause_id)
+                     for item in subResp.contents.action_not_admitted_list.cause]
+        self.__action_not_admitted_list = ActionNotAdmittedList(subResp.contents.action_not_admitted_list.request_id, causeList,
+                                                                subResp.contents.action_not_admitted_list.count)
+        return
+
+
+class ActionDefinition:
+    """
+    A class that mirrored E2AP's RICactionDefinition with python
+    """
+    __slots__ = ('action_definition', 'size')
+
+    action_definition: bytes
+    size: int
+
+    def __init__(self):
+        self.action_definition = []
+        self.size = 0
+        return
+
+
+class SubsequentAction:
+    """
+    A class that mirrored E2AP's RICSubsequentAction with python
+    """
+    __slots__ = ('is_valid', 'subsequent_action_type', 'time_to_wait')
+
+    is_valid: int
+    subsequent_action_type: int
+    time_to_wait: int
+
+    def __init__(self):
+        self.is_valid = 0
+        self.subsequent_action_type = 0
+        self.time_to_wait = 0
+        return
+
+
+_asn1_encode_subReqMsg = _wrap_asn1_function('e2ap_encode_ric_subscription_request_message', c_ssize_t, [
+                                             c_void_p, c_size_t, c_long, c_long, c_long, c_void_p, c_size_t, c_size_t, POINTER(c_long), POINTER(c_long), POINTER(ric_action_definition_t), POINTER(ric_subsequent_action_t)])
+
+
+class SubRequestMsg:
+    """
+    A class that provides a function to make payload of e2ap RICSubscriptionRequestMessage.
+    """
+
+    def __init__(self):
+        return
+
+    def encode(self, requestor_id: int, request_sequence_number: int,
+               ran_function_id: int, event_trigger_definition: bytes, action_ids: List[int],
+               action_types: List[int], action_definitions: List[ActionDefinition],
+               sub_sequent_actions: List[SubsequentAction]) -> Tuple[int, bytes]:
+        """
+        Function that creates and returns a payload
+        according to e2ap's RICSubscriptionRequestMessage.
+
+        Raise Exception when the payload of RICSubscriptionRequestMessage cannot be created.
+
+        Parameters
+        ----------
+        requestor_id: int
+        request_sequence_number: int
+        ran_function_id: int
+        event_trigger_definition: bytes
+        action_ids: List[int]
+        action_types: List[int]
+        action_definitions: List[ActionDefinition]
+        sub_sequent_actions: List[SubsequentAction]
+
+        Returns
+        Tuple[int, bytes]
+          int :
+            RICSubscriptionRequestMessage type payload length
+          bytes :
+            RICSubscriptionRequestMessage type payload
+        -------
+        """
+        action_count = len(action_ids)
+        action_id_array = ARRAY(c_long, action_count)()
+        for idx in range(action_count):
+            action_id_array[idx] = c_long(action_ids[idx])
+
+        action_type_count = len(action_types)
+        action_type_array = ARRAY(c_long, action_type_count)()
+        for idx in range(action_type_count):
+            action_type_array[idx] = c_long(action_types[idx])
+
+        event_definition_count = len(event_trigger_definition)
+        event_trigger_definition_array = ARRAY(
+            c_uint8, event_definition_count)()
+        for idx in range(event_definition_count):
+            event_trigger_definition_array[idx] = c_uint8(
+                event_trigger_definition[idx])
+
+        action_definition_count = len(action_definitions)
+        acttion_definition_array = ARRAY(
+            ric_action_definition_t, action_definition_count)()
+        for idx in range(action_definition_count):
+            action_definition_buffer = ARRAY(
+                c_uint8, action_definitions[idx].size)()
+            for buf_idx in range(action_definitions[idx].size):
+                action_definition_buffer[buf_idx] = c_uint8(
+                    action_definitions[idx].action_definition[buf_idx])
+            acttion_definition_array[idx].action_definition = action_definition_buffer
+            acttion_definition_array[idx].size = c_int(
+                action_definitions[idx].size)
+
+        subsequent_action_count = len(sub_sequent_actions)
+        subsequent_action_array = ARRAY(
+            ric_subsequent_action_t, subsequent_action_count)()
+        for idx in range(subsequent_action_count):
+            subsequent_action_array[idx].is_valid = sub_sequent_actions[idx].is_valid
+            subsequent_action_array[idx].subsequent_action_type = sub_sequent_actions[idx].subsequent_action_type
+            subsequent_action_array[idx].time_to_wait = sub_sequent_actions[idx].time_to_wait
+
+        buf = ARRAY(c_uint8, 1024)()
+        size: int = _asn1_encode_subReqMsg(buf, c_size_t(1024), c_long(requestor_id), c_long(request_sequence_number),
+                                           c_long(ran_function_id), event_trigger_definition_array, c_size_t(
+                                               event_definition_count), c_size_t(action_count), action_id_array,
+                                           action_type_array, acttion_definition_array, subsequent_action_array)
+        if size < 0:
+            raise Exception("Could not create payload.")
+
+        return size, bytes(buf)
+
+
+_asn1_encode_controlReqMsg = _wrap_asn1_function('e2ap_encode_ric_control_request_message', c_ssize_t, [
+                                                 c_void_p, c_size_t, c_long, c_long, c_long, c_void_p, c_size_t, c_void_p, c_size_t, c_void_p, c_size_t, c_long])
+
+
+class ControlRequestMsg:
+    """
+    A class that provides a function to make payload of e2ap RICControlRequestMessage.
+    """
+
+    def __init__(self):
+        return
+
+    def encode(self, requestor_id: int, request_sequence_number: int,
+               ran_function_id: int, call_process_id: bytes,
+               control_header: bytes, control_message: bytes,
+               control_ack_request: int) -> Tuple[int, bytes]:
+        """
+        Function that creates and returns a payload
+        according to e2ap's RICControlRequestMessage.
+
+        Raise Exception when the payload of RICControlRequestMessage cannot be created.
+
+        Parameters
+        ----------
+        requestor_id: int
+        request_sequence_number: int
+        ran_function_id: int
+        call_process_id: bytes
+        control_header: bytes
+        control_message: bytes
+        control_ack_request: int
+
+        Returns
+        Tuple[int, bytes]
+          int :
+            RICControlRequestMessage type payload length
+          bytes :
+            RICControlRequestMessage type payload
+        -------
+        """
+        call_process_id_buffer = ARRAY(c_uint8, len(call_process_id))()
+        for idx in range(len(call_process_id)):
+            call_process_id_buffer[idx] = c_uint8(call_process_id[idx])
+
+        call_header_buffer = ARRAY(c_uint8, len(control_header))()
+        for idx in range(len(control_header)):
+            call_header_buffer[idx] = c_uint8(control_header[idx])
+
+        call_message_buffer = ARRAY(c_uint8, len(control_message))()
+        for idx in range(len(control_message)):
+            call_message_buffer[idx] = c_uint8(control_message[idx])
+
+        buf = ARRAY(c_uint8, 1024)()
+        size: int = _asn1_encode_controlReqMsg(buf, c_size_t(1024), c_long(requestor_id), c_long(request_sequence_number), c_long(ran_function_id), call_process_id_buffer, c_size_t(
+            len(call_process_id_buffer)), call_header_buffer, c_size_t(len(call_header_buffer)), call_message_buffer, c_size_t(len(call_message_buffer)), c_long(control_ack_request))
+        if size < 0:
+            raise Exception("Could not create payload.")
+
+        return size, bytes(buf)