1 # -------------------------------------------------------------------------------
2 # Copyright (c) 2018-2019 AT&T Intellectual Property.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # -------------------------------------------------------------------------------
19 from binascii import unhexlify
22 # from https://stackoverflow.com/questions/8730927/convert-python-long-int-to-fixed-size-byte-array
23 def long_to_bytes(val, endianness='big'):
26 fmt = '%%0%dx' % (width // 4)
27 s = unhexlify(fmt % val)
28 if endianness == 'little':
34 # ------------------------------------
35 # START generated code
36 import rrctransfer_pb2
37 import x2ap_streaming_pb2
40 #"id_MeNB_UE_X2AP_ID": 1,
41 #"id_SgNB_UE_X2AP_ID": 2,
42 #"MeasResultServMO_data": [
43 # {"servCellID": 3, "physCellId": 4, "rsrq": 5, "rsrp": 6, "sinr": 7},
47 def create_record(dataset, ofl, ts):
48 X2APStreaming = x2ap_streaming_pb2.X2APStreaming() # header message
49 X2APStreaming.header.gNbID.value = dataset["sgnb_id"]
52 utc_time = datetime.datetime.utcfromtimestamp(sec)
53 diff = utc_time - datetime.datetime(1900, 1, 1, 0, 0, 0)
54 ntp_hi = diff.days*24*60*60+diff.seconds
55 ntp_lo = (ms << 32) / 1000
56 X2APStreaming.header.timestamp = ntp_hi << 32 | ntp_lo
59 # print "sgnb_id="+X2APStreaming.header.gNbID.value+", ts="+str(ts)
61 RRCTransfer = X2APStreaming.rrcTransfer # message RRCTransfer
63 RRCTransferIEs = RRCTransfer.rrcTransfer_IEs # field rrcTransfer_IEs
64 RRCTransferIEs.id_MeNB_UE_X2AP_ID = dataset["id_MeNB_UE_X2AP_ID"] # field id_MeNB_UE_X2AP_ID
65 RRCTransferIEs.id_SgNB_UE_X2AP_ID = dataset["id_SgNB_UE_X2AP_ID"]
66 NRUeReport = RRCTransferIEs.id_UENRMeasurement
67 RRCContainer = NRUeReport.uENRMeasurements
68 ULDCCHMessageType = RRCContainer.UL_DCCH_message
69 MeasurementReport = ULDCCHMessageType.measurementReport
70 MeasurementReportIEs = MeasurementReport.measurementReport
71 MeasResults = MeasurementReportIEs.measResults
72 MeasResultServMOList = MeasResults.measResultServingMOList
73 for d in dataset["MeasResultServMO_data"]:
74 MeasResultServMO = MeasResultServMOList.items.add()
75 MeasResultServMO.servCellID = d["servCellID"]
76 MeasResultNR = MeasResultServMO.measResultServingCell
77 MeasResultNR.physCellId.value = d["physCellId"]
78 MeasResult = MeasResultNR.measResult
79 CellResults = MeasResult.cellResults
80 MeasQuantityResults = CellResults.resultsCSI_RS_Cell
81 MeasQuantityResults.rsrq.value = d["rsrq"]
82 MeasQuantityResults.rsrp.value = d["rsrp"]
83 MeasQuantityResults.sinr.value = d["sinr"]
85 v = X2APStreaming.SerializeToString()
88 ofl.write("{0:7}".format(vlen))
97 # ------------------------------------
103 def triangle_measurements(d):
104 for dd in d["MeasResultServMO_data"]:
105 dd["rsrp"] = int(random.triangular(0,100,50))
106 dd["rsrq"] = int(random.triangular(0,100,50))
107 dd["sinr"] = int(random.triangular(0,100,50))
111 fifo_dir = "/tmp/mcl/fifos"
112 dst_flnm = "MT_000010350"
117 "id_MeNB_UE_X2AP_ID": 1,
118 "id_SgNB_UE_X2AP_ID": 11,
119 "MeasResultServMO_data": [
120 {"servCellID": 0, "physCellId": 10, "rsrq": 20, "rsrp": 40, "sinr": 60},
121 {"servCellID": 1, "physCellId": 20, "rsrq": 60, "rsrp": 40, "sinr": 20}
126 "id_MeNB_UE_X2AP_ID": 2,
127 "id_SgNB_UE_X2AP_ID": 12,
128 "MeasResultServMO_data": [
129 {"servCellID": 0, "physCellId": 10, "rsrq": 25, "rsrp": 45, "sinr": 65},
130 {"servCellID": 1, "physCellId": 20, "rsrq": 65, "rsrp": 45, "sinr": 25}
135 "id_MeNB_UE_X2AP_ID": 3,
136 "id_SgNB_UE_X2AP_ID": 13,
137 "MeasResultServMO_data": [
138 {"servCellID": 0, "physCellId": 10, "rsrq": 25, "rsrp": 45, "sinr": 65},
139 {"servCellID": 1, "physCellId": 20, "rsrq": 65, "rsrp": 45, "sinr": 25}
144 "id_MeNB_UE_X2AP_ID": 4,
145 "id_SgNB_UE_X2AP_ID": 14,
146 "MeasResultServMO_data": [
147 {"servCellID": 0, "physCellId": 15, "rsrq": 25, "rsrp": 45, "sinr": 65},
148 {"servCellID": 1, "physCellId": 25, "rsrq": 65, "rsrp": 45, "sinr": 25}
153 "id_MeNB_UE_X2AP_ID": 5,
154 "id_SgNB_UE_X2AP_ID": 15,
155 "MeasResultServMO_data": [
156 {"servCellID": 0, "physCellId": 15, "rsrq": 25, "rsrp": 45, "sinr": 65},
157 {"servCellID": 1, "physCellId": 25, "rsrq": 65, "rsrp": 45, "sinr": 25}
162 "id_MeNB_UE_X2AP_ID": 6,
163 "id_SgNB_UE_X2AP_ID": 16,
164 "MeasResultServMO_data": [
165 {"servCellID": 0, "physCellId": 15, "rsrq": 25, "rsrp": 45, "sinr": 65},
166 {"servCellID": 1, "physCellId": 25, "rsrq": 65, "rsrp": 45, "sinr": 25}
170 if not os.path.exists(fifo_dir +"/" + dst_flnm):
171 os.mkfifo(fifo_dir +"/" + dst_flnm)
172 ofl = open(fifo_dir +"/" + dst_flnm,"w", 0)
175 curr_time = int(time.time())
176 triangle_measurements(dataset1)
177 create_record(dataset1, ofl, 1000*curr_time + 100)
178 triangle_measurements(dataset2)
179 create_record(dataset2, ofl, 1000*curr_time + 200)
180 triangle_measurements(dataset3)
181 create_record(dataset3, ofl, 1000*curr_time + 300)
182 triangle_measurements(dataset4)
183 create_record(dataset4, ofl, 1000*curr_time + 400)
184 triangle_measurements(dataset5)
185 create_record(dataset5, ofl, 1000*curr_time + 500)
186 triangle_measurements(dataset6)
187 create_record(dataset6, ofl, 1000*curr_time + 600)
188 time.sleep(sleep_interval)