1 # *******************************************************************************
2 # * Copyright 2020 Samsung Electronics All Rights Reserved.
4 # * Licensed under the Apache License, Version 2.0 (the "License");
5 # * you may not use this file except in compliance with the License.
6 # * You may obtain a copy of the License at
8 # * http://www.apache.org/licenses/LICENSE-2.0
10 # * Unless required by applicable law or agreed to in writing, software
11 # * distributed under the License is distributed on an "AS IS" BASIS,
12 # * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # * See the License for the specific language governing permissions and
14 # * limitations under the License.
16 # *******************************************************************************
17 from sys import getsizeof
18 from typing import List, Tuple
19 from ctypes import POINTER, ARRAY
20 from ctypes import c_ulong, c_void_p, c_long, c_size_t, c_int, c_ssize_t, c_uint8
21 from ricxappframe.e2ap.asn1clib.asn1clib import asn1_c_lib
22 from ricxappframe.e2ap.asn1clib.types import indication_msg_t, subResp_msg_t, ric_action_definition_t, ric_subsequent_action_t
def _wrap_asn1_function(funcname, restype, argtypes):
    """
    Simplify wrapping ctypes functions.

    Parameters
    ----------
    funcname: str
        Name of library method
    restype: class
        Name of ctypes class; e.g., c_char_p
    argtypes: list
        List of ctypes classes; e.g., [ c_char_p, int ]

    Returns
    -------
    _FuncPtr:
        Pointer to C library function
    """
    # Use the getattr() builtin rather than invoking the dunder method directly.
    func = getattr(asn1_c_lib, funcname)
    func.restype = restype
    func.argtypes = argtypes
    return func
# Binding for the RIC indication decoder: (payload pointer, payload size)
# -> pointer to an indication_msg_t.
_asn1_decode_indicationMsg = _wrap_asn1_function(
    funcname='e2ap_decode_ric_indication_message',
    restype=POINTER(indication_msg_t),
    argtypes=[c_void_p, c_ulong],
)

# Binding for the matching release routine; takes the pointer returned by the decoder.
_asn1_free_indicationMsg = _wrap_asn1_function(
    funcname='e2ap_free_decoded_ric_indication_message',
    restype=c_void_p,
    argtypes=[POINTER(indication_msg_t)],
)
class IndicationMsg:
    """
    A class of E2AP's RICIndicationMessage
    """
    __slots__ = ('__request_id', '__request_sequence_number', '__function_id', '__action_id',
                 '__indication_sequence_number', '__indication_type', '__indication_header', '__indication_message',
                 '__call_process_id')

    __request_id: int
    __request_sequence_number: int
    __function_id: int
    __action_id: int
    __indication_sequence_number: int
    __indication_type: int
    __indication_header: bytes
    __indication_message: bytes
    __call_process_id: bytes

    @property
    def request_id(self):
        return self.__request_id

    @property
    def request_sequence_number(self):
        return self.__request_sequence_number

    @property
    def function_id(self):
        return self.__function_id

    @property
    def action_id(self):
        return self.__action_id

    @property
    def indication_sequence_number(self):
        return self.__indication_sequence_number

    @property
    def indication_type(self):
        return self.__indication_type

    @property
    def indication_header(self):
        return self.__indication_header

    @property
    def indication_message(self):
        return self.__indication_message

    @property
    def call_process_id(self):
        return self.__call_process_id

    @staticmethod
    def _payload_size(payload) -> int:
        """Return the byte length to report to the decoder for ``payload``."""
        # sys.getsizeof() measures the Python object (header included), not the
        # data it carries, so use the real length whenever it is available.
        if isinstance(payload, (bytes, bytearray)):
            return len(payload)
        # Other payload kinds keep the historical behaviour.
        return getsizeof(payload)

    def decode(self, payload: c_void_p):
        """
        Function that sets fields of IndicationMsg class
        through msg payload (bytes) of RICIndication type.

        Raise Exception when payload is not RICIndication.

        Parameters
        ----------
        payload: c_void_p
            RICIndication type payload received via rmr
        """
        indication = _asn1_decode_indicationMsg(
            payload, self._payload_size(payload))

        # A ctypes POINTER result is never None; a NULL result is a falsy
        # pointer object, so test truthiness instead of identity with None.
        if not indication:
            raise Exception("Payload is not matched with RICIndication")

        decoded = indication.contents
        self.__request_id = decoded.request_id
        self.__request_sequence_number = decoded.request_sequence_number
        self.__function_id = decoded.function_id
        self.__action_id = decoded.action_id
        self.__indication_sequence_number = decoded.indication_sequence_number
        self.__indication_type = decoded.indication_type
        # Slice with the reported lengths and copy into Python-owned bytes.
        self.__indication_header = bytes(
            decoded.indication_header[:decoded.indication_header_length])
        self.__indication_message = bytes(
            decoded.indication_message[:decoded.indication_message_length])
        self.__call_process_id = bytes(
            decoded.call_process_id[:decoded.call_process_id_length])

        # NOTE(review): the decoded structure is not released here. A
        # `_asn1_free_indicationMsg` binding exists but its call was disabled
        # in the original code -- confirm why before enabling it.
class CauseItem:
    """
    Pair of cause type and cause id, exposed as read-only properties.
    """
    __slots__ = ('__cause_type', '__cause_id')
    __cause_type: int
    __cause_id: int

    def __init__(self, causeType: int, causeID: int):
        self.__cause_type = causeType
        self.__cause_id = causeID

    @property
    def cause_type(self):
        return self.__cause_type

    @property
    def cause_id(self):
        return self.__cause_id
class ActionAdmittedList:
    """
    Holds the request ids of admitted actions together with their count.
    """
    __slots__ = ('__request_id', '__count')
    __request_id: List[int]
    __count: int

    def __init__(self, request_id: List[int], count: int):
        self.__request_id = request_id
        # Keep the count so the declared slot is always populated.
        self.__count = count

    @property
    def request_id(self):
        return self.__request_id

    @property
    def count(self):
        return self.__count
class ActionNotAdmittedList:
    """
    Holds the request ids of actions that were not admitted, the cause
    recorded for each of them, and their count.
    """
    __slots__ = ('__request_id', '__cause', '__count')
    __request_id: List[int]
    __cause: List["CauseItem"]
    __count: int

    def __init__(self, request_id: List[int], cause: List["CauseItem"], count: int):
        self.__request_id = request_id
        # Keep cause and count so every declared slot is populated.
        self.__cause = cause
        self.__count = count

    @property
    def request_id(self):
        return self.__request_id

    @property
    def cause(self):
        return self.__cause

    @property
    def count(self):
        return self.__count
# Binding for the subscription-response decoder: (payload pointer, payload size)
# -> pointer to a subResp_msg_t.
_asn1_decode_subRespMsg = _wrap_asn1_function(
    funcname='e2ap_decode_ric_subscription_response_message',
    restype=POINTER(subResp_msg_t),
    argtypes=[c_void_p, c_ulong],
)
class SubResponseMsg:
    """
    A class of E2AP's RICsubscriptionResponseMessage
    """
    __slots__ = ('__request_id', '__request_sequence_number', '__function_id',
                 '__action_admitted_list', '__action_not_admitted_list')
    __request_id: int
    __request_sequence_number: int
    __function_id: int
    __action_admitted_list: "ActionAdmittedList"
    __action_not_admitted_list: "ActionNotAdmittedList"

    @property
    def request_id(self):
        return self.__request_id

    @property
    def request_sequence_number(self):
        return self.__request_sequence_number

    @property
    def function_id(self):
        return self.__function_id

    @property
    def action_admitted_list(self):
        return self.__action_admitted_list

    @property
    def action_not_admitted_list(self):
        return self.__action_not_admitted_list

    @staticmethod
    def _payload_size(payload) -> int:
        """Return the byte length to report to the decoder for ``payload``."""
        # sys.getsizeof() measures the Python object (header included), not the
        # data it carries, so use the real length whenever it is available.
        if isinstance(payload, (bytes, bytearray)):
            return len(payload)
        # Other payload kinds keep the historical behaviour.
        return getsizeof(payload)

    def decode(self, payload: c_void_p):
        """
        Function that sets fields of SubRespMsg class
        through msg payload (bytes) of RICsubscriptionResponseMessage type.

        Raise Exception when payload is not RICsubscriptionResponseMessage.

        Parameters
        ----------
        payload: c_void_p
            RICsubscriptionResponseMessage type payload received via rmr
        """
        subResp = _asn1_decode_subRespMsg(
            payload, self._payload_size(payload))

        # A ctypes POINTER result is never None; a NULL result is a falsy
        # pointer object, so test truthiness instead of identity with None.
        if not subResp:
            raise Exception(
                "Payload is not matched with RICsubscriptionResponseMessage")

        decoded = subResp.contents
        self.__request_id = decoded.request_id
        self.__request_sequence_number = decoded.request_sequence_number
        self.__function_id = decoded.function_id

        admitted = decoded.action_admitted_list
        self.__action_admitted_list = ActionAdmittedList(
            admitted.request_id, admitted.count)

        # NOTE(review): every entry of `cause` is converted, not just the first
        # `count`; presumably only `count` entries are meaningful -- confirm
        # against the C structure definition.
        rejected = decoded.action_not_admitted_list
        causeList = [CauseItem(item.cause_type, item.cause_id)
                     for item in rejected.cause]
        self.__action_not_admitted_list = ActionNotAdmittedList(
            rejected.request_id, causeList, rejected.count)
class ActionDefinition:
    """
    A class that mirrored E2AP's RICactionDefinition with python
    """
    __slots__ = ('action_definition', 'size')

    action_definition: bytes
    size: int

    def __init__(self):
        # The default stays an empty list (as before) although the field is
        # annotated as bytes; callers are expected to assign the real content.
        self.action_definition = []
        # Initialise the second slot too, so reading `size` on a fresh
        # instance cannot raise AttributeError.
        self.size = 0
class SubsequentAction:
    """
    A class that mirrored E2AP's RICSubsequentAction with python
    """
    __slots__ = ('is_valid', 'subsequent_action_type', 'time_to_wait')

    is_valid: int
    subsequent_action_type: int
    time_to_wait: int

    def __init__(self):
        # Initialise every slot so a fresh instance can be read without
        # raising AttributeError.
        self.is_valid = 0
        self.subsequent_action_type = 0
        self.time_to_wait = 0
# Binding for the subscription-request encoder; returns a c_ssize_t.
_asn1_encode_subReqMsg = _wrap_asn1_function(
    funcname='e2ap_encode_ric_subscription_request_message',
    restype=c_ssize_t,
    argtypes=[
        c_void_p, c_size_t,                 # output buffer and its capacity
        c_long, c_long, c_long,             # requestor id, sequence number, RAN function id
        c_void_p, c_size_t,                 # event trigger definition and its length
        c_size_t,                           # number of actions
        POINTER(c_long),                    # action ids
        POINTER(c_long),                    # action types
        POINTER(ric_action_definition_t),   # action definitions
        POINTER(ric_subsequent_action_t),   # subsequent actions
    ],
)
323 A class that provides a function to make payload of e2ap RICSubscriptionRequestMessage.
def encode(self, requestor_id: int, request_sequence_number: int,
           ran_function_id: int, event_trigger_definition: bytes, action_ids: List[int],
           action_types: List[int], action_definitions: List[ActionDefinition],
           sub_sequent_actions: List[SubsequentAction]) -> Tuple[int, bytes]:
    """
    Function that creates and returns a payload
    according to e2ap's RICSubscriptionRequestMessage.

    Raise Exception when the payload of RICSubscriptionRequestMessage cannot be created.

    Parameters
    ----------
    requestor_id: int
    request_sequence_number: int
    ran_function_id: int
    event_trigger_definition: bytes
    action_ids: List[int]
    action_types: List[int]
    action_definitions: List[ActionDefinition]
    sub_sequent_actions: List[SubsequentAction]

    Returns
    -------
    int
        RICSubscriptionRequestMessage type payload length
    bytes
        RICSubscriptionRequestMessage type payload
    """
    # `ctype * n` is the documented way to build array types; ctypes.ARRAY is
    # deprecated in recent Python versions.
    # NOTE(review): only len(action_ids) is handed to the encoder as the
    # action count; the other per-action sequences are presumably expected
    # to have the same length -- confirm against the C library.
    action_count = len(action_ids)
    action_id_array = (c_long * action_count)(*action_ids)
    action_type_array = (c_long * len(action_types))(*action_types)

    event_definition_count = len(event_trigger_definition)
    event_trigger_definition_array = (
        c_uint8 * event_definition_count)(*event_trigger_definition)

    action_definition_array = (
        ric_action_definition_t * len(action_definitions))()
    for idx, definition in enumerate(action_definitions):
        # Copy exactly `size` bytes; indexing (rather than slicing) still
        # raises if the definition is shorter than its declared size.
        definition_buffer = (c_uint8 * definition.size)(
            *(definition.action_definition[pos] for pos in range(definition.size)))
        action_definition_array[idx].action_definition = definition_buffer
        action_definition_array[idx].size = c_int(definition.size)

    subsequent_action_array = (
        ric_subsequent_action_t * len(sub_sequent_actions))()
    for idx, action in enumerate(sub_sequent_actions):
        subsequent_action_array[idx].is_valid = action.is_valid
        subsequent_action_array[idx].subsequent_action_type = action.subsequent_action_type
        subsequent_action_array[idx].time_to_wait = action.time_to_wait

    buffer_size = 1024
    buf = (c_uint8 * buffer_size)()
    size: int = _asn1_encode_subReqMsg(
        buf, c_size_t(buffer_size), c_long(requestor_id), c_long(request_sequence_number),
        c_long(ran_function_id), event_trigger_definition_array,
        c_size_t(event_definition_count), c_size_t(action_count), action_id_array,
        action_type_array, action_definition_array, subsequent_action_array)
    # NOTE(review): a negative return value is treated as encoder failure --
    # confirm against the C library's contract.
    if size < 0:
        raise Exception("Could not create payload.")

    # The whole fixed-size buffer is returned; only the first `size` bytes
    # hold the encoded message.
    return size, bytes(buf)
# Binding for the control-request encoder; returns a c_ssize_t.
_asn1_encode_controlReqMsg = _wrap_asn1_function(
    funcname='e2ap_encode_ric_control_request_message',
    restype=c_ssize_t,
    argtypes=[
        c_void_p, c_size_t,        # output buffer and its capacity
        c_long, c_long, c_long,    # requestor id, sequence number, RAN function id
        c_void_p, c_size_t,        # call process id and its length
        c_void_p, c_size_t,        # control header and its length
        c_void_p, c_size_t,        # control message and its length
        c_long,                    # control ack request
    ],
)
class ControlRequestMsg:
    """
    A class that provides a function to make payload of e2ap RICControlRequestMessage.
    """

    def encode(self, requestor_id: int, request_sequence_number: int,
               ran_function_id: int, call_process_id: bytes,
               control_header: bytes, control_message: bytes,
               control_ack_request: int) -> Tuple[int, bytes]:
        """
        Function that creates and returns a payload
        according to e2ap's RICControlRequestMessage.

        Raise Exception when the payload of RICControlRequestMessage cannot be created.

        Parameters
        ----------
        requestor_id: int
        request_sequence_number: int
        ran_function_id: int
        call_process_id: bytes
        control_header: bytes
        control_message: bytes
        control_ack_request: int

        Returns
        -------
        int
            RICControlRequestMessage type payload length
        bytes
            RICControlRequestMessage type payload
        """
        # `ctype * n` is the documented way to build array types; ctypes.ARRAY
        # is deprecated in recent Python versions. Passing the bytes as
        # initialisers replaces the manual index-copy loops.
        call_process_id_buffer = (
            c_uint8 * len(call_process_id))(*call_process_id)
        call_header_buffer = (c_uint8 * len(control_header))(*control_header)
        call_message_buffer = (
            c_uint8 * len(control_message))(*control_message)

        buffer_size = 1024
        buf = (c_uint8 * buffer_size)()
        size: int = _asn1_encode_controlReqMsg(
            buf, c_size_t(buffer_size),
            c_long(requestor_id), c_long(request_sequence_number), c_long(ran_function_id),
            call_process_id_buffer, c_size_t(len(call_process_id_buffer)),
            call_header_buffer, c_size_t(len(call_header_buffer)),
            call_message_buffer, c_size_t(len(call_message_buffer)),
            c_long(control_ack_request))
        # NOTE(review): a negative return value is treated as encoder failure --
        # confirm against the C library's contract.
        if size < 0:
            raise Exception("Could not create payload.")

        # The whole fixed-size buffer is returned; only the first `size` bytes
        # hold the encoded message.
        return size, bytes(buf)