Update TS xApp for Release D use case 64/6164/1 1.1.0
authorAlexandre Huff <alexandrehuff@utfpr.edu.br>
Fri, 28 May 2021 16:32:02 +0000 (13:32 -0300)
committerAlexandre Huff <alexandrehuff@utfpr.edu.br>
Sun, 30 May 2021 19:41:19 +0000 (16:41 -0300)
Issue-ID: RICAPP-170

Change-Id: I436624ce9db05af005eb660817cbd0a59dd7e0fe
Signed-off-by: Alexandre Huff <alexandrehuff@utfpr.edu.br>
16 files changed:
Dockerfile
README
container-tag.yaml
docs/rel-notes.rst
docs/user-guide.rst
rmr-version.yaml
src/ts_xapp/CMakeLists.txt
src/ts_xapp/ts_xapp.cpp
test/app/CMakeLists.txt [new file with mode: 0644]
test/app/README [new file with mode: 0644]
test/app/ad_xapp.cpp [new file with mode: 0644]
test/app/echo-server.py [new file with mode: 0644]
test/app/qp_xapp.cpp [new file with mode: 0644]
test/app/routes.rt [new file with mode: 0644]
test/app/routes.rt.stash.inc [new file with mode: 0644]
xapp-descriptor/config.json

index a1cc537..f443bbb 100644 (file)
@@ -23,7 +23,7 @@
 #
 #                              Building should be as simple as:
 #
-#                                      docker build -f Dockerfile -t ric-app-ts:[version]
+#                                      docker build -f Dockerfile -t ric-app-ts:[version] .
 #
 #      Date:           27 April 2020
 #      Author:         E. Scott Daniels
@@ -31,7 +31,7 @@
 
 # the builder has: git, wget, cmake, gcc/g++, make, python2/3. v7 dropped nng support
 #
-FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:1.9.0 as buildenv
+FROM nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-ubuntu18-c-go:1.9.0 as buildenv
 
 # spaces to save things in the build image to copy to final image
 RUN mkdir -p /playpen/assets /playpen/src /playpen/bin
@@ -40,13 +40,13 @@ ARG SRC=.
 WORKDIR /playpen
 
 # versions we snarf from package cloud
-ARG RMR_VER=4.0.5
-ARG SDL_VER=1.0.4
-ARG XFCPP_VER=1.2.0
+ARG RMR_VER=4.7.4
+ARG SDL_VER=1.0.4
+ARG XFCPP_VER=2.3.3
 
 # package cloud urls for wget
 ARG PC_REL_URL=https://packagecloud.io/o-ran-sc/release/packages/debian/stretch
-ARG PC_STG_URL=https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch
+ARG PC_STG_URL=https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch
 
 # pull in rmr
 RUN wget -nv --content-disposition ${PC_REL_URL}/rmr_${RMR_VER}_amd64.deb/download.deb && \
@@ -54,16 +54,16 @@ RUN wget -nv --content-disposition ${PC_REL_URL}/rmr_${RMR_VER}_amd64.deb/downlo
        dpkg -i rmr_${RMR_VER}_amd64.deb rmr-dev_${RMR_VER}_amd64.deb
 
 # pull in xapp framework c++
-RUN wget -nv --content-disposition ${PC_STG_URL}/ricxfcpp-dev_${XFCPP_VER}_amd64.deb/download.deb && \
-       wget -nv --content-disposition ${PC_STG_URL}/ricxfcpp_${XFCPP_VER}_amd64.deb/download.deb && \
+RUN wget -nv --content-disposition ${PC_REL_URL}/ricxfcpp-dev_${XFCPP_VER}_amd64.deb/download.deb && \
+       wget -nv --content-disposition ${PC_REL_URL}/ricxfcpp_${XFCPP_VER}_amd64.deb/download.deb && \
        dpkg -i ricxfcpp-dev_${XFCPP_VER}_amd64.deb ricxfcpp_${XFCPP_VER}_amd64.deb
 
-# snarf up SDL dependencies, then pull SDL package and install
-RUN apt-get update
-RUN apt-get install -y libboost-filesystem1.65.1 libboost-system1.65.1 libhiredis0.13
-RUN wget -nv --content-disposition ${PC_STG_URL}/sdl_${SDL_VER}-1_amd64.deb/download.deb && \
-       wget -nv --content-disposition ${PC_STG_URL}/sdl-dev_${SDL_VER}-1_amd64.deb/download.deb &&\
-       dpkg -i sdl-dev_${SDL_VER}-1_amd64.deb sdl_${SDL_VER}-1_amd64.deb
+# snarf up SDL dependencies, then pull SDL package and install
+RUN apt-get update
+RUN apt-get install -y libboost-filesystem1.65.1 libboost-system1.65.1 libhiredis0.13
+RUN wget -nv --content-disposition ${PC_STG_URL}/sdl_${SDL_VER}-1_amd64.deb/download.deb && \
+#      wget -nv --content-disposition ${PC_STG_URL}/sdl-dev_${SDL_VER}-1_amd64.deb/download.deb &&\
+#      dpkg -i sdl-dev_${SDL_VER}-1_amd64.deb sdl_${SDL_VER}-1_amd64.deb
 
 RUN git clone https://github.com/Tencent/rapidjson && \
    cd rapidjson && \
@@ -74,7 +74,9 @@ RUN git clone https://github.com/Tencent/rapidjson && \
    cd ${STAGE_DIR} && \
    rm -rf rapidjson
 
-
+# install TS curl dependencies
+RUN apt-get update && \
+       apt-get install -y libcurl4-openssl-dev
 
 #
 # build and install the application(s)
@@ -99,20 +101,25 @@ COPY assets/bootstrap.rt /playpen/assets
 # -----  create final, smaller, image ----------------------------------
 FROM ubuntu:18.04
 
-# package cloud urls for wget
-ARG PC_REL_URL=https://packagecloud.io/o-ran-sc/release/packages/debian/stretch
-ARG PC_STG_URL=https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch
-ARG SDL_VER=1.0.4
+# package cloud urls for wget
+ARG PC_REL_URL=https://packagecloud.io/o-ran-sc/release/packages/debian/stretch
+ARG PC_STG_URL=https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch
+ARG SDL_VER=1.0.4
 
-# sdl doesn't install into /usr/local like everybody else, and we don't want to
-# hunt for it or copy all of /usr, so we must pull and reinstall it.
-RUN apt-get update
-RUN apt-get install -y libboost-filesystem1.65.1 libboost-system1.65.1 libhiredis0.13 wget
-RUN wget -nv --content-disposition ${PC_STG_URL}/sdl_${SDL_VER}-1_amd64.deb/download.deb && \
-       wget -nv --content-disposition ${PC_STG_URL}/sdl-dev_${SDL_VER}-1_amd64.deb/download.deb &&\
-       dpkg -i sdl-dev_${SDL_VER}-1_amd64.deb sdl_${SDL_VER}-1_amd64.deb
+# sdl doesn't install into /usr/local like everybody else, and we don't want to
+# hunt for it or copy all of /usr, so we must pull and reinstall it.
+RUN apt-get update
+RUN apt-get install -y libboost-filesystem1.65.1 libboost-system1.65.1 libhiredis0.13 wget
+RUN wget -nv --content-disposition ${PC_STG_URL}/sdl_${SDL_VER}-1_amd64.deb/download.deb && \
+#      wget -nv --content-disposition ${PC_STG_URL}/sdl-dev_${SDL_VER}-1_amd64.deb/download.deb &&\
+#      dpkg -i sdl-dev_${SDL_VER}-1_amd64.deb sdl_${SDL_VER}-1_amd64.deb
 
-RUN rm -fr /var/lib/apt/lists
+# RUN rm -fr /var/lib/apt/lists
+
+# install TS curl dependencies in the final image
+RUN apt-get update && \
+       apt-get install -y libcurl4-openssl-dev && \
+       apt-get clean
 
 # snarf the various sdl, rmr, and cpp-framework libraries as well as any binaries
 # created (e.g. rmr_rprobe) and the application binary itself
@@ -136,4 +143,7 @@ ENV RMR_SRC_ID=service-ricxapp-trafficxapp-rmr.ricxapp:4560
 ENV RMR_VCTL_FILE=/tmp/rmr.v
 RUN echo "2" >/tmp/rmr.v
 
+# set TS env vars
+ENV TS_CONTROL_URL=http://localhost:5000/api/echo
+
 CMD [ "/usr/local/bin/ts_xapp" ]
diff --git a/README b/README
index 49bfd96..49d4989 100644 (file)
--- a/README
+++ b/README
@@ -20,7 +20,7 @@ Traffic Steering
 
 This repository contains the source for the RIC traffic steering application.
 
-This xApp can be onboarded through the xApp Onboarder.  The xapp descriptor 
+This xApp can be onboarded through the xApp Onboarder.  The xapp descriptor
 is under the xapp-descriptor/ directory.
 
 Then the xapp can be deployed through the App Manager.
@@ -29,14 +29,14 @@ In order for Traffic Steering xApp to carry out the Traffic Steering Use Case,
 the following needs to be done:
 * QP xApp needs to be onboarded and deployed (see xapp descriptor in that repo)
 * QP Driver xApp needs to be onboarded and deployed (see xapp descriptor in that repo)
-* SDL must contain network data
+* SDL must contain network data which required by the QP Driver xApp
 
 Mock network data
 =================
 
-Currently, there is no xapp available to receive RAN metrics and write to SDL.
+KPIMON xApp is in charge of collecting RAN metrics and write to SDL.
 
-The Traffic Steering Use Case can be run with mock data.  Everything needed to write this
+However, the Traffic Steering Use Case can be run with mock data.  Everything needed to write this
 mock data is included in this repo.  Directions are as follows:
 
 cd test/populatedb
@@ -49,3 +49,24 @@ This script will build a docker image locally and also install a helm chart in t
 
 The code that is run will write the necessary data to SDL
 
+Mock applications (Release D)
+=============================
+
+There are sample applications in the test/app/ directory that demonstrate a dummy message exchange
+among AD, QP Driver, QP, and TS xApps. Currently, there is no Dockerfile to run those mock applications,
+but they can be built according to the following:
+
+1. Build the TS xApp
+2. $cd test/app/
+3. $cmake -S . -B build
+4. $cd build/
+5. $make
+
+Run xApps in the following order:
+1. TS xApp
+2. qp_xapp
+3. ad_xapp
+
+There is an additional application that mocks a Rest server to demonstrate all control messages
+issued by the TS xApp. It is implemented in Python and echoes all messages it receives. This
+application is located at the test/app directory.
index ff04743..94914f5 100644 (file)
@@ -1,3 +1,3 @@
 # this is used by CI jobs to apply a tag when it builds the image
 ---
-tag: '1.0.13'
+tag: '1.1.0'
index f8c430b..270a4e3 100644 (file)
@@ -9,6 +9,10 @@
 Traffic Steering xAPP
 =====================
 
+2021 May 28 Version 1.1.0
+-------------------------
+        Changes to integrate with the traffic steering use case for Release D
+
 2020 Dec 8 Version 1.0.13
 -------------------------
         Small changes to integrate with RAN data written by KPIMON xApp
index 98c3a18..720588a 100644 (file)
@@ -1,29 +1,32 @@
-     
-.. This work is licensed under a Creative Commons Attribution 4.0 International License. 
-.. SPDX-License-Identifier: CC-BY-4.0 
-.. 
-.. CAUTION: this document is generated from source in doc/src/* 
-.. To make changes edit the source and recompile the document. 
-.. Do NOT make changes directly to .rst or .md files. 
+
+
+.. This work is licensed under a Creative Commons Attribution 4.0 International License.
+.. SPDX-License-Identifier: CC-BY-4.0
+..
+.. CAUTION: this document is generated from source in doc/src/*
+.. To make changes edit the source and recompile the document.
+.. Do NOT make changes directly to .rst or .md files.
+
+
 ============
-User's Guide 
+User's Guide
 ============
 ---------------------
 Traffic Steering xAPP
 ---------------------
-Introduction 
+
+Introduction
 ============
 
-The Traffic Steering Use Case demonstrates intelligent inferences in the Near-RT RIC and E2 interaction in order to execute on the inferences. 
+The Traffic Steering Use Case demonstrates intelligent inferences in the Near-RT RIC and E2 interaction in order to execute on the inferences.
 
-The current Use Case is comprised of three xApps:
-* Traffic Steering xApp (this one): Consume A1 Policy Intent, regularly monitor RAN metrics and request prediction for badly performing UEs, and listen for messages that show UE throughput predictions in different cells, in order to make a decision about UE Handover.
-* QoE Prediction (QP) xApp: Receive a feature set of metrics for a given UE, and output Throughput predictions on the Serving and any Neighbor cells
-* QoE Prediction Driver (QP Driver) xApp: Generate a feature set of metrics to input to QoE Prediction, based on SDL lookups in UE-Metric and Cell-Metric namespaces
+The current Use Case is comprised of five xApps:
+
+* KPI Monitoring xApp: Gathers the radio and system Key Performance Indicators (KPI) metrics from E2 Nodes and stores them in the Shared Data Layer (SDL).
+* Anomaly Detection (AD) xApp: Fetches UE data regularly from SDL, monitors UE metrics and sends the anomalous UEs to Traffic Steering xApp.
+* Traffic Steering xApp (*this one*): Consumes A1 Policy Intent, listens for badly performing UEs, sends prediction requests to QP Driver, and listens for messages that show UE throughput predictions in different cells to make a decision about UE Handover.
+* QoE Prediction Driver (QP Driver) xApp: Generates a feature set of metrics to input to QoE Prediction, based on SDL lookups in UE-Metric and Cell-Metric namespaces.
+* QoE Prediction (QP) xApp: Receives a feature set of metrics for a given UE, and output Throughput predictions on the Serving and any Neighbor cells to Traffic Steering xApp.
 
 A1 Policy
 =========
@@ -32,30 +35,85 @@ A1 Policy is sent to Traffic Steering xApp to define the Intent which will drive
 
 Policy Type ID is 20008.
 
-Currently, there is only one parameter that can be provided in A1 Policy: threshold
+Currently, there is only one parameter that can be provided in A1 Policy: *threshold*
 
 An example Policy follows:
-{"threshold" : 5}
+
+.. code-block::
+
+    { "threshold": 5 }
+
+.. FIXME Is the "Serving Cell RSRP" related to "Degradation" in AD message
 
 This Policy instructs Traffic Steering xApp to monitor current RAN metrics and request a QoE Prediction for any UE whose Serving Cell RSRP is less than 5.
 
+Receiving Anomaly Detection
+===========================
+
+Traffic Sterring xApp defines a callback to listen to Anomaly Detection messages received from AD xApp. The RMR message type is 30003.
+The following is an example message body:
+
+.. code-block::
+
+    [
+        {
+            "du-id":1010,
+            "ue-id":"Train passenger 2",
+            "measTimeStampRf":1620835470108,
+            "Degradation":"RSRP RSSINR"
+        }
+    ]
+
+.. ``[{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]``
+
 Sending QoE Prediction Request
 ==============================
 
-Traffic Steering xApp loops repeatedly.  After every sleep, it queries the SDL UE-Metric namespace.  When it identifies a UE whose RSRP is below the threshold, it generates a QoE Prediction message.  The RMR Message Type is 30000.  The following is an example message body:
+Traffic Steering listens for badly performing UEs. When it identifies a UE whose RSRP is below the threshold, it generates
+a QoE Prediction Request message and sends it to the QP Driver xApp. The RMR Message Type is 30000.
+The following is an example message body:
+
+.. {"UEPredictionSet" : ["12345"]}
 
-{"UEPredictionSet" : ["12345"]}
+.. code-block::
+
+    { "UEPredictionSet": ["Train passenger 2"] }
 
 Receiving QoE Prediction
 ========================
 
 Traffic Steering xApp defines a callback for QoE Prediction received from QP xApp.  The RMR message type is 30002.  The following is an example message body:
 
-{"12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ] , "310-680-200-555002" : [ 800000 , 400000 ] , "310-680-200-555003" : [ 800000 , 400000 ]  } }
+.. {"12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ] , "310-680-200-555002" : [ 800000 , 400000 ] , "310-680-200-555003" : [ 800000 , 400000 ]  } }
+
+.. code-block::
+
+    {
+        "Train passenger 2":{
+            "310-680-200-555001":[2000000, 1200000],
+            "310-680-200-555002":[1000000, 4000000],
+            "310-680-200-555003":[5000000, 4000000]
+        }
+    }
+
+This message provides predictions for UE ID "Train passenger 2".  For its service cell and neighbor cells, it lists an array containing two elements: DL Throughput and UL Throughput predictions.
+
+Traffic Steering xApp checks for the Service Cell ID for UE ID, and determines whether the predicted throughput is higher in a neighbor cell.
+The first cell in this prediction message is assumed to be the serving cell.
 
-This message provides predictions for UE ID 12345.  For its service cell and neighbor cells, it lists an array containing two elements: DL Throughput and UL Throughput predictions.
+If predicted throughput is higher in a neighbor cell, Traffic Steering sends a CONTROL message through a REST call to E2 SIM. This message requests to hand-off the corresponding UE, and an example of its payload is as follows:
 
-Traffic Steering xApp checks for the Service Cell ID for UE ID, and determines whether the predicted throughput is higher in a neighbor cell. 
+.. code-block::
 
-If predicted throughput is higher in a neighbor cell, Traffic Steering logs its intention to send a CONTROL message to do handover.
+    {
+        "command": "HandOff",
+        "seqNo": 1,
+        "ue": "Train passenger 2",
+        "fromCell": "310-680-200-555001",
+        "toCell": "310-680-200-555003",
+        "timestamp": "Sat May 22 10:35:33 2021",
+        "reason": "Hand-Off Control Request from TS xApp",
+        "ttl": 10
+    }
 
+Traffic Steering also logs the REST response, which shows whether or not the control operation has succeeded.
index ceaf505..dbec252 100644 (file)
@@ -1,3 +1,3 @@
 # Communicate to CI which version of RMR to install in the build/vet environment
 ---
-version: 4.0.5
+version: 4.7.4
index 06ea5f2..9fbc412 100644 (file)
@@ -16,9 +16,9 @@
 #
 
 add_executable( ts_xapp ts_xapp.cpp )
-target_link_libraries( ts_xapp ricxfcpp;rmr_si;sdl;pthread )
+target_link_libraries( ts_xapp ricxfcpp;rmr_si;pthread;curl )
 
-install( 
+install(
     TARGETS ts_xapp
     DESTINATION ${install_bin}
 )
index b32e71e..bf146ec 100644 (file)
 
 /*
        Mnemonic:       ts_xapp.cpp
-       Abstract:       Traffic Steering xApp;
-                              1. Receives A1 Policy
-                              2. Queries SDL to decide which UE to attempt Traffic Steering for
+       Abstract:       Traffic Steering xApp
+                  1. Receives A1 Policy
+                              2. Receives anomaly detection
                               3. Requests prediction for UE throughput on current and neighbor cells
                               4. Receives prediction
                               5. Optionally exercises Traffic Steering action over E2
 
-       Date:           22 April 2020
+       Date:     22 April 2020
        Author:         Ron Shacham
 
+  Modified: 21 May 2021 (Alexandre Huff)
+            Update for traffic steering use case in release D.
 */
 
 #include <stdio.h>
@@ -39,7 +41,6 @@
 #include <iostream>
 #include <memory>
 
-#include <sdl/syncstorage.hpp>
 #include <set>
 #include <map>
 #include <vector>
 #include <rapidjson/stringbuffer.h>
 #include <rapidjson/schema.h>
 #include <rapidjson/reader.h>
+#include <rapidjson/prettywriter.h>
 
-
+#include <curl/curl.h>
+#include <rmr/RIC_message_types.h>
 #include "ricxfcpp/xapp.hpp"
 
+
+// Defines env name for the endpoint to POST handoff control messages
+#define ENV_CONTROL_URL "TS_CONTROL_URL"
+
+
 using namespace rapidjson;
 using namespace std;
+using namespace xapp;
+
 using Namespace = std::string;
 using Key = std::string;
 using Data = std::vector<uint8_t>;
@@ -66,23 +76,17 @@ using Keys = std::set<Key>;
 
 // ----------------------------------------------------------
 
-std::unique_ptr<Xapp> xfw;
+// Stores the the URL to POST handoff control messages
+const char *ts_control_url;
 
-std::string sdl_namespace_u = "TS-UE-metrics";
-std::string sdl_namespace_c = "TS-cell-metrics";
+std::unique_ptr<Xapp> xfw;
 
 int rsrp_threshold = 0;
 
-std::unique_ptr<shareddatalayer::SyncStorage> sdl;
-
-Namespace nsu;
-Namespace nsc;
-
-struct UEData {
+/* struct UEData {
   string serving_cell;
   int serving_cell_rsrp;
-
-};
+}; */
 
 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
   unordered_map<string, string> cell_pred;
@@ -96,7 +100,6 @@ struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
   std::string operation;
   bool found_threshold = false;
 
-  
   bool Null() { return true; }
   bool Bool(bool b) { return true; }
   bool Int(int i) {
@@ -124,12 +127,12 @@ struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
     }
 
     return true;
-  }    
+  }
   bool Int64(int64_t i) {  return true; }
   bool Uint64(uint64_t u) {  return true; }
   bool Double(double d) {  return true; }
   bool String(const char* str, SizeType length, bool copy) {
-    
+
     if (curr_key.compare("operation") != 0) {
       operation = str;
     }
@@ -159,11 +162,16 @@ struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
   bool ue_id_found = false;
   string curr_key = "";
   string curr_value = "";
+  string serving_cell_id;
   bool down_val = true;
   bool Null() {  return true; }
   bool Bool(bool b) {  return true; }
   bool Int(int i) {  return true; }
-  bool Uint(unsigned u) {    
+  bool Uint(unsigned u) {
+    // Currently, we assume the first cell in the prediction message is the serving cell
+    if ( serving_cell_id.empty() ) {
+      serving_cell_id = curr_key;
+    }
 
     if (down_val) {
       cell_pred_down[curr_key] = u;
@@ -173,7 +181,7 @@ struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
       down_val = true;
     }
 
-    return true;    
+    return true;
 
   }
   bool Int64(int64_t i) {  return true; }
@@ -199,8 +207,30 @@ struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
   bool EndArray(SizeType elementCount) {  return true; }
 };
 
+struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
+  /*
+    Assuming we receive the following payload from AD
+    [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
+  */
+  vector<string> prediction_ues;
+  string curr_key = "";
+
+  bool Key(const Ch* str, SizeType len, bool copy) {
+    curr_key = str;
+    return true;
+  }
+
+  bool String(const Ch* str, SizeType len, bool copy) {
+    // We are only interested in the "ue-id"
+    if ( curr_key.compare( "ue-id") == 0 ) {
+      prediction_ues.push_back( str );
+    }
+    return true;
+  }
+};
+
 
-struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
+/* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
   unordered_map<string, string> cell_pred;
   std::string serving_cell_id;
   int serving_cell_rsrp;
@@ -209,7 +239,7 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
   bool in_serving_array = false;
   int rf_meas_index = 0;
 
-  bool in_serving_report_object = false;  
+  bool in_serving_report_object = false;
 
   string curr_key = "";
   string curr_value = "";
@@ -219,7 +249,7 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
 
     return true;
   }
-  
+
   bool Uint(unsigned i) {
 
     if (in_serving_report_object) {
@@ -230,8 +260,8 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
       } else if (curr_key.compare("rssinr") == 0) {
        serving_cell_sinr = i;
       }
-    }          
-    
+    }
+
     return true; }
   bool Int64(int64_t i) {
 
@@ -241,10 +271,10 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
     return true; }
   bool Double(double d) { return true; }
   bool String(const char* str, SizeType length, bool copy) {
-    
+
     if (curr_key.compare("ServingCellID") == 0) {
       serving_cell_id = str;
-    } 
+    }
 
     return true;
   }
@@ -255,7 +285,7 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
 
     return true; }
   bool Key(const char* str, SizeType length, bool copy) {
-    
+
     curr_key = str;
     return true;
   }
@@ -269,7 +299,7 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
     if (curr_key.compare("ServingCellRF") == 0) {
       in_serving_array = true;
     }
-    
+
     return true;
   }
   bool EndArray(SizeType elementCount) {
@@ -280,24 +310,24 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
     }
 
     return true; }
-};
+}; */
 
 
-unordered_map<string, UEData> get_sdl_ue_data() {
+/* unordered_map<string, UEData> get_sdl_ue_data() {
 
   fprintf(stderr, "In get_sdl_ue_data()\n");
 
   unordered_map<string, string> ue_data;
 
   unordered_map<string, UEData> return_ue_data_map;
-    
+
   std::string prefix3="";
   Keys K2 = sdl->findKeys(nsu, prefix3);
   DataMap Dk2 = sdl->get(nsu, K2);
-  
+
   string ue_json;
   string ue_id;
-  
+
   for(auto si=K2.begin();si!=K2.end();++si){
     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
     char val[val_v.size()+1];                               // from Data
@@ -306,11 +336,11 @@ unordered_map<string, UEData> get_sdl_ue_data() {
     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
     val[i]='\0';
       ue_id.assign((std::string)*si);
-      
+
       ue_json.assign(val);
       ue_data[ue_id] =  ue_json;
   }
-  
+
   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
     UEDataHandler handler;
     Reader reader;
@@ -320,25 +350,24 @@ unordered_map<string, UEData> get_sdl_ue_data() {
     string ueID = map_iter->first;
     string serving_cell_id = handler.serving_cell_id;
     int serv_rsrp = handler.serving_cell_rsrp;
-    
+
     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
-    
-  }  
+
+  }
 
   return return_ue_data_map;
-}
+} */
 
 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
 
   int response_to = 0;  // max timeout wating for a response
   int rmtype;          // received message type
 
-  
-  fprintf(stderr, "Policy Callback got a message, type=%d, length=%d\n", mtype, len);
+  fprintf(stderr, "[INFO] Policy Callback got a message, type=%d, length=%d\n", mtype, len);
 
   const char *arg = (const char*)payload.get();
 
-  fprintf(stderr, "payload is %s\n", payload.get());
+  fprintf(stderr, "[INFO] Payload is %s\n", arg);
 
   PolicyHandler handler;
   Reader reader;
@@ -346,85 +375,83 @@ void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_componen
   reader.Parse(ss,handler);
 
   //Set the threshold value
-
   if (handler.found_threshold) {
-    fprintf(stderr, "Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
+    fprintf(stderr, "[INFO] Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
     rsrp_threshold = handler.threshold;
   }
 
   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-  
-  
 }
 
-void send_prediction_request(vector<string> ues_to_predict) {
-
-  std::unique_ptr<Message> msg;
-  Msg_component payload;                                // special type of unique pointer to the payload
-  
-  int nthreads = 1;  
-  int response_to = 0;   // max timeout wating for a response
-  int mtype = 30000;
-  int sz;
-  int i;
-  Msg_component send_payload;
-  
-  msg = xfw->Alloc_msg( 2048 );
-  
-  sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
-  if( sz < 2048 ) {
-    fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
-    exit( 1 );
-  }
-
-  string ues_list = "[";
+// callback to handle handover reply (json http response)
+size_t handoff_reply_callback( const char *in, size_t size, size_t num, string *out ) {
+  const size_t totalBytes( size * num );
+  out->append( in, totalBytes );
+  return totalBytes;
+}
 
-  for (int i = 0; i < ues_to_predict.size(); i++) {
-    if (i == ues_to_predict.size() - 1) {
-      ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"]";
+// sends a handover message through REST
+void send_handoff_request( string msg ) {
+  CURL *curl = curl_easy_init();
+  curl_easy_setopt( curl, CURLOPT_URL, ts_control_url );
+  curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
+  curl_easy_setopt( curl, CURLOPT_POST, 1L );
+  // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
+
+  // response information
+  long httpCode( 0 );
+  unique_ptr<string> httpData( new string() );
+
+  curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, handoff_reply_callback );
+  curl_easy_setopt( curl, CURLOPT_WRITEDATA, httpData.get());
+  curl_easy_setopt( curl, CURLOPT_POSTFIELDS, msg.c_str() );
+
+  struct curl_slist *headers = NULL;  // needs to free this after easy perform
+  headers = curl_slist_append( headers, "Accept: application/json" );
+  headers = curl_slist_append( headers, "Content-Type: application/json" );
+  curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
+
+  cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_url << "\"\n";
+  cout << "[INFO] HandOff request is " << msg << endl;
+
+  // sending request
+  CURLcode res = curl_easy_perform( curl );
+  if( res != CURLE_OK ) {
+    cout << "[ERROR] curl_easy_perform() failed: " << curl_easy_strerror( res ) << endl;
+
+  } else {
+
+    curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
+    if( httpCode == 200 ) {
+      // ============== DO SOMETHING USEFUL HERE ===============
+      // Currently, we only print out the HandOff reply
+      rapidjson::Document document;
+      document.Parse( httpData.get()->c_str() );
+      rapidjson::StringBuffer s;
+           rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+      document.Accept( writer );
+      cout << "[INFO] HandOff reply is " << s.GetString() << endl;
+
+
+    } else if ( httpCode == 404 ) {
+      cout << "[ERROR] HTTP 404 Not Found: " << ts_control_url << endl;
     } else {
-      ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
+      cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_url << \
+              "\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
     }
-  }
 
-  string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
-
-  const char *body = message_body.c_str();
-
-  //  char *body = "{\"UEPredictionSet\": [\"12345\"]}";
-  
-  send_payload = msg->Get_payload(); // direct access to payload
-  //  snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
-  snprintf( (char *) send_payload.get(), 2048, body);
-  //snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
-
-  fprintf(stderr, "message body %s\n", send_payload.get());  
-  fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
-  
-  // payload updated in place, nothing to copy from, so payload parm is nil
-  if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
-    fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
   }
 
-  /*
-  msg = xfw->Receive( response_to );
-  if( msg != NULL ) {
-    rmtype = msg->Get_mtype();
-    send_payload = msg->Get_payload();
-    fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
-  } 
-  */
-
+  curl_slist_free_all( headers );
+  curl_easy_cleanup( curl );
 }
 
 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
 
-  long now;
-  long total_count;
-
-  int sz;
-  int i;
+  time_t now;
+  string str_now;
+  static unsigned int seq_number = 0; // static counter, not thread-safe
 
   int response_to = 0;  // max timeout wating for a response
 
@@ -432,157 +459,192 @@ void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_comp
   int rmtype;                                                  // received message type
   int delay = 1000000;                         // mu-sec delay; default 1s
 
-  cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
-  cout << "payload is " << payload.get() << "\n";
-
-  mtype = 0;
+  cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
+  cout << "[INFO] Payload is " << payload.get() << endl;
 
   const char* arg = (const char*)payload.get();
   PredictionHandler handler;
 
   try {
-
     Reader reader;
     StringStream ss(arg);
     reader.Parse(ss,handler);
   } catch (...) {
-    cout << "got an exception on stringstream read parse\n";
+    cout << "[ERROR] Got an exception on stringstream read parse\n";
   }
-  
-  std::string pred_ue_id = handler.ue_id;
-  
-  cout << "Prediction for " << pred_ue_id << endl;
-  
-  unordered_map<string, int> throughput_map = handler.cell_pred_down;
-
-  cout << endl;
-  unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
 
-  //Decision about CONTROL message
-  //(1) Identify UE Id in Prediction message
-  //(2) Get UEData struct for this UE Id
-  //(3) Identify the UE's service cell ID
-  //(4) Iterate through Prediction message.
-  //    If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
-
-  UEData pred_ue_data = sdl_data[pred_ue_id];
-  std::string serving_cell_id = pred_ue_data.serving_cell;
-
-  int serving_cell_throughput;
-  int highest_throughput;
-  std::string highest_throughput_cell_id;
-  std::string::size_type str_size;
-
-  for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
-
-    std::string curr_cellid = map_iter->first;
-    int curr_throughput = map_iter->second;
+  // We are only considering download throughput
+  unordered_map<string, int> throughput_map = handler.cell_pred_down;
 
-    if (curr_cellid.compare(serving_cell_id) == 0) {
-      serving_cell_throughput = curr_throughput;
-      highest_throughput = serving_cell_throughput;
-    }
+  // Decision about CONTROL message
+  // (1) Identify UE Id in Prediction message
+  // (2) Iterate through Prediction message.
+  //     If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
+  //     We assume the first cell in the prediction message is the serving cell
 
-  }
+  int serving_cell_throughput = 0;
+  int highest_throughput = 0;
+  string highest_throughput_cell_id;
 
-  //Iterating again to identify the highest throughput prediction
+  // Getting the current serving cell throughput prediction
+  auto cell = throughput_map.find( handler.serving_cell_id );
+  serving_cell_throughput = cell->second;
 
+   // Iterating to identify the highest throughput prediction
   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
 
-    std::string curr_cellid = map_iter->first;
+    string curr_cellid = map_iter->first;
     int curr_throughput = map_iter->second;
 
-    if (curr_throughput > serving_cell_throughput) {
+    if ( highest_throughput < curr_throughput ) {
       highest_throughput = curr_throughput;
       highest_throughput_cell_id = curr_cellid;
     }
+
   }
 
-  if (highest_throughput > serving_cell_throughput) {
-    cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
-    cout << "UE ID: " << pred_ue_id << endl;
-    cout << "Source cell " << serving_cell_id << endl;
-    cout << "Target cell " << highest_throughput_cell_id << endl;
+  if ( highest_throughput > serving_cell_throughput ) {
+    // building a handoff control message
+    now = time( nullptr );
+    str_now = ctime( &now );
+    str_now.pop_back(); // removing the \n character
+
+    seq_number++;       // static counter, not thread-safe
+
+    rapidjson::StringBuffer s;
+         rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+    writer.StartObject();
+    writer.Key( "command" );
+    writer.String( "HandOff" );
+    writer.Key( "seqNo" );
+    writer.Int( seq_number );
+    writer.Key( "ue" );
+    writer.String( handler.ue_id.c_str() );
+    writer.Key( "fromCell" );
+    writer.String( handler.serving_cell_id.c_str() );
+    writer.Key( "toCell" );
+    writer.String( highest_throughput_cell_id.c_str() );
+    writer.Key( "timestamp" );
+    writer.String( str_now.c_str() );
+    writer.Key( "reason" );
+    writer.String( "HandOff Control Request from TS xApp" );
+    writer.Key( "ttl" );
+    writer.Int( 10 );
+    writer.EndObject();
+    // creates a message like
+    /* {
+      "command": "HandOff",
+      "seqNo": 1,
+      "ue": "ueid-here",
+      "fromCell": "CID1",
+      "toCell": "CID3",
+      "timestamp": "Sat May 22 10:35:33 2021",
+      "reason": "HandOff Control Request from TS xApp",
+      "ttl": 10
+    } */
+
+    // sending a control request message
+    send_handoff_request( s.GetString() );
+
+  } else {
+    cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
   }
 
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-  
-  
+  // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );      // validate that we can use the same buffer for 2 rts calls
+  // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
 }
 
+void send_prediction_request( vector<string> ues_to_predict ) {
 
-//This function runs a loop that continuously checks SDL for any UE
-
-void run_loop() {
+  std::unique_ptr<Message> msg;
+  Msg_component payload;                                // special type of unique pointer to the payload
 
-  cout << "in Traffic Steering run_loop()\n";
+  int sz;
+  int i;
+  size_t plen;
+  Msg_component send_payload;
 
-  unordered_map<string, UEData> uemap;
+  msg = xfw->Alloc_msg( 2048 );
 
-  while (1) {
+  sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
+  if( sz < 2048 ) {
+    fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
+    exit( 1 );
+  }
 
-    uemap = get_sdl_ue_data();
+  string ues_list = "[";
 
-    vector<string> prediction_ues;
+  for (int i = 0; i < ues_to_predict.size(); i++) {
+    if (i == ues_to_predict.size() - 1) {
+      ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
+    } else {
+      ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
+    }
+  }
 
-    for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
-      string ueid = map_iter->first;
-      fprintf(stderr,"found a ueid %s\n", ueid.c_str());
-      UEData data = map_iter->second;
+  string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
 
-      fprintf(stderr, "current rsrp is %d\n", data.serving_cell_rsrp);
+  const char *body = message_body.c_str();
 
-      if (data.serving_cell_rsrp < rsrp_threshold) {
-       fprintf(stderr,"it is less than the rsrp threshold\n");
-       prediction_ues.push_back(ueid);
-      } else {
-       fprintf(stderr,"it is not less than the rsrp threshold\n");
-      }
-    }
+  send_payload = msg->Get_payload(); // direct access to payload
+  snprintf( (char *) send_payload.get(), 2048, "%s", body );
 
-    fprintf(stderr, "the size of pred ues is %d\n", prediction_ues.size());
+  /*
+    we are sending a string, so we have to include the nil byte in the RMR message
+    to keep things simple in the receiver side
+   */
+  plen = strlen( (char *) send_payload.get() ) + 1;
 
-    if (prediction_ues.size() > 0) {
-      send_prediction_request(prediction_ues);
-    }
+  cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
 
-    sleep(20);
+  // payload updated in place, nothing to copy from, so payload parm is nil
+  if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
+    fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
   }
+
 }
 
 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
- * It just print the payload received from AD xApp and send an ACK with same UEID as payload to AD xApp.
+ * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
+ * sends a prediction request to the QP Driver xApp.
  */
 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
-  cout << "payload is " << payload.get() << "\n";
-  mbuf.Send_response(30004, -1, strlen((char *) payload.get()), (unsigned char *) payload.get());
+  const char *json = (const char *) payload.get();
+
+  cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
+  cout << "[INFO] Payload is " << json << "\n";
+
+  AnomalyHandler handler;
+  Reader reader;
+  StringStream ss(json);
+  reader.Parse(ss,handler);
+
+  // just sending ACK to the AD xApp
+  mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr );  // msg type 30004
+
+  // TODO should we use the threshold received in the A1_POLICY_REQ message and compare with Degradation in TS_ANOMALY_UPDATE?
+  // if( handler.degradation < rsrp_threshold )
+  send_prediction_request(handler.prediction_ues);
 }
 
 extern int main( int argc, char** argv ) {
 
   int nthreads = 1;
-
   char*        port = (char *) "4560";
 
-  sdl = shareddatalayer::SyncStorage::create();
-
-  nsu = Namespace(sdl_namespace_u);
-  nsc = Namespace(sdl_namespace_c);
+  // ts_control_url = "http://127.0.0.1:5000/api/echo"; // echo-server in test/app/ directory
+  if ( ( ts_control_url = getenv( ENV_CONTROL_URL ) ) == nullptr ) {
+    cout << "[ERROR] TS_CONTROL_URL is not defined to POST handoff control messages" << endl;
+    return 1;
+  }
 
-  
-  fprintf( stderr, "<XAPP> listening on port: %s\n", port );
-  xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); 
-  
-  xfw->Add_msg_cb( 20010, policy_callback, NULL );
-  xfw->Add_msg_cb( 30002, prediction_callback, NULL );
-  xfw->Add_msg_cb( 30003, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
-  
-  std::thread loop_thread;
+  fprintf( stderr, "[TS xApp] listening on port %s\n", port );
+  xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
 
-  loop_thread = std::thread(&run_loop);
+  xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL );          // msg type 20010
+  xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL );  // msg type 30002
+  xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
 
   xfw->Run( nthreads );
-  
+
 }
diff --git a/test/app/CMakeLists.txt b/test/app/CMakeLists.txt
new file mode 100644 (file)
index 0000000..ffd4cee
--- /dev/null
@@ -0,0 +1,51 @@
+# ==================================================================================
+#      Copyright (c) 2021 AT&T Intellectual Property.
+#      Copyright (c) 2021 Alexandre Huff.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+# ==================================================================================
+#
+#      Mnemonic: ad_xapp.cpp
+#      Abstract: Simulates the AD xApp sending an anomaly dectection message to
+#             the TS xApp. It sends one message and exits.
+#
+#      Date:     20 May 2021
+#      Author:   Alexandre Huff
+
+
+cmake_minimum_required(VERSION 3.14)
+set(CMAKE_CXX_STANDARD 11)
+
+add_executable(
+  ad_xapp
+  ad_xapp.cpp
+)
+target_link_libraries(
+  ad_xapp
+  ricxfcpp
+  rmr_si
+  pthread
+  curl
+)
+
+add_executable(
+  qp_xapp
+  qp_xapp.cpp
+)
+target_link_libraries(
+  qp_xapp
+  ricxfcpp
+  rmr_si
+  pthread
+  curl
+)
diff --git a/test/app/README b/test/app/README
new file mode 100644 (file)
index 0000000..cbf8477
--- /dev/null
@@ -0,0 +1,25 @@
+This directory contains a few sample programs to demonstrante
+a dummy message exchange of the Traffic Steering use case.
+These programs are simple and in most of the cases error
+checking is not performed to keep them simple.
+
+ad_xapp.cpp
+    Simulates the AD xApp sending an Anomaly Message to the
+    TS xApp. It sends one message, receives its corresponding
+    ACK, and exits. All steps are logged in the console. Uses
+    RMR port 4570.
+
+qp_xapp.cpp
+    Simulates both, the QoE Prediction (QP), and the QP Driver xApps.
+    Basically, this program receives Prediction Requests from TS xApp,
+    computes random throughput values (predictions) for neighbor cells,
+    and sends that Throughput Prediction to the TS xApp. All steps are
+    logged in the console. Uses RMR port 4580.
+
+echo-server.py
+    Implements a dummy echo server just for testing REST calls from
+    TS xApp.
+
+routes.rt
+    Contains a few RMR routing policies to allow AD, QP, and TS xApps
+    exchange messages in this controlled environment.
diff --git a/test/app/ad_xapp.cpp b/test/app/ad_xapp.cpp
new file mode 100644 (file)
index 0000000..bb142d2
--- /dev/null
@@ -0,0 +1,106 @@
+// vi: ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2021 AT&T Intellectual Property.
+       Copyright (c) 2021 Alexandre Huff.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       ad_xapp.cpp
+       Abstract:   Simulates the AD xApp sending an anomaly dectection message to
+                the TS xApp. It sends one message and exits.
+
+       Date:           20 May 2021
+       Author:         Alexandre Huff
+*/
+
+#include <iostream>
+#include <memory>
+#include <thread>
+#include <unistd.h>
+#include <string.h>
+
+#include <rmr/RIC_message_types.h>
+#include "ricxfcpp/xapp.hpp"
+
+using namespace std;
+using namespace xapp;
+
+unique_ptr<Xapp> xfw;
+
+void ts_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
+    cout << "[AD] TS Callback got a message, type=" << mtype << ", length=" << len << "\n";
+    cout << "[AD] Payload  is  " << payload.get() << endl;
+
+    // we only send one message, so we expect to receive only one as well
+    xfw->Stop();
+}
+
+// this thread just sends out one anomaly message to the TS xApp
+void ad_loop() {
+    std::unique_ptr<Message> msg;
+    Msg_component payload;
+    int size;
+    int plen;
+
+    cout << "[AD] In Anomaly Detection ad_loop()\n";
+    sleep( 1 ); // just wait receiver thread starting up
+
+    msg = xfw->Alloc_msg(2048);
+    size = msg->Get_available_size();
+    if ( size < 2048 ) {
+        cout << "[ERROR] Message returned does not have enough size: " << size << " < 2048" << endl;
+        exit(1);
+    }
+
+    // the message we are sending
+    const char *ad_msg = "[{\"du-id\": 1010, \"ue-id\": \"Train passenger 2\", \"measTimeStampRf\": 1620835470108, \"Degradation\": \"RSRP RSSINR\"}]";
+
+    payload = msg->Get_payload();
+    snprintf( (char *) payload.get(), 2048, "%s", ad_msg );
+
+    /*
+        we are sending a string, so we have to include the nil byte to send with RMR and keep
+        things simple in the receiver side
+   */
+    plen = strlen( (char *) payload.get() ) + 1;
+    cout << "[AD] Sending a message to TS, length: " << plen << "\n";
+    cout << "[AD] Message body " << payload.get() << endl;
+
+    // payload updated in place, nothing to copy from, so payload parm is nil
+    if ( ! msg->Send_msg( TS_ANOMALY_UPDATE, Message::NO_SUBID, plen, nullptr ) ) // msg type 30003
+        cout << "[ERROR] Unable to send a message to TS xApp, state: " << msg->Get_state() << endl;
+}
+
+int main(int argc, char const *argv[]) {
+    int nthreads = 1;
+
+    char* port = (char *) "4570";
+
+    cout << "[AD] Listening on port " << port << endl;
+    xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
+
+    xfw->Add_msg_cb( TS_ANOMALY_ACK, ts_callback, NULL ); /*Register a callback function for msg type 30004*/
+
+    std::thread ad_thread;
+    ad_thread = std::thread(&ad_loop);
+
+    xfw->Run( nthreads );
+
+    ad_thread.join();
+
+    return 0;
+}
diff --git a/test/app/echo-server.py b/test/app/echo-server.py
new file mode 100644 (file)
index 0000000..9e7ab17
--- /dev/null
@@ -0,0 +1,38 @@
+# ==================================================================================
+#      Copyright (c) 2021 AT&T Intellectual Property.
+#      Copyright (c) 2021 Alexandre Huff.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+# ==================================================================================
+#
+#
+#      Mnemonic:       echo-server.py
+#      Abstract:   Implements a naive echo server just for testing REST calls from
+#               TS xApp. Its goal is to run an effortless REST server.
+#
+#      Date:           22 May 2021
+#      Author:         Alexandre Huff
+
+
+from flask import Flask, jsonify, request
+
+app = Flask(__name__)
+
+@app.route('/api/echo', methods=['POST'])
+def echo():
+    data = request.get_json()
+    # just returning the received data
+    return jsonify(data)
+
+if __name__ == "__main__":
+    app.run()
diff --git a/test/app/qp_xapp.cpp b/test/app/qp_xapp.cpp
new file mode 100644 (file)
index 0000000..c1a7502
--- /dev/null
@@ -0,0 +1,109 @@
+// vi: ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2021 AT&T Intellectual Property.
+       Copyright (c) 2021 Alexandre Huff.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       qp_xapp.cpp
+       Abstract:   Simulates both, the QP Driver and QP xApp for testing the behavior
+                of the TS xApp.
+
+       Date:           20 May 2021
+       Author:         Alexandre Huff
+*/
+
+#include <iostream>
+#include <memory>
+#include <thread>
+#include <unistd.h>
+#include <string.h>
+
+#include <cstdlib>
+#include <ctime>
+
+#include <rapidjson/document.h>
+#include <rapidjson/writer.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/schema.h>
+#include <rapidjson/reader.h>
+
+#include <rmr/RIC_message_types.h>
+#include "ricxfcpp/xapp.hpp"
+
+using namespace std;
+using namespace xapp;
+using namespace rapidjson;
+
+unique_ptr<Xapp> xfw;
+
+
+void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
+    cout << "[QP] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
+    cout << "[QP] Payload is " << payload.get() << endl;
+
+    int randomNumber;
+    srand( (unsigned int) time(0) );
+
+    const char *json = (const char *) payload.get();
+    Document document;
+    document.Parse(json);
+
+    const Value& uePred = document["UEPredictionSet"];
+    if ( uePred.Size() > 0 ) {
+        string ueid = uePred[0].GetString();
+        // we want to create "{"ueid-user1": {"CID1": [10, 20], "CID2": [30, 40], "CID3": [50, 60]}}";
+        string body = "{\"" + ueid + "\": {";
+        for ( int i = 1; i <= 3; i++ ) {
+            int down = rand() % 100;
+            int up = rand() % 100;
+            if ( i != 3 ) {
+                body += "\"CID" + to_string(i) + "\": [" + to_string(down) + ", " + to_string(up) + "], ";
+            } else {
+                body += "\"CID" + to_string(i) + "\": [" + to_string(down) + ", " + to_string(up) + "]}}";
+            }
+        }
+
+        /*
+            we are sending a string, so we have to include the nil byte to send with RMR and keep
+            things simple in the receiver side
+        */
+        int len = body.size() + 1;
+
+        cout << "[QP] Sending a message to TS, length=" << len << "\n";
+        cout << "[QP] Message body " << body << endl;
+
+        // payload updated in place, nothing to copy from, so payload parm is nil
+        if ( ! mbuf.Send_response( TS_QOE_PREDICTION, Message::NO_SUBID, len, (unsigned char *) body.c_str() ) ) // msg type 30002
+            cout << "[ERROR] unable to send a message to TS xApp, state: " << mbuf.Get_state() << endl;
+    }
+}
+
+int main(int argc, char const *argv[]) {
+    int nthreads = 1;
+
+    char* port = (char *) "4580";
+
+    cout << "[QP] listening on port " << port << endl;
+    xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
+
+    xfw->Add_msg_cb( TS_UE_LIST, prediction_callback, NULL ); /*Register a callback function for msg type 30000*/
+
+    xfw->Run( nthreads );
+
+    return 0;
+}
diff --git a/test/app/routes.rt b/test/app/routes.rt
new file mode 100644 (file)
index 0000000..5cf0be1
--- /dev/null
@@ -0,0 +1,10 @@
+newrt|start
+# TS xApp
+mse | 20010 | -1 | 127.0.1.1:4560 # policy callback
+mse | 30002 | -1 | 127.0.1.1:4560 # prediction callback
+mse | 30003 | -1 | 127.0.1.1:4560 # ad callback
+# AD xApp
+mse | 30004 | -1 | 127.0.1.1:4570 # ts callback
+# QP / QP Driver xApps
+mse | 30000 | -1 | 127.0.1.1:4580 # prediction callback
+newrt|end
diff --git a/test/app/routes.rt.stash.inc b/test/app/routes.rt.stash.inc
new file mode 100644 (file)
index 0000000..e69de29
index a00dd37..d410e71 100644 (file)
@@ -1,52 +1,52 @@
-{\r
-        "xapp_name": "trafficxapp",\r
-        "version": "1.0.0",\r
-        "containers": [\r
-            {\r
-                "name": "trafficxapp",\r
-                "image": {\r
-                    "registry": "nexus3.o-ran-sc.org:10002",\r
-                    "name": "o-ran-sc/ric-app-ts",\r
-                    "tag": "1.0.13"\r
-                }\r
-            }\r
-        ],\r
-        "messaging": {\r
-            "ports": [\r
-                {\r
-                    "name": "rmr-data",\r
-                    "container": "trafficxapp",\r
-                    "port": 4560,\r
-                    "rxMessages": [ \r
-                       "TS_QOE_PREDICTION",\r
-                       "A1_POLICY_REQ",\r
-                       "TS_ANOMALY_UPDATE"\r
-                    ],\r
-                    "txMessages": [ "TS_UE_LIST", "TS_ANOMALY_ACK" ],\r
-                    "policies": [20008],\r
-                    "description": "rmr receive data port for mcxapp"\r
-                },\r
-                {\r
-                    "name": "rmr-route",\r
-                    "container": "trafficxapp",\r
-                    "port": 4561,\r
-                    "description": "rmr route port for mcxapp"\r
-                }\r
-            ]\r
-        },\r
-        "rmr": {\r
-            "protPort": "tcp:4560",\r
-            "maxSize": 2072,\r
-            "numWorkers": 1,\r
-            "txMessages": [\r
-                "TS_UE_LIST",\r
-               "TS_ANOMALY_ACK"\r
-            ],\r
-            "rxMessages": [\r
-                "TS_QOE_PREDICTION",\r
-                "A1_POLICY_REQ",\r
-               "TS_ANOMALY_UPDATE"             \r
-            ],\r
-            "policies": [20008]\r
-        }\r
-    }\r
+{
+        "xapp_name": "trafficxapp",
+        "version": "1.0.0",
+        "containers": [
+            {
+                "name": "trafficxapp",
+                "image": {
+                    "registry": "nexus3.o-ran-sc.org:10002",
+                    "name": "o-ran-sc/ric-app-ts",
+                    "tag": "1.1.0"
+                }
+            }
+        ],
+        "messaging": {
+            "ports": [
+                {
+                    "name": "rmr-data",
+                    "container": "trafficxapp",
+                    "port": 4560,
+                    "rxMessages": [
+                        "TS_QOE_PREDICTION",
+                        "A1_POLICY_REQ",
+                        "TS_ANOMALY_UPDATE"
+                    ],
+                    "txMessages": [ "TS_UE_LIST", "TS_ANOMALY_ACK" ],
+                    "policies": [20008],
+                    "description": "rmr receive data port for mcxapp"
+                },
+                {
+                    "name": "rmr-route",
+                    "container": "trafficxapp",
+                    "port": 4561,
+                    "description": "rmr route port for mcxapp"
+                }
+            ]
+        },
+        "rmr": {
+            "protPort": "tcp:4560",
+            "maxSize": 2072,
+            "numWorkers": 1,
+            "txMessages": [
+                "TS_UE_LIST",
+                "TS_ANOMALY_ACK"
+            ],
+            "rxMessages": [
+                "TS_QOE_PREDICTION",
+                "A1_POLICY_REQ",
+                "TS_ANOMALY_UPDATE"
+            ],
+            "policies": [20008]
+        }
+    }