-
-
-.. This work is licensed under a Creative Commons Attribution 4.0 International License.
-.. SPDX-License-Identifier: CC-BY-4.0
-..
-.. CAUTION: this document is generated from source in doc/src/*
-.. To make changes edit the source and recompile the document.
-.. Do NOT make changes directly to .rst or .md files.
-
-
+
+
+.. This work is licensed under a Creative Commons Attribution 4.0 International License.
+.. SPDX-License-Identifier: CC-BY-4.0
+..
+.. CAUTION: this document is generated from source in doc/src/*
+.. To make changes edit the source and recompile the document.
+.. Do NOT make changes directly to .rst or .md files.
+
+
============
-User's Guide
+User's Guide
============
---------------------
Traffic Steering xAPP
---------------------
-
-Introduction
+
+Introduction
============
-The Traffic Steering Use Case demonstrates intelligent inferences in the Near-RT RIC and E2 interaction in order to execute on the inferences.
+The Traffic Steering Use Case demonstrates intelligent inferences in the Near-RT RIC and E2 interaction in order to execute on the inferences.
-The current Use Case is comprised of three xApps:
-* Traffic Steering xApp (this one): Consume A1 Policy Intent, regularly monitor RAN metrics and request prediction for badly performing UEs, and listen for messages that show UE throughput predictions in different cells, in order to make a decision about UE Handover.
-* QoE Prediction (QP) xApp: Receive a feature set of metrics for a given UE, and output Throughput predictions on the Serving and any Neighbor cells
-* QoE Prediction Driver (QP Driver) xApp: Generate a feature set of metrics to input to QoE Prediction, based on SDL lookups in UE-Metric and Cell-Metric namespaces
+The current Use Case is comprised of five xApps:
+
+* KPI Monitoring xApp: Gathers the radio and system Key Performance Indicators (KPI) metrics from E2 Nodes and stores them in the Shared Data Layer (SDL).
+* Anomaly Detection (AD) xApp: Fetches UE data regularly from SDL, monitors UE metrics and sends the anomalous UEs to Traffic Steering xApp.
+* Traffic Steering xApp (*this one*): Consumes A1 Policy Intent, listens for badly performing UEs, sends prediction requests to QP Driver, and listens for messages that show UE throughput predictions in different cells to make a decision about UE Handover.
+* QoE Prediction Driver (QP Driver) xApp: Generates a feature set of metrics to input to QoE Prediction, based on SDL lookups in UE-Metric and Cell-Metric namespaces.
+* QoE Prediction (QP) xApp: Receives a feature set of metrics for a given UE, and output Throughput predictions on the Serving and any Neighbor cells to Traffic Steering xApp.
A1 Policy
=========
Policy Type ID is 20008.
-Currently, there is only one parameter that can be provided in A1 Policy: threshold
+Currently, there is only one parameter that can be provided in A1 Policy: *threshold*
An example Policy follows:
-{"threshold" : 5}
+
+.. code-block::
+
+ { "threshold": 5 }
+
+.. FIXME Is the "Serving Cell RSRP" related to "Degradation" in AD message
This Policy instructs Traffic Steering xApp to monitor current RAN metrics and request a QoE Prediction for any UE whose Serving Cell RSRP is less than 5.
+Receiving Anomaly Detection
+===========================
+
+Traffic Sterring xApp defines a callback to listen to Anomaly Detection messages received from AD xApp. The RMR message type is 30003.
+The following is an example message body:
+
+.. code-block::
+
+ [
+ {
+ "du-id":1010,
+ "ue-id":"Train passenger 2",
+ "measTimeStampRf":1620835470108,
+ "Degradation":"RSRP RSSINR"
+ }
+ ]
+
+.. ``[{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]``
+
Sending QoE Prediction Request
==============================
-Traffic Steering xApp loops repeatedly. After every sleep, it queries the SDL UE-Metric namespace. When it identifies a UE whose RSRP is below the threshold, it generates a QoE Prediction message. The RMR Message Type is 30000. The following is an example message body:
+Traffic Steering listens for badly performing UEs. When it identifies a UE whose RSRP is below the threshold, it generates
+a QoE Prediction Request message and sends it to the QP Driver xApp. The RMR Message Type is 30000.
+The following is an example message body:
+
+.. {"UEPredictionSet" : ["12345"]}
-{"UEPredictionSet" : ["12345"]}
+.. code-block::
+
+ { "UEPredictionSet": ["Train passenger 2"] }
Receiving QoE Prediction
========================
Traffic Steering xApp defines a callback for QoE Prediction received from QP xApp. The RMR message type is 30002. The following is an example message body:
-{"12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ] , "310-680-200-555002" : [ 800000 , 400000 ] , "310-680-200-555003" : [ 800000 , 400000 ] } }
+.. {"12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ] , "310-680-200-555002" : [ 800000 , 400000 ] , "310-680-200-555003" : [ 800000 , 400000 ] } }
+
+.. code-block::
+
+ {
+ "Train passenger 2":{
+ "310-680-200-555001":[2000000, 1200000],
+ "310-680-200-555002":[1000000, 4000000],
+ "310-680-200-555003":[5000000, 4000000]
+ }
+ }
+
+This message provides predictions for UE ID "Train passenger 2". For its service cell and neighbor cells, it lists an array containing two elements: DL Throughput and UL Throughput predictions.
+
+Traffic Steering xApp checks for the Service Cell ID for UE ID, and determines whether the predicted throughput is higher in a neighbor cell.
+The first cell in this prediction message is assumed to be the serving cell.
-This message provides predictions for UE ID 12345. For its service cell and neighbor cells, it lists an array containing two elements: DL Throughput and UL Throughput predictions.
+If predicted throughput is higher in a neighbor cell, Traffic Steering sends a CONTROL message through a REST call to E2 SIM. This message requests to hand-off the corresponding UE, and an example of its payload is as follows:
-Traffic Steering xApp checks for the Service Cell ID for UE ID, and determines whether the predicted throughput is higher in a neighbor cell.
+.. code-block::
-If predicted throughput is higher in a neighbor cell, Traffic Steering logs its intention to send a CONTROL message to do handover.
+ {
+ "command": "HandOff",
+ "seqNo": 1,
+ "ue": "Train passenger 2",
+ "fromCell": "310-680-200-555001",
+ "toCell": "310-680-200-555003",
+ "timestamp": "Sat May 22 10:35:33 2021",
+ "reason": "Hand-Off Control Request from TS xApp",
+ "ttl": 10
+ }
+Traffic Steering also logs the REST response, which shows whether or not the control operation has succeeded.
/*
Mnemonic: ts_xapp.cpp
- Abstract: Traffic Steering xApp;
- 1. Receives A1 Policy
- 2. Queries SDL to decide which UE to attempt Traffic Steering for
+ Abstract: Traffic Steering xApp
+ 1. Receives A1 Policy
+ 2. Receives anomaly detection
3. Requests prediction for UE throughput on current and neighbor cells
4. Receives prediction
5. Optionally exercises Traffic Steering action over E2
- Date: 22 April 2020
+ Date: 22 April 2020
Author: Ron Shacham
+ Modified: 21 May 2021 (Alexandre Huff)
+ Update for traffic steering use case in release D.
*/
#include <stdio.h>
#include <iostream>
#include <memory>
-#include <sdl/syncstorage.hpp>
#include <set>
#include <map>
#include <vector>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/schema.h>
#include <rapidjson/reader.h>
+#include <rapidjson/prettywriter.h>
-
+#include <curl/curl.h>
+#include <rmr/RIC_message_types.h>
#include "ricxfcpp/xapp.hpp"
+
+// Defines env name for the endpoint to POST handoff control messages
+#define ENV_CONTROL_URL "TS_CONTROL_URL"
+
+
using namespace rapidjson;
using namespace std;
+using namespace xapp;
+
using Namespace = std::string;
using Key = std::string;
using Data = std::vector<uint8_t>;
// ----------------------------------------------------------
-std::unique_ptr<Xapp> xfw;
+// Stores the the URL to POST handoff control messages
+const char *ts_control_url;
-std::string sdl_namespace_u = "TS-UE-metrics";
-std::string sdl_namespace_c = "TS-cell-metrics";
+std::unique_ptr<Xapp> xfw;
int rsrp_threshold = 0;
-std::unique_ptr<shareddatalayer::SyncStorage> sdl;
-
-Namespace nsu;
-Namespace nsc;
-
-struct UEData {
+/* struct UEData {
string serving_cell;
int serving_cell_rsrp;
-
-};
+}; */
struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
unordered_map<string, string> cell_pred;
std::string operation;
bool found_threshold = false;
-
bool Null() { return true; }
bool Bool(bool b) { return true; }
bool Int(int i) {
}
return true;
- }
+ }
bool Int64(int64_t i) { return true; }
bool Uint64(uint64_t u) { return true; }
bool Double(double d) { return true; }
bool String(const char* str, SizeType length, bool copy) {
-
+
if (curr_key.compare("operation") != 0) {
operation = str;
}
bool ue_id_found = false;
string curr_key = "";
string curr_value = "";
+ string serving_cell_id;
bool down_val = true;
bool Null() { return true; }
bool Bool(bool b) { return true; }
bool Int(int i) { return true; }
- bool Uint(unsigned u) {
+ bool Uint(unsigned u) {
+ // Currently, we assume the first cell in the prediction message is the serving cell
+ if ( serving_cell_id.empty() ) {
+ serving_cell_id = curr_key;
+ }
if (down_val) {
cell_pred_down[curr_key] = u;
down_val = true;
}
- return true;
+ return true;
}
bool Int64(int64_t i) { return true; }
bool EndArray(SizeType elementCount) { return true; }
};
+struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
+ /*
+ Assuming we receive the following payload from AD
+ [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
+ */
+ vector<string> prediction_ues;
+ string curr_key = "";
+
+ bool Key(const Ch* str, SizeType len, bool copy) {
+ curr_key = str;
+ return true;
+ }
+
+ bool String(const Ch* str, SizeType len, bool copy) {
+ // We are only interested in the "ue-id"
+ if ( curr_key.compare( "ue-id") == 0 ) {
+ prediction_ues.push_back( str );
+ }
+ return true;
+ }
+};
+
-struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
+/* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
unordered_map<string, string> cell_pred;
std::string serving_cell_id;
int serving_cell_rsrp;
bool in_serving_array = false;
int rf_meas_index = 0;
- bool in_serving_report_object = false;
+ bool in_serving_report_object = false;
string curr_key = "";
string curr_value = "";
return true;
}
-
+
bool Uint(unsigned i) {
if (in_serving_report_object) {
} else if (curr_key.compare("rssinr") == 0) {
serving_cell_sinr = i;
}
- }
-
+ }
+
return true; }
bool Int64(int64_t i) {
return true; }
bool Double(double d) { return true; }
bool String(const char* str, SizeType length, bool copy) {
-
+
if (curr_key.compare("ServingCellID") == 0) {
serving_cell_id = str;
- }
+ }
return true;
}
return true; }
bool Key(const char* str, SizeType length, bool copy) {
-
+
curr_key = str;
return true;
}
if (curr_key.compare("ServingCellRF") == 0) {
in_serving_array = true;
}
-
+
return true;
}
bool EndArray(SizeType elementCount) {
}
return true; }
-};
+}; */
-unordered_map<string, UEData> get_sdl_ue_data() {
+/* unordered_map<string, UEData> get_sdl_ue_data() {
fprintf(stderr, "In get_sdl_ue_data()\n");
unordered_map<string, string> ue_data;
unordered_map<string, UEData> return_ue_data_map;
-
+
std::string prefix3="";
Keys K2 = sdl->findKeys(nsu, prefix3);
DataMap Dk2 = sdl->get(nsu, K2);
-
+
string ue_json;
string ue_id;
-
+
for(auto si=K2.begin();si!=K2.end();++si){
std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
char val[val_v.size()+1]; // from Data
for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
val[i]='\0';
ue_id.assign((std::string)*si);
-
+
ue_json.assign(val);
ue_data[ue_id] = ue_json;
}
-
+
for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
UEDataHandler handler;
Reader reader;
string ueID = map_iter->first;
string serving_cell_id = handler.serving_cell_id;
int serv_rsrp = handler.serving_cell_rsrp;
-
+
return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
-
- }
+
+ }
return return_ue_data_map;
-}
+} */
void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
int response_to = 0; // max timeout wating for a response
int rmtype; // received message type
-
- fprintf(stderr, "Policy Callback got a message, type=%d, length=%d\n", mtype, len);
+ fprintf(stderr, "[INFO] Policy Callback got a message, type=%d, length=%d\n", mtype, len);
const char *arg = (const char*)payload.get();
- fprintf(stderr, "payload is %s\n", payload.get());
+ fprintf(stderr, "[INFO] Payload is %s\n", arg);
PolicyHandler handler;
Reader reader;
reader.Parse(ss,handler);
//Set the threshold value
-
if (handler.found_threshold) {
- fprintf(stderr, "Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
+ fprintf(stderr, "[INFO] Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
rsrp_threshold = handler.threshold;
}
mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-
-
}
-void send_prediction_request(vector<string> ues_to_predict) {
-
- std::unique_ptr<Message> msg;
- Msg_component payload; // special type of unique pointer to the payload
-
- int nthreads = 1;
- int response_to = 0; // max timeout wating for a response
- int mtype = 30000;
- int sz;
- int i;
- Msg_component send_payload;
-
- msg = xfw->Alloc_msg( 2048 );
-
- sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
- if( sz < 2048 ) {
- fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
- exit( 1 );
- }
-
- string ues_list = "[";
+// callback to handle handover reply (json http response)
+size_t handoff_reply_callback( const char *in, size_t size, size_t num, string *out ) {
+ const size_t totalBytes( size * num );
+ out->append( in, totalBytes );
+ return totalBytes;
+}
- for (int i = 0; i < ues_to_predict.size(); i++) {
- if (i == ues_to_predict.size() - 1) {
- ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"]";
+// sends a handover message through REST
+void send_handoff_request( string msg ) {
+ CURL *curl = curl_easy_init();
+ curl_easy_setopt( curl, CURLOPT_URL, ts_control_url );
+ curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
+ curl_easy_setopt( curl, CURLOPT_POST, 1L );
+ // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
+
+ // response information
+ long httpCode( 0 );
+ unique_ptr<string> httpData( new string() );
+
+ curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, handoff_reply_callback );
+ curl_easy_setopt( curl, CURLOPT_WRITEDATA, httpData.get());
+ curl_easy_setopt( curl, CURLOPT_POSTFIELDS, msg.c_str() );
+
+ struct curl_slist *headers = NULL; // needs to free this after easy perform
+ headers = curl_slist_append( headers, "Accept: application/json" );
+ headers = curl_slist_append( headers, "Content-Type: application/json" );
+ curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
+
+ cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_url << "\"\n";
+ cout << "[INFO] HandOff request is " << msg << endl;
+
+ // sending request
+ CURLcode res = curl_easy_perform( curl );
+ if( res != CURLE_OK ) {
+ cout << "[ERROR] curl_easy_perform() failed: " << curl_easy_strerror( res ) << endl;
+
+ } else {
+
+ curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
+ if( httpCode == 200 ) {
+ // ============== DO SOMETHING USEFUL HERE ===============
+ // Currently, we only print out the HandOff reply
+ rapidjson::Document document;
+ document.Parse( httpData.get()->c_str() );
+ rapidjson::StringBuffer s;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+ document.Accept( writer );
+ cout << "[INFO] HandOff reply is " << s.GetString() << endl;
+
+
+ } else if ( httpCode == 404 ) {
+ cout << "[ERROR] HTTP 404 Not Found: " << ts_control_url << endl;
} else {
- ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
+ cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_url << \
+ "\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
}
- }
- string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
-
- const char *body = message_body.c_str();
-
- // char *body = "{\"UEPredictionSet\": [\"12345\"]}";
-
- send_payload = msg->Get_payload(); // direct access to payload
- // snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
- snprintf( (char *) send_payload.get(), 2048, body);
- //snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
-
- fprintf(stderr, "message body %s\n", send_payload.get());
- fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
-
- // payload updated in place, nothing to copy from, so payload parm is nil
- if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
- fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
}
- /*
- msg = xfw->Receive( response_to );
- if( msg != NULL ) {
- rmtype = msg->Get_mtype();
- send_payload = msg->Get_payload();
- fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
- }
- */
-
+ curl_slist_free_all( headers );
+ curl_easy_cleanup( curl );
}
void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
- long now;
- long total_count;
-
- int sz;
- int i;
+ time_t now;
+ string str_now;
+ static unsigned int seq_number = 0; // static counter, not thread-safe
int response_to = 0; // max timeout wating for a response
int rmtype; // received message type
int delay = 1000000; // mu-sec delay; default 1s
- cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
- cout << "payload is " << payload.get() << "\n";
-
- mtype = 0;
+ cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
+ cout << "[INFO] Payload is " << payload.get() << endl;
const char* arg = (const char*)payload.get();
PredictionHandler handler;
try {
-
Reader reader;
StringStream ss(arg);
reader.Parse(ss,handler);
} catch (...) {
- cout << "got an exception on stringstream read parse\n";
+ cout << "[ERROR] Got an exception on stringstream read parse\n";
}
-
- std::string pred_ue_id = handler.ue_id;
-
- cout << "Prediction for " << pred_ue_id << endl;
-
- unordered_map<string, int> throughput_map = handler.cell_pred_down;
-
- cout << endl;
-
- unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
- //Decision about CONTROL message
- //(1) Identify UE Id in Prediction message
- //(2) Get UEData struct for this UE Id
- //(3) Identify the UE's service cell ID
- //(4) Iterate through Prediction message.
- // If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
-
- UEData pred_ue_data = sdl_data[pred_ue_id];
- std::string serving_cell_id = pred_ue_data.serving_cell;
-
- int serving_cell_throughput;
- int highest_throughput;
- std::string highest_throughput_cell_id;
- std::string::size_type str_size;
-
- for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
-
- std::string curr_cellid = map_iter->first;
- int curr_throughput = map_iter->second;
+ // We are only considering download throughput
+ unordered_map<string, int> throughput_map = handler.cell_pred_down;
- if (curr_cellid.compare(serving_cell_id) == 0) {
- serving_cell_throughput = curr_throughput;
- highest_throughput = serving_cell_throughput;
- }
+ // Decision about CONTROL message
+ // (1) Identify UE Id in Prediction message
+ // (2) Iterate through Prediction message.
+ // If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
+ // We assume the first cell in the prediction message is the serving cell
- }
+ int serving_cell_throughput = 0;
+ int highest_throughput = 0;
+ string highest_throughput_cell_id;
- //Iterating again to identify the highest throughput prediction
+ // Getting the current serving cell throughput prediction
+ auto cell = throughput_map.find( handler.serving_cell_id );
+ serving_cell_throughput = cell->second;
+ // Iterating to identify the highest throughput prediction
for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
- std::string curr_cellid = map_iter->first;
+ string curr_cellid = map_iter->first;
int curr_throughput = map_iter->second;
- if (curr_throughput > serving_cell_throughput) {
+ if ( highest_throughput < curr_throughput ) {
highest_throughput = curr_throughput;
highest_throughput_cell_id = curr_cellid;
}
+
}
- if (highest_throughput > serving_cell_throughput) {
- cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
- cout << "UE ID: " << pred_ue_id << endl;
- cout << "Source cell " << serving_cell_id << endl;
- cout << "Target cell " << highest_throughput_cell_id << endl;
+ if ( highest_throughput > serving_cell_throughput ) {
+ // building a handoff control message
+ now = time( nullptr );
+ str_now = ctime( &now );
+ str_now.pop_back(); // removing the \n character
+
+ seq_number++; // static counter, not thread-safe
+
+ rapidjson::StringBuffer s;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+ writer.StartObject();
+ writer.Key( "command" );
+ writer.String( "HandOff" );
+ writer.Key( "seqNo" );
+ writer.Int( seq_number );
+ writer.Key( "ue" );
+ writer.String( handler.ue_id.c_str() );
+ writer.Key( "fromCell" );
+ writer.String( handler.serving_cell_id.c_str() );
+ writer.Key( "toCell" );
+ writer.String( highest_throughput_cell_id.c_str() );
+ writer.Key( "timestamp" );
+ writer.String( str_now.c_str() );
+ writer.Key( "reason" );
+ writer.String( "HandOff Control Request from TS xApp" );
+ writer.Key( "ttl" );
+ writer.Int( 10 );
+ writer.EndObject();
+ // creates a message like
+ /* {
+ "command": "HandOff",
+ "seqNo": 1,
+ "ue": "ueid-here",
+ "fromCell": "CID1",
+ "toCell": "CID3",
+ "timestamp": "Sat May 22 10:35:33 2021",
+ "reason": "HandOff Control Request from TS xApp",
+ "ttl": 10
+ } */
+
+ // sending a control request message
+ send_handoff_request( s.GetString() );
+
+ } else {
+ cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
}
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-
-
+ // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
+ // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
}
+void send_prediction_request( vector<string> ues_to_predict ) {
-//This function runs a loop that continuously checks SDL for any UE
-
-void run_loop() {
+ std::unique_ptr<Message> msg;
+ Msg_component payload; // special type of unique pointer to the payload
- cout << "in Traffic Steering run_loop()\n";
+ int sz;
+ int i;
+ size_t plen;
+ Msg_component send_payload;
- unordered_map<string, UEData> uemap;
+ msg = xfw->Alloc_msg( 2048 );
- while (1) {
+ sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
+ if( sz < 2048 ) {
+ fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
+ exit( 1 );
+ }
- uemap = get_sdl_ue_data();
+ string ues_list = "[";
- vector<string> prediction_ues;
+ for (int i = 0; i < ues_to_predict.size(); i++) {
+ if (i == ues_to_predict.size() - 1) {
+ ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
+ } else {
+ ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
+ }
+ }
- for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
- string ueid = map_iter->first;
- fprintf(stderr,"found a ueid %s\n", ueid.c_str());
- UEData data = map_iter->second;
+ string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
- fprintf(stderr, "current rsrp is %d\n", data.serving_cell_rsrp);
+ const char *body = message_body.c_str();
- if (data.serving_cell_rsrp < rsrp_threshold) {
- fprintf(stderr,"it is less than the rsrp threshold\n");
- prediction_ues.push_back(ueid);
- } else {
- fprintf(stderr,"it is not less than the rsrp threshold\n");
- }
- }
+ send_payload = msg->Get_payload(); // direct access to payload
+ snprintf( (char *) send_payload.get(), 2048, "%s", body );
- fprintf(stderr, "the size of pred ues is %d\n", prediction_ues.size());
+ /*
+ we are sending a string, so we have to include the nil byte in the RMR message
+ to keep things simple in the receiver side
+ */
+ plen = strlen( (char *) send_payload.get() ) + 1;
- if (prediction_ues.size() > 0) {
- send_prediction_request(prediction_ues);
- }
+ cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
- sleep(20);
+ // payload updated in place, nothing to copy from, so payload parm is nil
+ if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
+ fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
}
+
}
/* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
- * It just print the payload received from AD xApp and send an ACK with same UEID as payload to AD xApp.
+ * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
+ * sends a prediction request to the QP Driver xApp.
*/
void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
- cout << "payload is " << payload.get() << "\n";
- mbuf.Send_response(30004, -1, strlen((char *) payload.get()), (unsigned char *) payload.get());
+ const char *json = (const char *) payload.get();
+
+ cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
+ cout << "[INFO] Payload is " << json << "\n";
+
+ AnomalyHandler handler;
+ Reader reader;
+ StringStream ss(json);
+ reader.Parse(ss,handler);
+
+ // just sending ACK to the AD xApp
+ mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr ); // msg type 30004
+
+ // TODO should we use the threshold received in the A1_POLICY_REQ message and compare with Degradation in TS_ANOMALY_UPDATE?
+ // if( handler.degradation < rsrp_threshold )
+ send_prediction_request(handler.prediction_ues);
}
extern int main( int argc, char** argv ) {
int nthreads = 1;
-
char* port = (char *) "4560";
- sdl = shareddatalayer::SyncStorage::create();
-
- nsu = Namespace(sdl_namespace_u);
- nsc = Namespace(sdl_namespace_c);
+ // ts_control_url = "http://127.0.0.1:5000/api/echo"; // echo-server in test/app/ directory
+ if ( ( ts_control_url = getenv( ENV_CONTROL_URL ) ) == nullptr ) {
+ cout << "[ERROR] TS_CONTROL_URL is not defined to POST handoff control messages" << endl;
+ return 1;
+ }
-
- fprintf( stderr, "<XAPP> listening on port: %s\n", port );
- xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
-
- xfw->Add_msg_cb( 20010, policy_callback, NULL );
- xfw->Add_msg_cb( 30002, prediction_callback, NULL );
- xfw->Add_msg_cb( 30003, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
-
- std::thread loop_thread;
+ fprintf( stderr, "[TS xApp] listening on port %s\n", port );
+ xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
- loop_thread = std::thread(&run_loop);
+ xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL ); // msg type 20010
+ xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL ); // msg type 30002
+ xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
xfw->Run( nthreads );
-
+
}