# The Jenkins job uses this string for the tag in the image name
# for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
---
-tag: 0.1.0
+tag: 0.2.0
:depth: 3
:local:
+[0.2.0] - 3/27/2020
+-------------------
+::
+
+ * Implement SDL calls and testing
+ * Small cleanups
+
+
[0.1.0] - 3/26/2020
-------------------
::
# limitations under the License.
# ==================================================================================
+# namespaces
+UE_NS = "TS-UE-metrics"
+CELL_NS = "TS-cell-metrics"
+
# list of keys in ue metrics that we want to carry over to qp pred
UE_KEY_LIST = set(
[
)
-def _fetch_ue_metrics(ueid):
- """fetch ue metrics for ueid"""
- return {}
-
-
-def _fetch_cell_metrics(cellid):
- """fetch cell metrics for a cellid"""
- return {}
-
-
-def form_qp_pred_req(ueid):
+def form_qp_pred_req(xapp_ref, ueid):
"""
this function takes in a single ueid and:
- fetches the current ue data
Note that a single request to qp driver may have many UEs in a list, however since a new message needs to be sent for each one,
the calling function iterates over that list, rather than doing it here.
"""
- ue_data = _fetch_ue_metrics(ueid)
+ ue_data = xapp_ref.sdl_get(UE_NS, str(ueid), usemsgpack=False)
serving_cid = ue_data["ServingCellID"]
# form the CellMeasurements
for cid in cell_ids:
- cellm = _fetch_cell_metrics(cid)
+ cellm = xapp_ref.sdl_get(CELL_NS, cid, usemsgpack=False)
# if we were really under performance strain here we could delete from the orig instead of copying but this code is far simpler
cell_data = {k: cellm[k] for k in CELL_KEY_LIST}
# ==================================================================================
from ricxappframe.xapp_frame import RMRXapp
-
-"""
-This is only a stencil for now, will be filled in!
-What is currently here was only for initial skeleton and test creation.
-"""
-
"""
RMR Messages
#define TS_UE_LIST 30000
"""
+rmr_xapp = None
+
+
def post_init(self):
self.def_hand_called = 0
self.traffic_steering_requests = 0
def default_handler(self, summary, sbuf):
self.def_hand_called += 1
- print(summary)
+ self.logger.info("QP Driver received an unexpected message of type: {}, dropping.".format(summary["message type"]))
self.rmr_free(sbuf)
self.rmr_free(sbuf)
-# obv some of these flags have to change
-rmr_xapp = RMRXapp(default_handler, post_init=post_init, rmr_port=4562, use_fake_sdl=True)
-rmr_xapp.register_callback(steering_req_handler, 30000)
-
-
-def start(thread=False):
+def start(thread=False, use_fake_sdl=False):
+ """
+ this is a convienence function that allows this xapp to run in Docker for "real" (no thread, real SDL)
+ but also easily modified for unit testing (e.g., use_fake_sdl)
+ the defaults for this function are for the Dockerized xapp.
+ """
+ global rmr_xapp
+ rmr_xapp = RMRXapp(default_handler, post_init=post_init, rmr_port=4562, use_fake_sdl=use_fake_sdl)
+ rmr_xapp.register_callback(steering_req_handler, 30000)
rmr_xapp.run(thread)
def stop():
- """can only be called if thread=True when started"""
+ """
+ can only be called if thread=True when started
+ TODO: could we register a signal handler for Docker SIGTERM that calls this?
+ """
rmr_xapp.stop()
setup(
name="qpdriver",
- version="0.1.0",
+ version="0.2.0",
packages=find_packages(exclude=["tests.*", "tests"]),
author="Tommy Carpenter",
description="QP Driver Xapp for traffic steering",
+++ /dev/null
-# ==================================================================================
-# Copyright (c) 2020 AT&T Intellectual Property.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==================================================================================
-from qpdriver import data
-
-
-def test_merge(monkeypatch, ue_metrics, cell_metrics_1, cell_metrics_2, cell_metrics_3, qpd_to_qp):
-
- # monkeypatch
- def fake_ue(ueid):
- if ueid == 12345:
- return ue_metrics
-
- def fake_cell(cellid):
- if cellid == "310-680-200-555001":
- return cell_metrics_1
- if cellid == "310-680-200-555002":
- return cell_metrics_2
- if cellid == "310-680-200-555003":
- return cell_metrics_3
-
- monkeypatch.setattr("qpdriver.data._fetch_ue_metrics", fake_ue)
- monkeypatch.setattr("qpdriver.data._fetch_cell_metrics", fake_cell)
-
- assert data.form_qp_pred_req(12345) == qpd_to_qp
import json
import time
from contextlib import suppress
-from qpdriver import main
+from qpdriver import main, data
from ricxappframe.xapp_frame import Xapp
test_sender = None
+"""
+ these tests are not currently parallelizable (do not use this tox flag)
+ I would use setup_module, however that can't take monkeypatch fixtures
+ Currently looking for the best way to make this better: https://stackoverflow.com/questions/60886013/python-monkeypatch-in-pytest-setup-module
+"""
-def test_flow():
+
+def test_init_xapp(monkeypatch, ue_metrics, cell_metrics_1, cell_metrics_2, cell_metrics_3, qpd_to_qp):
+ # monkeypatch post_init to set the data we want in SDL
+ def fake_post_init(self):
+ self.def_hand_called = 0
+ self.traffic_steering_requests = 0
+ self.sdl_set(data.UE_NS, "12345", ue_metrics, usemsgpack=False)
+ self.sdl_set(data.CELL_NS, "310-680-200-555001", cell_metrics_1, usemsgpack=False)
+ self.sdl_set(data.CELL_NS, "310-680-200-555002", cell_metrics_2, usemsgpack=False)
+ self.sdl_set(data.CELL_NS, "310-680-200-555003", cell_metrics_3, usemsgpack=False)
+
+ # patch
+ monkeypatch.setattr("qpdriver.main.post_init", fake_post_init)
+
+ # start qpd
+ main.start(thread=True, use_fake_sdl=True)
+
+
+def test_data_merge(qpd_to_qp):
"""
- just a skeleton for now.. this will evolve when qpd evolves
+ test the merge (basically tests all of the code in data.py in this one line)
+ TODO: this will go away when the full E2E flow is implemented as we can just look at the final result
"""
+ assert data.form_qp_pred_req(main.rmr_xapp, 12345) == qpd_to_qp
- # start qpd
- main.start(thread=True)
+
+def test_rmr_flow(monkeypatch, ue_metrics, cell_metrics_1, cell_metrics_2, cell_metrics_3, qpd_to_qp):
+ """
+ just a skeleton for now.. this will evolve when qpd evolves
+ """
# define a test sender
def entry(self):