# *******************************************************************************
#  * Copyright 2020 Samsung Electronics All Rights Reserved.
#  *
#  * Licensed under the Apache License, Version 2.0 (the "License");
#  * you may not use this file except in compliance with the License.
#  * You may obtain a copy of the License at
#  *
#  * http://www.apache.org/licenses/LICENSE-2.0
#  *
#  * Unless required by applicable law or agreed to in writing, software
#  * distributed under the License is distributed on an "AS IS" BASIS,
#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  * See the License for the specific language governing permissions and
#  * limitations under the License.
#  *
# *******************************************************************************
from sys import getsizeof
from typing import List, Tuple
from ctypes import POINTER, ARRAY
from ctypes import c_ulong, c_void_p, c_long, c_size_t, c_int, c_ssize_t, c_uint8
from ricxappframe.e2ap.asn1clib.asn1clib import asn1_c_lib
from ricxappframe.e2ap.asn1clib.types import (
    indication_msg_t,
    subResp_msg_t,
    ric_action_definition_t,
    ric_subsequent_action_t,
)

# Size in bytes of the scratch buffer handed to the C encoders.
_ENCODE_BUFFER_SIZE = 1024


def _wrap_asn1_function(funcname, restype, argtypes):
    """
    Look up a function in the ASN.1 C library and declare its ctypes signature.

    Parameters
    ----------
    funcname: str
        Name of library method
    restype: class
        Name of ctypes class; e.g., c_char_p
    argtypes: list
        List of ctypes classes; e.g., [ c_char_p, int ]

    Returns
    -------
    _FuncPointer:
        Pointer to C library function
    """
    func = getattr(asn1_c_lib, funcname)
    func.restype = restype
    func.argtypes = argtypes
    return func


def _payload_length(payload) -> int:
    """
    Return the number of bytes in ``payload`` to report to the C decoders.

    ``len()`` is used whenever the object supports it (bytes, bytearray,
    ctypes arrays). ``sys.getsizeof`` reports the memory footprint of the
    Python object rather than the payload length, so it is only kept as a
    fallback for objects that have no length.
    """
    try:
        return len(payload)
    except TypeError:
        return getsizeof(payload)


def _to_uint8_array(data, size=None):
    """
    Copy the first ``size`` items of ``data`` into a new ctypes uint8 array.

    Parameters
    ----------
    data:
        Indexable sequence of integers (e.g. bytes)
    size: int, optional
        Number of items to copy; defaults to ``len(data)``. An IndexError is
        raised if ``data`` holds fewer than ``size`` items.

    Returns
    -------
    ctypes array of c_uint8 with ``size`` elements
    """
    count = len(data) if size is None else size
    buffer = ARRAY(c_uint8, count)()
    for idx in range(count):
        buffer[idx] = c_uint8(data[idx])
    return buffer


_asn1_decode_indicationMsg = _wrap_asn1_function(
    'e2ap_decode_ric_indication_message', POINTER(indication_msg_t), [c_void_p, c_ulong])
_asn1_free_indicationMsg = _wrap_asn1_function(
    'e2ap_free_decoded_ric_indication_message', c_void_p, [POINTER(indication_msg_t)])


class IndicationMsg:
    """
    A class of E2AP's RICIndicationMessage

    The read-only properties are populated by :meth:`decode`; they are not
    set before ``decode`` has been called.
    """
    __slots__ = ('__request_id', '__request_sequence_number', '__function_id', '__action_id',
                 '__indication_sequence_number', '__indication_type', '__indication_header',
                 '__indication_message', '__call_process_id')
    __request_id: int
    __request_sequence_number: int
    __function_id: int
    __action_id: int
    __indication_sequence_number: int
    __indication_type: int
    __indication_header: bytes
    __indication_message: bytes
    __call_process_id: bytes

    def __init__(self):
        return

    @property
    def request_id(self):
        return self.__request_id

    @property
    def request_sequence_number(self):
        return self.__request_sequence_number

    @property
    def function_id(self):
        return self.__function_id

    @property
    def action_id(self):
        return self.__action_id

    @property
    def indication_sequence_number(self):
        return self.__indication_sequence_number

    @property
    def indication_type(self):
        return self.__indication_type

    @property
    def indication_header(self):
        return self.__indication_header

    @property
    def indication_message(self):
        return self.__indication_message

    @property
    def call_process_id(self):
        return self.__call_process_id

    def decode(self, payload: c_void_p):
        """
        Function that sets fields of IndicationMsg class through msg payload (bytes) of RICIndication type.
        Raise Exception when payload is not RICIndication.

        Parameters
        ----------
        payload: c_void_p
            RICIndication type payload received via rmr

        Returns
        -------
        None
        """
        indication = _asn1_decode_indicationMsg(
            payload, _payload_length(payload))
        # The function's restype is a ctypes pointer, so a failed decode comes
        # back as a NULL pointer object (falsy), never as None.
        if not indication:
            raise Exception("Payload is not matched with RICIndication")

        msg = indication.contents
        self.__request_id = msg.request_id
        self.__request_sequence_number = msg.request_sequence_number
        self.__function_id = msg.function_id
        self.__action_id = msg.action_id
        self.__indication_sequence_number = msg.indication_sequence_number
        self.__indication_type = msg.indication_type
        # bytes(...) copies the data out of the C structure.
        self.__indication_header = bytes(
            msg.indication_header[:msg.indication_header_length])
        self.__indication_message = bytes(
            msg.indication_message[:msg.indication_message_length])
        self.__call_process_id = bytes(
            msg.call_process_id[:msg.call_process_id_length])
        # NOTE(review): the decoded structure is not released here;
        # _asn1_free_indicationMsg(indication) was disabled in the original
        # code. Confirm whether it is safe to call before enabling it.
        return


class CauseItem:
    """
    A (cause type, cause id) pair describing why an action was not admitted.
    """
    __slots__ = ('__cause_type', '__cause_id')
    __cause_type: int
    __cause_id: int

    def __init__(self, causeType: int, causeID: int):
        self.__cause_type = causeType
        self.__cause_id = causeID
        return

    @property
    def cause_type(self):
        return self.__cause_type

    @property
    def cause_id(self):
        return self.__cause_id


class ActionAdmittedList:
    """
    Read-only holder for the admitted-action request ids and their count.
    """
    __slots__ = ('__request_id', '__count')
    __request_id: List[int]
    __count: int

    def __init__(self, request_id: List[int], count: int):
        self.__request_id = request_id
        self.__count = count
        return

    @property
    def request_id(self):
        return self.__request_id

    @property
    def count(self):
        return self.__count


class ActionNotAdmittedList:
    """
    Read-only holder for the not-admitted-action request ids, causes and count.
    """
    __slots__ = ('__request_id', '__cause', '__count')
    __request_id: List[int]
    __cause: List[CauseItem]
    __count: int

    def __init__(self, request_id: List[int], cause: List[CauseItem], count: int):
        self.__request_id = request_id
        self.__cause = cause
        self.__count = count
        return

    @property
    def request_id(self):
        return self.__request_id

    @property
    def cause(self):
        return self.__cause

    @property
    def count(self):
        return self.__count


_asn1_decode_subRespMsg = _wrap_asn1_function(
    'e2ap_decode_ric_subscription_response_message', POINTER(subResp_msg_t), [c_void_p, c_ulong])


class SubResponseMsg:
    """
    A class of E2AP's RICsubscriptionResponseMessage

    The read-only properties are populated by :meth:`decode`; they are not
    set before ``decode`` has been called.
    """
    __slots__ = ('__request_id', '__request_sequence_number', '__function_id',
                 '__action_admitted_list', '__action_not_admitted_list')
    __request_id: int
    __request_sequence_number: int
    __function_id: int
    __action_admitted_list: ActionAdmittedList
    __action_not_admitted_list: ActionNotAdmittedList

    @property
    def request_id(self):
        return self.__request_id

    @property
    def request_sequence_number(self):
        return self.__request_sequence_number

    @property
    def function_id(self):
        return self.__function_id

    @property
    def action_admitted_list(self):
        return self.__action_admitted_list

    @property
    def action_not_admitted_list(self):
        return self.__action_not_admitted_list

    def __init__(self):
        return

    def decode(self, payload: c_void_p):
        """
        Function that sets fields of SubRespMsg class through msg payload (bytes) of RICsubscriptionResponseMessage type.
        Raise Exception when payload is not RICsubscriptionResponseMessage.

        Parameters
        ----------
        payload: c_void_p
            RICsubscriptionResponseMessage type payload received via rmr

        Returns
        -------
        None
        """
        sub_resp = _asn1_decode_subRespMsg(
            payload, _payload_length(payload))
        # A failed decode is a NULL (falsy) ctypes pointer, not None.
        if not sub_resp:
            raise Exception(
                "Payload is not matched with RICsubscriptionResponseMessage")

        resp = sub_resp.contents
        self.__request_id = resp.request_id
        self.__request_sequence_number = resp.request_sequence_number
        self.__function_id = resp.function_id

        # NOTE(review): the request_id fields are passed through exactly as
        # ctypes exposes them (not converted to Python lists); `count` tells
        # how many entries are meaningful. Confirm callers expect this.
        admitted = resp.action_admitted_list
        self.__action_admitted_list = ActionAdmittedList(
            admitted.request_id, admitted.count)

        not_admitted = resp.action_not_admitted_list
        cause_list = [CauseItem(item.cause_type, item.cause_id)
                      for item in not_admitted.cause]
        self.__action_not_admitted_list = ActionNotAdmittedList(
            not_admitted.request_id, cause_list, not_admitted.count)
        return


class ActionDefinition:
    """
    A class that mirrored E2AP's RICactionDefinition with python
    """
    __slots__ = ('action_definition', 'size')
    action_definition: bytes
    size: int

    def __init__(self):
        # Starts as an empty list although annotated as bytes;
        # SubRequestMsg.encode only indexes it, so any sequence of ints works.
        self.action_definition = []
        self.size = 0
        return


class SubsequentAction:
    """
    A class that mirrored E2AP's RICSubsequentAction with python
    """
    __slots__ = ('is_valid', 'subsequent_action_type', 'time_to_wait')
    is_valid: int
    subsequent_action_type: int
    time_to_wait: int

    def __init__(self):
        self.is_valid = 0
        self.subsequent_action_type = 0
        self.time_to_wait = 0
        return


_asn1_encode_subReqMsg = _wrap_asn1_function(
    'e2ap_encode_ric_subscription_request_message', c_ssize_t,
    [c_void_p, c_size_t, c_long, c_long, c_long, c_void_p, c_size_t, c_size_t,
     POINTER(c_long), POINTER(c_long), POINTER(ric_action_definition_t),
     POINTER(ric_subsequent_action_t)])


class SubRequestMsg:
    """
    A class that provides a function to make payload of e2ap RICSubscriptionRequestMessage.
    """

    def __init__(self):
        return

    def encode(self, requestor_id: int, request_sequence_number: int, ran_function_id: int,
               event_trigger_definition: bytes, action_ids: List[int], action_types: List[int],
               action_definitions: List[ActionDefinition],
               sub_sequent_actions: List[SubsequentAction]) -> Tuple[int, bytes]:
        """
        Function that creates and returns a payload according to e2ap's RICSubscriptionRequestMessage.
        Raise Exception when the payload of RICSubscriptionRequestMessage cannot be created.

        Only ``len(action_ids)`` is passed to the encoder as the action count,
        so the other per-action sequences are made at least that long:
        ``action_types`` must supply one entry per action id, while missing
        ``action_definitions`` / ``sub_sequent_actions`` entries are left
        zero-filled.

        Parameters
        ----------
        requestor_id: int
        request_sequence_number: int
        ran_function_id: int
        event_trigger_definition: bytes
        action_ids: List[int]
        action_types: List[int]
        action_definitions: List[ActionDefinition]
        sub_sequent_actions: List[SubsequentAction]

        Returns
        -------
        Tuple[int, bytes]
            int : RICSubscriptionRequestMessage type payload length
            bytes : the whole encode buffer; only the first ``length`` bytes
                are the RICSubscriptionRequestMessage type payload

        Raises
        ------
        ValueError
            When fewer action types than action ids are given.
        Exception
            When the encoder reports a negative size.
        """
        action_count = len(action_ids)
        if len(action_types) < action_count:
            raise ValueError(
                "action_types must contain one entry per action id "
                "(got %d types for %d ids)" % (len(action_types), action_count))

        action_id_array = ARRAY(c_long, action_count)(*action_ids)
        action_type_array = ARRAY(c_long, len(action_types))(*action_types)

        event_definition_count = len(event_trigger_definition)
        event_trigger_definition_array = _to_uint8_array(
            event_trigger_definition)

        # Allocate at least `action_count` slots so that indexing up to the
        # action count stays inside the array; unused slots remain zero-filled.
        definition_slots = max(len(action_definitions), action_count)
        action_definition_array = ARRAY(
            ric_action_definition_t, definition_slots)()
        # Hold a reference to every per-definition buffer until the C call
        # has returned.
        definition_buffers = []
        for idx, definition in enumerate(action_definitions):
            definition_buffer = _to_uint8_array(
                definition.action_definition, definition.size)
            definition_buffers.append(definition_buffer)
            action_definition_array[idx].action_definition = definition_buffer
            action_definition_array[idx].size = c_int(definition.size)

        subsequent_slots = max(len(sub_sequent_actions), action_count)
        subsequent_action_array = ARRAY(
            ric_subsequent_action_t, subsequent_slots)()
        for idx, action in enumerate(sub_sequent_actions):
            subsequent_action_array[idx].is_valid = action.is_valid
            subsequent_action_array[idx].subsequent_action_type = action.subsequent_action_type
            subsequent_action_array[idx].time_to_wait = action.time_to_wait

        buf = ARRAY(c_uint8, _ENCODE_BUFFER_SIZE)()
        size: int = _asn1_encode_subReqMsg(
            buf, c_size_t(_ENCODE_BUFFER_SIZE),
            c_long(requestor_id), c_long(request_sequence_number),
            c_long(ran_function_id),
            event_trigger_definition_array, c_size_t(event_definition_count),
            c_size_t(action_count), action_id_array, action_type_array,
            action_definition_array, subsequent_action_array)
        if size < 0:
            raise Exception("Could not create payload.")
        return size, bytes(buf)


_asn1_encode_controlReqMsg = _wrap_asn1_function(
    'e2ap_encode_ric_control_request_message', c_ssize_t,
    [c_void_p, c_size_t, c_long, c_long, c_long, c_void_p, c_size_t,
     c_void_p, c_size_t, c_void_p, c_size_t, c_long])


class ControlRequestMsg:
    """
    A class that provides a function to make payload of e2ap RICControlRequestMessage.
    """

    def __init__(self):
        return

    def encode(self, requestor_id: int, request_sequence_number: int, ran_function_id: int,
               call_process_id: bytes, control_header: bytes, control_message: bytes,
               control_ack_request: int) -> Tuple[int, bytes]:
        """
        Function that creates and returns a payload according to e2ap's RICControlRequestMessage.
        Raise Exception when the payload of RICControlRequestMessage cannot be created.

        Parameters
        ----------
        requestor_id: int
        request_sequence_number: int
        ran_function_id: int
        call_process_id: bytes
        control_header: bytes
        control_message: bytes
        control_ack_request: int

        Returns
        -------
        Tuple[int, bytes]
            int : RICControlRequestMessage type payload length
            bytes : the whole encode buffer; only the first ``length`` bytes
                are the RICControlRequestMessage type payload

        Raises
        ------
        Exception
            When the encoder reports a negative size.
        """
        call_process_id_buffer = _to_uint8_array(call_process_id)
        control_header_buffer = _to_uint8_array(control_header)
        control_message_buffer = _to_uint8_array(control_message)

        buf = ARRAY(c_uint8, _ENCODE_BUFFER_SIZE)()
        size: int = _asn1_encode_controlReqMsg(
            buf, c_size_t(_ENCODE_BUFFER_SIZE),
            c_long(requestor_id), c_long(request_sequence_number),
            c_long(ran_function_id),
            call_process_id_buffer, c_size_t(len(call_process_id_buffer)),
            control_header_buffer, c_size_t(len(control_header_buffer)),
            control_message_buffer, c_size_t(len(control_message_buffer)),
            c_long(control_ack_request))
        if size < 0:
            raise Exception("Could not create payload.")
        return size, bytes(buf)