* This release is seen as the first complete implementation of QPD, although likely fixes and enhancements are needed
* Implement the rmr messaging
* Add tests for various bad scenarios like UE IDs not existing and Cell data not existing
* Fix UE IDs to be strings as they are in the req slides
Issue-ID: RICAPP-92
Change-Id: Id739cfd187237e713cb4b3c0b67cb89c2d853c7b
Signed-off-by: Tommy Carpenter <tc677g@att.com>
# RMR setup
RUN mkdir -p /opt/route/
-COPY --from=nexus3.o-ran-sc.org:10004/bldr-alpine3-go:3-rmr-si95-nng-3.6.1 /usr/local/lib64/librmr_si.so /usr/local/lib64/librmr_si.so
+COPY --from=nexus3.o-ran-sc.org:10004/bldr-alpine3-go:5-a3.11-nng-rmr3 /usr/local/lib64/librmr_si.so /usr/local/lib64/librmr_si.so
ENV LD_LIBRARY_PATH /usr/local/lib/:/usr/local/lib64
# sdl needs gcc
# The Jenkins job uses this string for the tag in the image name
# for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
---
-tag: 0.2.0
+tag: 1.0.0
qp-driver Overview
==================
-[to come]
+QP Driver is an Xapp in the traffic steering O-RAN use case.
+There are four total Xapps:
+
+1. Traffic steering, which sends "prediction requests" to QP Driver (this)
+
+2. QP Driver (this) which fetches data from SDL[4] on behalf of traffic steering, both UE Data and Cell Data, merges that data together, then sends off the data to the QP Predictor
+
+3. QP Predictor which predicts (?) and sends that prediction back to Traffic Steering
+
+4. the fourth Xapp is called KPIMONN which populates SDL in the first place
+
+So in summary, the QP Driver xapp is a helper function that sits between Traffic Steering and QP Predictor.
:depth: 3
:local:
+[1.0.0] - 4/1/2020
+------------------
+::
+
+ * This release is seen as the first complete implementation of QPD, although likely fixes and enhancements are needed
+ * Implement the rmr messaging
+ * Add tests for various bad scenarios like UE IDs not existing and Cell data not existing
+ * Fix UE IDs to be strings as they are in the req slides
+
+
[0.2.0] - 3/27/2020
-------------------
::
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
+from qpdriver.exceptions import UENotFound
# namespaces
UE_NS = "TS-UE-metrics"
CELL_NS = "TS-cell-metrics"
+# constants
+MEASTSRF = "MeasTimestampRF"
+MEASPRF = "MeasPeriodRF"
+CELLMEAS = "CellMeasurements"
+
# list of keys in ue metrics that we want to carry over to qp pred
UE_KEY_LIST = set(
[
Note that a single request to qp driver may have many UEs in a list, however since a new message needs to be sent for each one,
the calling function iterates over that list, rather than doing it here.
"""
- ue_data = xapp_ref.sdl_get(UE_NS, str(ueid), usemsgpack=False)
+ ue_data = xapp_ref.sdl_get(UE_NS, ueid, usemsgpack=False)
+ if not ue_data:
+ raise UENotFound()
serving_cid = ue_data["ServingCellID"]
# form the qp req
qp_pred_req = {"PredictionUE": ueid} # top level key
qp_pred_req["UEMeasurements"] = {k: ue_data[k] for k in UE_KEY_LIST} # take ue keys we want
- qp_pred_req["CellMeasurements"] = []
+ qp_pred_req[CELLMEAS] = []
- # form the CellMeasurements
+ # form the Cell Measurements
for cid in cell_ids:
+
cellm = xapp_ref.sdl_get(CELL_NS, cid, usemsgpack=False)
- # if we were really under performance strain here we could delete from the orig instead of copying but this code is far simpler
- cell_data = {k: cellm[k] for k in CELL_KEY_LIST}
- # these keys get dropped into *each* cell
- cell_data["MeasTimestampRF"] = ue_data["MeasTimestampRF"]
- cell_data["MeasPeriodRF"] = ue_data["MeasPeriodRF"]
+ if cellm: # if cellm is None, then we omit that cell from this array
+
+ # if we were really under performance strain here we could delete from the orig instead of copying but this code is far simpler
+ cell_data = {k: cellm[k] for k in CELL_KEY_LIST}
+
+ # these keys get dropped into *each* cell
+ cell_data[MEASTSRF] = ue_data[MEASTSRF]
+ cell_data[MEASPRF] = ue_data[MEASPRF]
- # add the RF
- cell_data["RFMeasurements"] = ue_data["ServingCellRF"] if cid == serving_cid else n_cell_info[cid]
+ # add the RF
+ cell_data["RFMeasurements"] = ue_data["ServingCellRF"] if cid == serving_cid else n_cell_info[cid]
- # add to our array
- qp_pred_req["CellMeasurements"].append(cell_data)
+ # add to our array
+ qp_pred_req[CELLMEAS].append(cell_data)
return qp_pred_req
--- /dev/null
+"""
+custom logic exceptions
+"""
+# ==================================================================================
+# Copyright (c) 2020 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==================================================================================
+
+
+class UENotFound(BaseException):
+ pass
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
+import json
from ricxappframe.xapp_frame import RMRXapp
+from qpdriver import data
+from qpdriver.exceptions import UENotFound
"""
RMR Messages
{“UEPredictionSet” : [“UEId1”,”UEId2”,”UEId3”]}
"""
self.traffic_steering_requests += 1
- print(summary)
+ ue_list = []
+ try:
+ req = json.loads(summary["payload"]) # input should be a json encoded as bytes
+ ue_list = req["UEPredictionSet"]
+ except (json.decoder.JSONDecodeError, KeyError):
+ self.logger.debug("Received a TS Request but it was malformed!")
+
+ # we don't use rts here; free this
self.rmr_free(sbuf)
+ # iterate over the ues and send a request each, if it is a valid UE, to QPP
+ for ueid in ue_list:
+ try:
+ to_qpp = data.form_qp_pred_req(self, ueid)
+ payload = json.dumps(to_qpp).encode()
+ ok = self.rmr_send(payload, 30001)
+ if not ok:
+ self.logger.debug("QP Driver was unable to send to QP Predictor!")
+ except UENotFound:
+ self.logger.debug("Received a TS Request for a UE that does not exist!")
+
def start(thread=False, use_fake_sdl=False):
"""
# CI script installs RMR from PackageCloud using this version
---
-version: 3.6.1
+version: 3.6.3
setup(
name="qpdriver",
- version="0.2.0",
+ version="1.0.0",
packages=find_packages(exclude=["tests.*", "tests"]),
author="Tommy Carpenter",
description="QP Driver Xapp for traffic steering",
@pytest.fixture
def ue_metrics():
return {
- "UEID": 12345,
+ "UEID": "12345",
"ServingCellID": "310-680-200-555002",
"MeasTimestampUEPDCPBytes": "2020-03-18 02:23:18.220",
"MeasPeriodUEPDCPBytes": 20,
}
+@pytest.fixture
+def ue_metrics_with_bad_cell():
+ return {
+ "UEID": "8675309",
+ "ServingCellID": "310-680-200-555002",
+ "MeasTimestampUEPDCPBytes": "2020-03-18 02:23:18.220",
+ "MeasPeriodUEPDCPBytes": 20,
+ "UEPDCPBytesDL": 250000,
+ "UEPDCPBytesUL": 100000,
+ "MeasTimestampUEPRBUsage": "2020-03-18 02:23:18.220",
+ "MeasPeriodUEPRBUsage": 20,
+ "UEPRBUsageDL": 10,
+ "UEPRBUsageUL": 30,
+ "MeasTimestampRF": "2020-03-18 02:23:18.210",
+ "MeasPeriodRF": 40,
+ "ServingCellRF": {"RSRP": -115, "RSRQ": -16, "RSSINR": -5},
+ "NeighborCellRF": [
+ {"CID": "310-680-200-555001", "CellRF": {"RSRP": -90, "RSRQ": -13, "RSSINR": -2.5}},
+ {"CID": "CANTTOUCHTHIS", "CellRF": {"RSRP": -140, "RSRQ": -17, "RSSINR": -6}},
+ ],
+ }
+
+
@pytest.fixture
def cell_metrics_1():
return {
@pytest.fixture
def qpd_to_qp():
return {
- "PredictionUE": 12345,
+ "PredictionUE": "12345",
"UEMeasurements": {
"ServingCellID": "310-680-200-555002",
"MeasTimestampUEPDCPBytes": "2020-03-18 02:23:18.220",
},
],
}
+
+
+@pytest.fixture
+def qpd_to_qp_bad_cell():
+ return {
+ "PredictionUE": "8675309",
+ "UEMeasurements": {
+ "ServingCellID": "310-680-200-555002",
+ "MeasTimestampUEPDCPBytes": "2020-03-18 02:23:18.220",
+ "MeasPeriodUEPDCPBytes": 20,
+ "UEPDCPBytesDL": 250000,
+ "UEPDCPBytesUL": 100000,
+ "MeasTimestampUEPRBUsage": "2020-03-18 02:23:18.220",
+ "MeasPeriodUEPRBUsage": 20,
+ "UEPRBUsageDL": 10,
+ "UEPRBUsageUL": 30,
+ },
+ "CellMeasurements": [
+ {
+ "CellID": "310-680-200-555001",
+ "MeasTimestampPDCPBytes": "2020-03-18 02:23:18.220",
+ "MeasPeriodPDCPBytes": 20,
+ "PDCPBytesDL": 2000000,
+ "PDCPBytesUL": 1200000,
+ "MeasTimestampAvailPRB": "2020-03-18 02:23:18.220",
+ "MeasPeriodAvailPRB": 20,
+ "AvailPRBDL": 30,
+ "AvailPRBUL": 50,
+ "MeasTimestampRF": "2020-03-18 02:23:18.210",
+ "MeasPeriodRF": 40,
+ "RFMeasurements": {"RSRP": -90, "RSRQ": -13, "RSSINR": -2.5},
+ },
+ {
+ "CellID": "310-680-200-555002",
+ "MeasTimestampPDCPBytes": "2020-03-18 02:23:18.220",
+ "MeasPeriodPDCPBytes": 20,
+ "PDCPBytesDL": 800000,
+ "PDCPBytesUL": 400000,
+ "MeasTimestampAvailPRB": "2020-03-18 02:23:18.220",
+ "MeasPeriodAvailPRB": 20,
+ "AvailPRBDL": 30,
+ "AvailPRBUL": 45,
+ "MeasTimestampRF": "2020-03-18 02:23:18.210",
+ "MeasPeriodRF": 40,
+ "RFMeasurements": {"RSRP": -115, "RSRQ": -16, "RSSINR": -5},
+ },
+ ],
+ }
# do NOT use localhost, seems unresolved on jenkins VMs
newrt|start
mse| 30000 | -1 | 127.0.0.1:4562
+mse| 30001 | -1 | 127.0.0.1:4666
mse| 60001 | -1 | 127.0.0.1:4562
newrt|end
import time
from contextlib import suppress
from qpdriver import main, data
-from ricxappframe.xapp_frame import Xapp
+from ricxappframe.xapp_frame import Xapp, RMRXapp
-test_sender = None
+mock_traffic_steering = None
+mock_qp_predictor = None
"""
these tests are not currently parallelizable (do not use this tox flag)
"""
-def test_init_xapp(monkeypatch, ue_metrics, cell_metrics_1, cell_metrics_2, cell_metrics_3, qpd_to_qp):
+def test_init_xapp(monkeypatch, ue_metrics, cell_metrics_1, cell_metrics_2, cell_metrics_3, ue_metrics_with_bad_cell):
# monkeypatch post_init to set the data we want in SDL
def fake_post_init(self):
self.def_hand_called = 0
self.traffic_steering_requests = 0
self.sdl_set(data.UE_NS, "12345", ue_metrics, usemsgpack=False)
+ self.sdl_set(data.UE_NS, "8675309", ue_metrics_with_bad_cell, usemsgpack=False)
self.sdl_set(data.CELL_NS, "310-680-200-555001", cell_metrics_1, usemsgpack=False)
self.sdl_set(data.CELL_NS, "310-680-200-555002", cell_metrics_2, usemsgpack=False)
self.sdl_set(data.CELL_NS, "310-680-200-555003", cell_metrics_3, usemsgpack=False)
main.start(thread=True, use_fake_sdl=True)
-def test_data_merge(qpd_to_qp):
+def test_rmr_flow(monkeypatch, qpd_to_qp, qpd_to_qp_bad_cell):
"""
- test the merge (basically tests all of the code in data.py in this one line)
- TODO: this will go away when the full E2E flow is implemented as we can just look at the final result
+ this flow mocks out the xapps on both sides of QP driver.
+ It first stands up a mock qp predictor, then it starts up a mock traffic steering which will immediately send requests to the running qp driver]
"""
- assert data.form_qp_pred_req(main.rmr_xapp, 12345) == qpd_to_qp
+ expected_result = {}
-def test_rmr_flow(monkeypatch, ue_metrics, cell_metrics_1, cell_metrics_2, cell_metrics_3, qpd_to_qp):
- """
- just a skeleton for now.. this will evolve when qpd evolves
- """
+ # define a mock qp predictor
+ def default_handler(self, summary, sbuf):
+ pass
+
+ def qp_driver_handler(self, summary, sbuf):
+ nonlocal expected_result # closures ftw
+ pay = json.loads(summary["payload"])
+ expected_result[pay["PredictionUE"]] = pay
+
+ global mock_qp_predictor
+ mock_qp_predictor = RMRXapp(default_handler, rmr_port=4666, use_fake_sdl=True)
+ mock_qp_predictor.register_callback(qp_driver_handler, 30001)
+ mock_qp_predictor.run(thread=True)
+
+ time.sleep(1)
- # define a test sender
+ # define a mock traffic steering xapp
def entry(self):
- val = json.dumps({"test send 30000": 1}).encode()
+ # make sure a bad steering request doesn't blow up in qpd
+ val = "notevenjson".encode()
+ self.rmr_send(val, 30000)
+ val = json.dumps({"bad": "tothebone"}).encode() # json but missing UEPredictionSet
+ self.rmr_send(val, 30000)
+
+ # valid request body but missing cell id
+ val = json.dumps({"UEPredictionSet": ["VOIDOFLIGHT"]}).encode()
self.rmr_send(val, 30000)
+ # good traffic steering request
+ val = json.dumps({"UEPredictionSet": ["12345", "8675309"]}).encode()
+ self.rmr_send(val, 30000)
+
+ # should trigger the default handler and do nothing
val = json.dumps({"test send 60001": 2}).encode()
self.rmr_send(val, 60001)
- global test_sender
- test_sender = Xapp(entrypoint=entry, rmr_port=4564, use_fake_sdl=True)
- test_sender.run()
+ global mock_traffic_steering
+ mock_traffic_steering = Xapp(entrypoint=entry, rmr_port=4564, use_fake_sdl=True)
+ mock_traffic_steering.run() # this will return since entry isn't a loop
time.sleep(1)
- assert main.get_stats() == {"DefCalled": 1, "SteeringRequests": 1}
+ assert expected_result == {"12345": qpd_to_qp, "8675309": qpd_to_qp_bad_cell}
+ assert main.get_stats() == {"DefCalled": 1, "SteeringRequests": 4}
def teardown_module():
for example if an exception gets raised before stop is called in any test function above, pytest will hang forever
"""
with suppress(Exception):
- test_sender.stop()
+ mock_traffic_steering.stop()
+ with suppress(Exception):
+ mock_qp_predictor.stop()
with suppress(Exception):
main.stop()