Merge "Add E2AP package that supports asn1 encoding/decoding function for E2AP."
[ric-plt/xapp-frame-py.git] / ricxappframe / e2ap / asn1.py
1 # *******************************************************************************
2 #  * Copyright 2020 Samsung Electronics All Rights Reserved.
3 #  *
4 #  * Licensed under the Apache License, Version 2.0 (the "License");
5 #  * you may not use this file except in compliance with the License.
6 #  * You may obtain a copy of the License at
7 #  *
8 #  * http://www.apache.org/licenses/LICENSE-2.0
9 #  *
10 #  * Unless required by applicable law or agreed to in writing, software
11 #  * distributed under the License is distributed on an "AS IS" BASIS,
12 #  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #  * See the License for the specific language governing permissions and
14 #  * limitations under the License.
15 #  *
16 #  *******************************************************************************
17 from sys import getsizeof
18 from typing import List, Tuple
19 from ctypes import POINTER, ARRAY
20 from ctypes import c_ulong, c_void_p, c_long, c_size_t, c_int, c_ssize_t, c_uint8
21 from ricxappframe.e2ap.asn1clib.asn1clib import asn1_c_lib
22 from ricxappframe.e2ap.asn1clib.types import indication_msg_t, subResp_msg_t, ric_action_definition_t, ric_subsequent_action_t
23
24
25 def _wrap_asn1_function(funcname, restype, argtypes):
26     """
27     Simplify wrapping ctypes functions.
28
29     Parameters
30     ----------
31     funcname: str
32         Name of library method
33     restype: class
34         Name of ctypes class; e.g., c_char_p
35     argtypes: list
36         List of ctypes classes; e.g., [ c_char_p, int ]
37
38     Returns
39     -------
40     _FuncPointer:
41         Pointer to C library function
42     """
43     func = asn1_c_lib.__getattr__(funcname)
44     func.restype = restype
45     func.argtypes = argtypes
46     return func
47
48
49 _asn1_decode_indicationMsg = _wrap_asn1_function(
50     'e2ap_decode_ric_indication_message', POINTER(indication_msg_t), [c_void_p, c_ulong])
51 _asn1_free_indicationMsg = _wrap_asn1_function(
52     'e2ap_free_decoded_ric_indication_message', c_void_p, [POINTER(indication_msg_t)])
53
54
55 class IndicationMsg:
56     """
57     A class of E2AP's RICIndicationMessage
58     """
59     __slots__ = ('__request_id', '__request_sequence_number', '__function_id', '__action_id',
60                  '__indication_sequence_number', '__indication_type', '__indication_header', '__indication_message',
61                  '__call_process_id')
62     __request_id: int
63     __request_sequence_number: int
64     __function_id: int
65     __action_id: int
66     __indication_sequence_number: int
67     __indication_type: int
68     __indication_header: bytes
69     __indication_message: bytes
70     __call_process_id: bytes
71
72     def __init__(self):
73         return
74
75     @property
76     def request_id(self):
77         return self.__request_id
78
79     @property
80     def request_sequence_number(self):
81         return self.__request_sequence_number
82
83     @property
84     def function_id(self):
85         return self.__function_id
86
87     @property
88     def action_id(self):
89         return self.__action_id
90
91     @property
92     def indication_sequence_number(self):
93         return self.__indication_sequence_number
94
95     @property
96     def indication_type(self):
97         return self.__indication_type
98
99     @property
100     def indication_header(self):
101         return self.__indication_header
102
103     @property
104     def indication_message(self):
105         return self.__indication_message
106
107     @property
108     def call_process_id(self):
109         return self.__call_process_id
110
111     def decode(self, payload: c_void_p):
112         """
113         Function that sets fields of IndicationMsg class
114         through msg payload (bytes) of RICIndication type.
115
116         Raise Exception when payload is not RICIndication.
117
118         Parameters
119         ----------
120         payload: c_void_p
121             RICIndication type payload received via rmr
122
123         Returns
124         -------
125         """
126         indication: indication_msg_t = _asn1_decode_indicationMsg(
127             payload, getsizeof(payload))
128
129         if indication is None:
130             raise Exception("Payload is not matched with RICIndication")
131         indication.contents.request_id = 1
132         self.__request_id = indication.contents.request_id
133         self.__request_sequence_number = indication.contents.request_sequence_number
134         self.__function_id = indication.contents.function_id
135         self.__action_id = indication.contents.action_id
136         self.__indication_sequence_number = indication.contents.indication_sequence_number
137         self.__indication_type = indication.contents.indication_type
138         self.__indication_header = bytes(
139             indication.contents.indication_header[:indication.contents.indication_header_length])
140         self.__indication_message = bytes(
141             indication.contents.indication_message[:indication.contents.indication_message_length])
142         self.__call_process_id = bytes(
143             indication.contents.call_process_id[:indication.contents.call_process_id_length])
144
145         # _asn1_free_indicationMsg(indication)
146         return
147
148
149 class CauseItem:
150     __slots__ = ('__cause_type', '__cause_id')
151     __cause_type: int
152     __cause_id: int
153
154     def __init__(self, causeType: int, causeID: int):
155         self.__cause_type = causeType
156         self.__cause_id = causeID
157         return
158
159     @property
160     def cause_type(self):
161         return self.__cause_type
162
163     @property
164     def cause_id(self):
165         return self.__cause_id
166
167
168 class ActionAdmittedList:
169     __slots__ = ('__request_id', '__count')
170     __request_id: List[int]
171     __count: int
172
173     def __init__(self, request_id: List[int], count: int):
174         self.__request_id = request_id
175         self.__count = count
176         return
177
178     @property
179     def request_id(self):
180         return self.__request_id
181
182     @property
183     def count(self):
184         return self.__count
185
186
187 class ActionNotAdmittedList:
188     __slots__ = ('__request_id', '__cause', '__count')
189     __request_id: List[int]
190     __cause: List[CauseItem]
191     __count: int
192
193     def __init__(self, request_id: List[int], cause: List[CauseItem], count: int):
194         self.__request_id = request_id
195         self.__cause = cause
196         self.__count = count
197         return
198
199     @property
200     def request_id(self):
201         return self.__request_id
202
203     @property
204     def cause(self):
205         return self.__cause
206
207     @property
208     def count(self):
209         return self.__count
210
211
212 _asn1_decode_subRespMsg = _wrap_asn1_function(
213     'e2ap_decode_ric_subscription_response_message', POINTER(subResp_msg_t), [c_void_p, c_ulong])
214
215
216 class SubResponseMsg:
217     """
218     A class of E2AP's RICsubscriptionResponseMessage
219     """
220     __slots__ = ('__request_id', '__request_sequence_number', '__function_id',
221                  '__action_admitted_list', '__action_not_admitted_list')
222     __request_id: int
223     __request_sequence_number: int
224     __function_id: int
225     __action_admitted_list: ActionAdmittedList
226     __action_not_admitted_list: ActionNotAdmittedList
227
228     @property
229     def request_id(self):
230         return self.__request_id
231
232     @property
233     def request_sequence_number(self):
234         return self.__request_sequence_number
235
236     @property
237     def function_id(self):
238         return self.__function_id
239
240     @property
241     def action_admitted_list(self):
242         return self.__action_admitted_list
243
244     @property
245     def action_not_admitted_list(self):
246         return self.__action_not_admitted_list
247
248     def __init__(self):
249         return
250
251     def decode(self, payload: c_void_p):
252         """
253         Function that sets fields of SubRespMsg class
254         through msg payload (bytes) of RICsubscriptionResponseMessage type.
255
256         Raise Exception when payload is not RICsubscriptionResponseMessage.
257
258         Parameters
259         ----------
260         payload: c_void_p
261             RICsubscriptionResponseMessage type payload received via rmr
262
263         Returns
264         -------
265         """
266         subResp: subResp_msg_t = _asn1_decode_subRespMsg(
267             payload, getsizeof(payload))
268
269         if subResp is None:
270             raise Exception(
271                 "Payload is not matched with RICsubscriptionResponseMessage")
272
273         self.__request_id = subResp.contents.request_id
274         self.__request_sequence_number = subResp.contents.request_sequence_number
275         self.__function_id = subResp.contents.function_id
276         self.__action_admitted_list = ActionAdmittedList(subResp.contents.action_admitted_list.request_id,
277                                                          subResp.contents.action_admitted_list.count)
278         causeList = [CauseItem(item.cause_type, item.cause_id)
279                      for item in subResp.contents.action_not_admitted_list.cause]
280         self.__action_not_admitted_list = ActionNotAdmittedList(subResp.contents.action_not_admitted_list.request_id, causeList,
281                                                                 subResp.contents.action_not_admitted_list.count)
282         return
283
284
285 class ActionDefinition:
286     """
287     A class that mirrored E2AP's RICactionDefinition with python
288     """
289     __slots__ = ('action_definition', 'size')
290
291     action_definition: bytes
292     size: int
293
294     def __init__(self):
295         self.action_definition = []
296         self.size = 0
297         return
298
299
300 class SubsequentAction:
301     """
302     A class that mirrored E2AP's RICSubsequentAction with python
303     """
304     __slots__ = ('is_valid', 'subsequent_action_type', 'time_to_wait')
305
306     is_valid: int
307     subsequent_action_type: int
308     time_to_wait: int
309
310     def __init__(self):
311         self.is_valid = 0
312         self.subsequent_action_type = 0
313         self.time_to_wait = 0
314         return
315
316
317 _asn1_encode_subReqMsg = _wrap_asn1_function('e2ap_encode_ric_subscription_request_message', c_ssize_t, [
318                                              c_void_p, c_size_t, c_long, c_long, c_long, c_void_p, c_size_t, c_size_t, POINTER(c_long), POINTER(c_long), POINTER(ric_action_definition_t), POINTER(ric_subsequent_action_t)])
319
320
321 class SubRequestMsg:
322     """
323     A class that provides a function to make payload of e2ap RICSubscriptionRequestMessage.
324     """
325
326     def __init__(self):
327         return
328
329     def encode(self, requestor_id: int, request_sequence_number: int,
330                ran_function_id: int, event_trigger_definition: bytes, action_ids: List[int],
331                action_types: List[int], action_definitions: List[ActionDefinition],
332                sub_sequent_actions: List[SubsequentAction]) -> Tuple[int, bytes]:
333         """
334         Function that creates and returns a payload
335         according to e2ap's RICSubscriptionRequestMessage.
336
337         Raise Exception when the payload of RICSubscriptionRequestMessage cannot be created.
338
339         Parameters
340         ----------
341         requestor_id: int
342         request_sequence_number: int
343         ran_function_id: int
344         event_trigger_definition: bytes
345         action_ids: List[int]
346         action_types: List[int]
347         action_definitions: List[ActionDefinition]
348         sub_sequent_actions: List[SubsequentAction]
349
350         Returns
351         Tuple[int, bytes]
352           int :
353             RICSubscriptionRequestMessage type payload length
354           bytes :
355             RICSubscriptionRequestMessage type payload
356         -------
357         """
358         action_count = len(action_ids)
359         action_id_array = ARRAY(c_long, action_count)()
360         for idx in range(action_count):
361             action_id_array[idx] = c_long(action_ids[idx])
362
363         action_type_count = len(action_types)
364         action_type_array = ARRAY(c_long, action_type_count)()
365         for idx in range(action_type_count):
366             action_type_array[idx] = c_long(action_types[idx])
367
368         event_definition_count = len(event_trigger_definition)
369         event_trigger_definition_array = ARRAY(
370             c_uint8, event_definition_count)()
371         for idx in range(event_definition_count):
372             event_trigger_definition_array[idx] = c_uint8(
373                 event_trigger_definition[idx])
374
375         action_definition_count = len(action_definitions)
376         acttion_definition_array = ARRAY(
377             ric_action_definition_t, action_definition_count)()
378         for idx in range(action_definition_count):
379             action_definition_buffer = ARRAY(
380                 c_uint8, action_definitions[idx].size)()
381             for buf_idx in range(action_definitions[idx].size):
382                 action_definition_buffer[buf_idx] = c_uint8(
383                     action_definitions[idx].action_definition[buf_idx])
384             acttion_definition_array[idx].action_definition = action_definition_buffer
385             acttion_definition_array[idx].size = c_int(
386                 action_definitions[idx].size)
387
388         subsequent_action_count = len(sub_sequent_actions)
389         subsequent_action_array = ARRAY(
390             ric_subsequent_action_t, subsequent_action_count)()
391         for idx in range(subsequent_action_count):
392             subsequent_action_array[idx].is_valid = sub_sequent_actions[idx].is_valid
393             subsequent_action_array[idx].subsequent_action_type = sub_sequent_actions[idx].subsequent_action_type
394             subsequent_action_array[idx].time_to_wait = sub_sequent_actions[idx].time_to_wait
395
396         buf = ARRAY(c_uint8, 1024)()
397         size: int = _asn1_encode_subReqMsg(buf, c_size_t(1024), c_long(requestor_id), c_long(request_sequence_number),
398                                            c_long(ran_function_id), event_trigger_definition_array, c_size_t(
399                                                event_definition_count), c_size_t(action_count), action_id_array,
400                                            action_type_array, acttion_definition_array, subsequent_action_array)
401         if size < 0:
402             raise Exception("Could not create payload.")
403
404         return size, bytes(buf)
405
406
407 _asn1_encode_controlReqMsg = _wrap_asn1_function('e2ap_encode_ric_control_request_message', c_ssize_t, [
408                                                  c_void_p, c_size_t, c_long, c_long, c_long, c_void_p, c_size_t, c_void_p, c_size_t, c_void_p, c_size_t, c_long])
409
410
411 class ControlRequestMsg:
412     """
413     A class that provides a function to make payload of e2ap RICControlRequestMessage.
414     """
415
416     def __init__(self):
417         return
418
419     def encode(self, requestor_id: int, request_sequence_number: int,
420                ran_function_id: int, call_process_id: bytes,
421                control_header: bytes, control_message: bytes,
422                control_ack_request: int) -> Tuple[int, bytes]:
423         """
424         Function that creates and returns a payload
425         according to e2ap's RICControlRequestMessage.
426
427         Raise Exception when the payload of RICControlRequestMessage cannot be created.
428
429         Parameters
430         ----------
431         requestor_id: int
432         request_sequence_number: int
433         ran_function_id: int
434         call_process_id: bytes
435         control_header: bytes
436         control_message: bytes
437         control_ack_request: int
438
439         Returns
440         Tuple[int, bytes]
441           int :
442             RICControlRequestMessage type payload length
443           bytes :
444             RICControlRequestMessage type payload
445         -------
446         """
447         call_process_id_buffer = ARRAY(c_uint8, len(call_process_id))()
448         for idx in range(len(call_process_id)):
449             call_process_id_buffer[idx] = c_uint8(call_process_id[idx])
450
451         call_header_buffer = ARRAY(c_uint8, len(control_header))()
452         for idx in range(len(control_header)):
453             call_header_buffer[idx] = c_uint8(control_header[idx])
454
455         call_message_buffer = ARRAY(c_uint8, len(control_message))()
456         for idx in range(len(control_message)):
457             call_message_buffer[idx] = c_uint8(control_message[idx])
458
459         buf = ARRAY(c_uint8, 1024)()
460         size: int = _asn1_encode_controlReqMsg(buf, c_size_t(1024), c_long(requestor_id), c_long(request_sequence_number), c_long(ran_function_id), call_process_id_buffer, c_size_t(
461             len(call_process_id_buffer)), call_header_buffer, c_size_t(len(call_header_buffer)), call_message_buffer, c_size_t(len(call_message_buffer)), c_long(control_ack_request))
462         if size < 0:
463             raise Exception("Could not create payload.")
464
465         return size, bytes(buf)