X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ricxappframe%2Fe2ap%2Fasn1.py;fp=ricxappframe%2Fe2ap%2Fasn1.py;h=d7afe72fe90515d9ee220efc14aec1132961e2bb;hb=26f15d649bf3687b2b6864f0e05c9fb88eec5456;hp=0000000000000000000000000000000000000000;hpb=6b6b79b0ac66347c3e6b70fd82796390d0ddddf1;p=ric-plt%2Fxapp-frame-py.git diff --git a/ricxappframe/e2ap/asn1.py b/ricxappframe/e2ap/asn1.py new file mode 100644 index 0000000..d7afe72 --- /dev/null +++ b/ricxappframe/e2ap/asn1.py @@ -0,0 +1,465 @@ +# ******************************************************************************* +# * Copyright 2020 Samsung Electronics All Rights Reserved. +# * +# * Licensed under the Apache License, Version 2.0 (the "License"); +# * you may not use this file except in compliance with the License. +# * You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# * +# ******************************************************************************* +from sys import getsizeof +from typing import List, Tuple +from ctypes import POINTER, ARRAY +from ctypes import c_ulong, c_void_p, c_long, c_size_t, c_int, c_ssize_t, c_uint8 +from ricxappframe.e2ap.asn1clib.asn1clib import asn1_c_lib +from ricxappframe.e2ap.asn1clib.types import indication_msg_t, subResp_msg_t, ric_action_definition_t, ric_subsequent_action_t + + +def _wrap_asn1_function(funcname, restype, argtypes): + """ + Simplify wrapping ctypes functions. + + Parameters + ---------- + funcname: str + Name of library method + restype: class + Name of ctypes class; e.g., c_char_p + argtypes: list + List of ctypes classes; e.g., [ c_char_p, int ] + + Returns + ------- + _FuncPointer: + Pointer to C library function + """ + func = asn1_c_lib.__getattr__(funcname) + func.restype = restype + func.argtypes = argtypes + return func + + +_asn1_decode_indicationMsg = _wrap_asn1_function( + 'e2ap_decode_ric_indication_message', POINTER(indication_msg_t), [c_void_p, c_ulong]) +_asn1_free_indicationMsg = _wrap_asn1_function( + 'e2ap_free_decoded_ric_indication_message', c_void_p, [POINTER(indication_msg_t)]) + + +class IndicationMsg: + """ + A class of E2AP's RICIndicationMessage + """ + __slots__ = ('__request_id', '__request_sequence_number', '__function_id', '__action_id', + '__indication_sequence_number', '__indication_type', '__indication_header', '__indication_message', + '__call_process_id') + __request_id: int + __request_sequence_number: int + __function_id: int + __action_id: int + __indication_sequence_number: int + __indication_type: int + __indication_header: bytes + __indication_message: bytes + __call_process_id: bytes + + def __init__(self): + return + + @property + def request_id(self): + return self.__request_id + + @property + def request_sequence_number(self): + return self.__request_sequence_number + + @property + def function_id(self): + return self.__function_id + + @property + def action_id(self): + return self.__action_id + + @property + def indication_sequence_number(self): + return self.__indication_sequence_number + + @property + def indication_type(self): + return self.__indication_type + + @property + def indication_header(self): + return self.__indication_header + + @property + def indication_message(self): + return self.__indication_message + + @property + def call_process_id(self): + return self.__call_process_id + + def decode(self, payload: c_void_p): + """ + Function that sets fields of IndicationMsg class + through msg payload (bytes) of RICIndication type. + + Raise Exception when payload is not RICIndication. + + Parameters + ---------- + payload: c_void_p + RICIndication type payload received via rmr + + Returns + ------- + """ + indication: indication_msg_t = _asn1_decode_indicationMsg( + payload, getsizeof(payload)) + + if indication is None: + raise Exception("Payload is not matched with RICIndication") + indication.contents.request_id = 1 + self.__request_id = indication.contents.request_id + self.__request_sequence_number = indication.contents.request_sequence_number + self.__function_id = indication.contents.function_id + self.__action_id = indication.contents.action_id + self.__indication_sequence_number = indication.contents.indication_sequence_number + self.__indication_type = indication.contents.indication_type + self.__indication_header = bytes( + indication.contents.indication_header[:indication.contents.indication_header_length]) + self.__indication_message = bytes( + indication.contents.indication_message[:indication.contents.indication_message_length]) + self.__call_process_id = bytes( + indication.contents.call_process_id[:indication.contents.call_process_id_length]) + + # _asn1_free_indicationMsg(indication) + return + + +class CauseItem: + __slots__ = ('__cause_type', '__cause_id') + __cause_type: int + __cause_id: int + + def __init__(self, causeType: int, causeID: int): + self.__cause_type = causeType + self.__cause_id = causeID + return + + @property + def cause_type(self): + return self.__cause_type + + @property + def cause_id(self): + return self.__cause_id + + +class ActionAdmittedList: + __slots__ = ('__request_id', '__count') + __request_id: List[int] + __count: int + + def __init__(self, request_id: List[int], count: int): + self.__request_id = request_id + self.__count = count + return + + @property + def request_id(self): + return self.__request_id + + @property + def count(self): + return self.__count + + +class ActionNotAdmittedList: + __slots__ = ('__request_id', '__cause', '__count') + __request_id: List[int] + __cause: List[CauseItem] + __count: int + + def __init__(self, request_id: List[int], cause: List[CauseItem], count: int): + self.__request_id = request_id + self.__cause = cause + self.__count = count + return + + @property + def request_id(self): + return self.__request_id + + @property + def cause(self): + return self.__cause + + @property + def count(self): + return self.__count + + +_asn1_decode_subRespMsg = _wrap_asn1_function( + 'e2ap_decode_ric_subscription_response_message', POINTER(subResp_msg_t), [c_void_p, c_ulong]) + + +class SubResponseMsg: + """ + A class of E2AP's RICsubscriptionResponseMessage + """ + __slots__ = ('__request_id', '__request_sequence_number', '__function_id', + '__action_admitted_list', '__action_not_admitted_list') + __request_id: int + __request_sequence_number: int + __function_id: int + __action_admitted_list: ActionAdmittedList + __action_not_admitted_list: ActionNotAdmittedList + + @property + def request_id(self): + return self.__request_id + + @property + def request_sequence_number(self): + return self.__request_sequence_number + + @property + def function_id(self): + return self.__function_id + + @property + def action_admitted_list(self): + return self.__action_admitted_list + + @property + def action_not_admitted_list(self): + return self.__action_not_admitted_list + + def __init__(self): + return + + def decode(self, payload: c_void_p): + """ + Function that sets fields of SubRespMsg class + through msg payload (bytes) of RICsubscriptionResponseMessage type. + + Raise Exception when payload is not RICsubscriptionResponseMessage. + + Parameters + ---------- + payload: c_void_p + RICsubscriptionResponseMessage type payload received via rmr + + Returns + ------- + """ + subResp: subResp_msg_t = _asn1_decode_subRespMsg( + payload, getsizeof(payload)) + + if subResp is None: + raise Exception( + "Payload is not matched with RICsubscriptionResponseMessage") + + self.__request_id = subResp.contents.request_id + self.__request_sequence_number = subResp.contents.request_sequence_number + self.__function_id = subResp.contents.function_id + self.__action_admitted_list = ActionAdmittedList(subResp.contents.action_admitted_list.request_id, + subResp.contents.action_admitted_list.count) + causeList = [CauseItem(item.cause_type, item.cause_id) + for item in subResp.contents.action_not_admitted_list.cause] + self.__action_not_admitted_list = ActionNotAdmittedList(subResp.contents.action_not_admitted_list.request_id, causeList, + subResp.contents.action_not_admitted_list.count) + return + + +class ActionDefinition: + """ + A class that mirrored E2AP's RICactionDefinition with python + """ + __slots__ = ('action_definition', 'size') + + action_definition: bytes + size: int + + def __init__(self): + self.action_definition = [] + self.size = 0 + return + + +class SubsequentAction: + """ + A class that mirrored E2AP's RICSubsequentAction with python + """ + __slots__ = ('is_valid', 'subsequent_action_type', 'time_to_wait') + + is_valid: int + subsequent_action_type: int + time_to_wait: int + + def __init__(self): + self.is_valid = 0 + self.subsequent_action_type = 0 + self.time_to_wait = 0 + return + + +_asn1_encode_subReqMsg = _wrap_asn1_function('e2ap_encode_ric_subscription_request_message', c_ssize_t, [ + c_void_p, c_size_t, c_long, c_long, c_long, c_void_p, c_size_t, c_size_t, POINTER(c_long), POINTER(c_long), POINTER(ric_action_definition_t), POINTER(ric_subsequent_action_t)]) + + +class SubRequestMsg: + """ + A class that provides a function to make payload of e2ap RICSubscriptionRequestMessage. + """ + + def __init__(self): + return + + def encode(self, requestor_id: int, request_sequence_number: int, + ran_function_id: int, event_trigger_definition: bytes, action_ids: List[int], + action_types: List[int], action_definitions: List[ActionDefinition], + sub_sequent_actions: List[SubsequentAction]) -> Tuple[int, bytes]: + """ + Function that creates and returns a payload + according to e2ap's RICSubscriptionRequestMessage. + + Raise Exception when the payload of RICSubscriptionRequestMessage cannot be created. + + Parameters + ---------- + requestor_id: int + request_sequence_number: int + ran_function_id: int + event_trigger_definition: bytes + action_ids: List[int] + action_types: List[int] + action_definitions: List[ActionDefinition] + sub_sequent_actions: List[SubsequentAction] + + Returns + Tuple[int, bytes] + int : + RICSubscriptionRequestMessage type payload length + bytes : + RICSubscriptionRequestMessage type payload + ------- + """ + action_count = len(action_ids) + action_id_array = ARRAY(c_long, action_count)() + for idx in range(action_count): + action_id_array[idx] = c_long(action_ids[idx]) + + action_type_count = len(action_types) + action_type_array = ARRAY(c_long, action_type_count)() + for idx in range(action_type_count): + action_type_array[idx] = c_long(action_types[idx]) + + event_definition_count = len(event_trigger_definition) + event_trigger_definition_array = ARRAY( + c_uint8, event_definition_count)() + for idx in range(event_definition_count): + event_trigger_definition_array[idx] = c_uint8( + event_trigger_definition[idx]) + + action_definition_count = len(action_definitions) + acttion_definition_array = ARRAY( + ric_action_definition_t, action_definition_count)() + for idx in range(action_definition_count): + action_definition_buffer = ARRAY( + c_uint8, action_definitions[idx].size)() + for buf_idx in range(action_definitions[idx].size): + action_definition_buffer[buf_idx] = c_uint8( + action_definitions[idx].action_definition[buf_idx]) + acttion_definition_array[idx].action_definition = action_definition_buffer + acttion_definition_array[idx].size = c_int( + action_definitions[idx].size) + + subsequent_action_count = len(sub_sequent_actions) + subsequent_action_array = ARRAY( + ric_subsequent_action_t, subsequent_action_count)() + for idx in range(subsequent_action_count): + subsequent_action_array[idx].is_valid = sub_sequent_actions[idx].is_valid + subsequent_action_array[idx].subsequent_action_type = sub_sequent_actions[idx].subsequent_action_type + subsequent_action_array[idx].time_to_wait = sub_sequent_actions[idx].time_to_wait + + buf = ARRAY(c_uint8, 1024)() + size: int = _asn1_encode_subReqMsg(buf, c_size_t(1024), c_long(requestor_id), c_long(request_sequence_number), + c_long(ran_function_id), event_trigger_definition_array, c_size_t( + event_definition_count), c_size_t(action_count), action_id_array, + action_type_array, acttion_definition_array, subsequent_action_array) + if size < 0: + raise Exception("Could not create payload.") + + return size, bytes(buf) + + +_asn1_encode_controlReqMsg = _wrap_asn1_function('e2ap_encode_ric_control_request_message', c_ssize_t, [ + c_void_p, c_size_t, c_long, c_long, c_long, c_void_p, c_size_t, c_void_p, c_size_t, c_void_p, c_size_t, c_long]) + + +class ControlRequestMsg: + """ + A class that provides a function to make payload of e2ap RICControlRequestMessage. + """ + + def __init__(self): + return + + def encode(self, requestor_id: int, request_sequence_number: int, + ran_function_id: int, call_process_id: bytes, + control_header: bytes, control_message: bytes, + control_ack_request: int) -> Tuple[int, bytes]: + """ + Function that creates and returns a payload + according to e2ap's RICControlRequestMessage. + + Raise Exception when the payload of RICControlRequestMessage cannot be created. + + Parameters + ---------- + requestor_id: int + request_sequence_number: int + ran_function_id: int + call_process_id: bytes + control_header: bytes + control_message: bytes + control_ack_request: int + + Returns + Tuple[int, bytes] + int : + RICControlRequestMessage type payload length + bytes : + RICControlRequestMessage type payload + ------- + """ + call_process_id_buffer = ARRAY(c_uint8, len(call_process_id))() + for idx in range(len(call_process_id)): + call_process_id_buffer[idx] = c_uint8(call_process_id[idx]) + + call_header_buffer = ARRAY(c_uint8, len(control_header))() + for idx in range(len(control_header)): + call_header_buffer[idx] = c_uint8(control_header[idx]) + + call_message_buffer = ARRAY(c_uint8, len(control_message))() + for idx in range(len(control_message)): + call_message_buffer[idx] = c_uint8(control_message[idx]) + + buf = ARRAY(c_uint8, 1024)() + size: int = _asn1_encode_controlReqMsg(buf, c_size_t(1024), c_long(requestor_id), c_long(request_sequence_number), c_long(ran_function_id), call_process_id_buffer, c_size_t( + len(call_process_id_buffer)), call_header_buffer, c_size_t(len(call_header_buffer)), call_message_buffer, c_size_t(len(call_message_buffer)), c_long(control_ack_request)) + if size < 0: + raise Exception("Could not create payload.") + + return size, bytes(buf)