// vi: ts=4 sw=4 noet:
/*
==================================================================================
- Copyright (c) 2020 Nokia
Copyright (c) 2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
/*
Mnemonic: ts_xapp.cpp
- Abstract: Traffic Steering xApp;
- 1. Receives A1 Policy
- 2. Queries SDL to decide which UE to attempt Traffic Steering for
+ Abstract: Traffic Steering xApp
+ 1. Receives A1 Policy
+ 2. Receives anomaly detection
3. Requests prediction for UE throughput on current and neighbor cells
4. Receives prediction
5. Optionally exercises Traffic Steering action over E2
- Date: 22 April 2020
+ Date: 22 April 2020
Author: Ron Shacham
-
+
+ Modified: 21 May 2021 (Alexandre Huff)
+ Update for traffic steering use case in release D.
*/
#include <stdio.h>
#include <string.h>
#include <unistd.h>
+#include <thread>
#include <iostream>
#include <memory>
-#include <sdl/syncstorage.hpp>
#include <set>
#include <map>
#include <vector>
#include <string>
+#include <unordered_map>
+
+#include <rapidjson/document.h>
+#include <rapidjson/writer.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/schema.h>
+#include <rapidjson/reader.h>
+#include <rapidjson/prettywriter.h>
+#include <curl/curl.h>
+#include <rmr/RIC_message_types.h>
#include "ricxfcpp/xapp.hpp"
+
+// Defines env name for the endpoint to POST handoff control messages
+#define ENV_CONTROL_URL "TS_CONTROL_URL"
+
+
+using namespace rapidjson;
+using namespace std;
+using namespace xapp;
+
using Namespace = std::string;
using Key = std::string;
using Data = std::vector<uint8_t>;
// ----------------------------------------------------------
-std::unique_ptr<Xapp> xfw;
-
+// Stores the the URL to POST handoff control messages
+const char *ts_control_url;
-void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
+std::unique_ptr<Xapp> xfw;
- long now;
- long total_count;
+int rsrp_threshold = 0;
+
+/* struct UEData {
+ string serving_cell;
+ int serving_cell_rsrp;
+}; */
+
+struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
+ unordered_map<string, string> cell_pred;
+ std::string ue_id;
+ bool ue_id_found = false;
+ string curr_key = "";
+ string curr_value = "";
+ int policy_type_id;
+ int policy_instance_id;
+ int threshold;
+ std::string operation;
+ bool found_threshold = false;
+
+ bool Null() { return true; }
+ bool Bool(bool b) { return true; }
+ bool Int(int i) {
+
+ if (curr_key.compare("policy_type_id") == 0) {
+ policy_type_id = i;
+ } else if (curr_key.compare("policy_instance_id") == 0) {
+ policy_instance_id = i;
+ } else if (curr_key.compare("threshold") == 0) {
+ found_threshold = true;
+ threshold = i;
+ }
- int sz;
- int i;
+ return true;
+ }
+ bool Uint(unsigned u) {
+
+ if (curr_key.compare("policy_type_id") == 0) {
+ policy_type_id = u;
+ } else if (curr_key.compare("policy_instance_id") == 0) {
+ policy_instance_id = u;
+ } else if (curr_key.compare("threshold") == 0) {
+ found_threshold = true;
+ threshold = u;
+ }
- int response_to = 0; // max timeout wating for a response
+ return true;
+ }
+ bool Int64(int64_t i) { return true; }
+ bool Uint64(uint64_t u) { return true; }
+ bool Double(double d) { return true; }
+ bool String(const char* str, SizeType length, bool copy) {
- int send_mtype = 0;
- int rmtype; // received message type
- int delay = 1000000; // mu-sec delay; default 1s
+ if (curr_key.compare("operation") != 0) {
+ operation = str;
+ }
- std::unique_ptr<Message> msg;
- Msg_component send_payload; // special type of unique pointer to the payload
-
- fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
- fprintf(stderr, "payload is %s\n", payload.get());
-
- //fprintf( stderr, "callback 1 got a message type = %d len = %d\n", mtype, len );
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
+ return true;
+ }
+ bool StartObject() {
- mtype = 0;
+ return true;
+ }
+ bool Key(const char* str, SizeType length, bool copy) {
- fprintf(stderr, "cb 1\n");
+ curr_key = str;
- msg = xfw->Alloc_msg( 2048 );
-
- sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
- if( sz < 2048 ) {
- fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
- exit( 1 );
+ return true;
}
+ bool EndObject(SizeType memberCount) { return true; }
+ bool StartArray() { return true; }
+ bool EndArray(SizeType elementCount) { return true; }
+
+};
+
+struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
+ unordered_map<string, int> cell_pred_down;
+ unordered_map<string, int> cell_pred_up;
+ std::string ue_id;
+ bool ue_id_found = false;
+ string curr_key = "";
+ string curr_value = "";
+ string serving_cell_id;
+ bool down_val = true;
+ bool Null() { return true; }
+ bool Bool(bool b) { return true; }
+ bool Int(int i) { return true; }
+ bool Uint(unsigned u) {
+ // Currently, we assume the first cell in the prediction message is the serving cell
+ if ( serving_cell_id.empty() ) {
+ serving_cell_id = curr_key;
+ }
- fprintf(stderr, "cb 2");
-
- send_payload = msg->Get_payload(); // direct access to payload
- snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\" : [\"222\", \"333\", \"444\"]}" );
+ if (down_val) {
+ cell_pred_down[curr_key] = u;
+ down_val = false;
+ } else {
+ cell_pred_up[curr_key] = u;
+ down_val = true;
+ }
+
+ return true;
- fprintf(stderr, "cb 3");
-
- // payload updated in place, nothing to copy from, so payload parm is nil
- if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() )+1, NULL )) {
- fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
}
+ bool Int64(int64_t i) { return true; }
+ bool Uint64(uint64_t u) { return true; }
+ bool Double(double d) { return true; }
+ bool String(const char* str, SizeType length, bool copy) {
- fprintf(stderr, "cb 4");
+ return true;
+ }
+ bool StartObject() { return true; }
+ bool Key(const char* str, SizeType length, bool copy) {
+ if (!ue_id_found) {
+
+ ue_id = str;
+ ue_id_found = true;
+ } else {
+ curr_key = str;
+ }
+ return true;
+ }
+ bool EndObject(SizeType memberCount) { return true; }
+ bool StartArray() { return true; }
+ bool EndArray(SizeType elementCount) { return true; }
+};
+struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
/*
- msg = xfw->Receive( response_to );
- if( msg != NULL ) {
- rmtype = msg->Get_mtype();
- send_payload = msg->Get_payload();
- fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
- }
- */
-
-}
+ Assuming we receive the following payload from AD
+ [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
+ */
+ vector<string> prediction_ues;
+ string curr_key = "";
+
+ bool Key(const Ch* str, SizeType len, bool copy) {
+ curr_key = str;
+ return true;
+ }
+ bool String(const Ch* str, SizeType len, bool copy) {
+ // We are only interested in the "ue-id"
+ if ( curr_key.compare( "ue-id") == 0 ) {
+ prediction_ues.push_back( str );
+ }
+ return true;
+ }
+};
-void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
- long now;
- long total_count;
+/* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
+ unordered_map<string, string> cell_pred;
+ std::string serving_cell_id;
+ int serving_cell_rsrp;
+ int serving_cell_rsrq;
+ int serving_cell_sinr;
+ bool in_serving_array = false;
+ int rf_meas_index = 0;
- int sz;
- int i;
+ bool in_serving_report_object = false;
- int response_to = 0; // max timeout wating for a response
+ string curr_key = "";
+ string curr_value = "";
+ bool Null() { return true; }
+ bool Bool(bool b) { return true; }
+ bool Int(int i) {
- int send_mtype = 0;
- int rmtype; // received message type
- int delay = 1000000; // mu-sec delay; default 1s
+ return true;
+ }
- std::unique_ptr<Message> msg;
- Msg_component send_payload; // special type of unique pointer to the payload
-
- fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len);
- fprintf(stderr, "payload is %s\n", payload.get());
-
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
+ bool Uint(unsigned i) {
- mtype = 0;
+ if (in_serving_report_object) {
+ if (curr_key.compare("rsrp") == 0) {
+ serving_cell_rsrp = i;
+ } else if (curr_key.compare("rsrq") == 0) {
+ serving_cell_rsrq = i;
+ } else if (curr_key.compare("rssinr") == 0) {
+ serving_cell_sinr = i;
+ }
+ }
- fprintf(stderr, "cb 1\n");
-
-}
+ return true; }
+ bool Int64(int64_t i) {
+ return true; }
+ bool Uint64(uint64_t i) {
+ return true; }
+ bool Double(double d) { return true; }
+ bool String(const char* str, SizeType length, bool copy) {
-extern int main( int argc, char** argv ) {
+ if (curr_key.compare("ServingCellID") == 0) {
+ serving_cell_id = str;
+ }
- std::unique_ptr<Message> msg;
- Msg_component payload; // special type of unique pointer to the payload
+ return true;
+ }
+ bool StartObject() {
+ if (curr_key.compare("ServingCellRF") == 0) {
+ in_serving_report_object = true;
+ }
- int nthreads = 1;
+ return true; }
+ bool Key(const char* str, SizeType length, bool copy) {
- int response_to = 0; // max timeout wating for a response
+ curr_key = str;
+ return true;
+ }
+ bool EndObject(SizeType memberCount) {
+ if (curr_key.compare("ServingCellRF") == 0) {
+ in_serving_report_object = false;
+ }
+ return true; }
+ bool StartArray() {
- int delay = 1000000; // mu-sec delay; default 1s
+ if (curr_key.compare("ServingCellRF") == 0) {
+ in_serving_array = true;
+ }
- char* port = (char *) "4560";
+ return true;
+ }
+ bool EndArray(SizeType elementCount) {
- int ai;
-
- ai = 1;
- while( ai < argc ) { // very simple flag processing (no bounds/error checking)
- if( argv[ai][0] != '-' ) {
- break;
+ if (curr_key.compare("servingCellRF") == 0) {
+ in_serving_array = false;
+ rf_meas_index = 0;
}
-
- switch( argv[ai][1] ) { // we only support -x so -xy must be -x -y
- case 'd': // delay between messages (mu-sec)
- delay = atoi( argv[ai+1] );
- ai++;
- break;
-
- case 'p':
- port = argv[ai+1];
- ai++;
- break;
-
- case 't': // timeout in seconds; we need to convert to ms for rmr calls
- response_to = atoi( argv[ai+1] ) * 1000;
- ai++;
- break;
- }
- ai++;
+
+ return true; }
+}; */
+
+
+/* unordered_map<string, UEData> get_sdl_ue_data() {
+
+ fprintf(stderr, "In get_sdl_ue_data()\n");
+
+ unordered_map<string, string> ue_data;
+
+ unordered_map<string, UEData> return_ue_data_map;
+
+ std::string prefix3="";
+ Keys K2 = sdl->findKeys(nsu, prefix3);
+ DataMap Dk2 = sdl->get(nsu, K2);
+
+ string ue_json;
+ string ue_id;
+
+ for(auto si=K2.begin();si!=K2.end();++si){
+ std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
+ char val[val_v.size()+1]; // from Data
+ int i;
+
+ for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
+ val[i]='\0';
+ ue_id.assign((std::string)*si);
+
+ ue_json.assign(val);
+ ue_data[ue_id] = ue_json;
}
-
- fprintf( stderr, "<XAPP> response timeout set to: %d\n", response_to );
- fprintf( stderr, "<XAPP> listening on port: %s\n", port );
-
- xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
- fprintf(stderr, "code1\n");
-
- xfw->Add_msg_cb( 20010, policy_callback, NULL );
- xfw->Add_msg_cb( 30002, prediction_callback, NULL );
+ for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
+ UEDataHandler handler;
+ Reader reader;
+ StringStream ss(map_iter->second.c_str());
+ reader.Parse(ss,handler);
- fprintf(stderr, "code2\n");
+ string ueID = map_iter->first;
+ string serving_cell_id = handler.serving_cell_id;
+ int serv_rsrp = handler.serving_cell_rsrp;
- std::string sdl_namespace_u = "TS-UE-metrics";
- std::string sdl_namespace_c = "TS-cell-metrics";
+ return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
- fprintf(stderr, "code5\n");
-
- std::unique_ptr<shareddatalayer::SyncStorage> sdl(shareddatalayer::SyncStorage::create());
+ }
- Namespace nsu(sdl_namespace_u);
- Namespace nsc(sdl_namespace_c);
+ return return_ue_data_map;
+} */
- /*
+void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
- fprintf(stderr, "before sdl set\n");
-
- try{
- //connecting to the Redis and generating a random key for namespace "hwxapp"
- fprintf(stderr, "IN SDL Set Data");
- // std::string data_string = "{\"rsrp\" : -110}";
+ int response_to = 0; // max timeout wating for a response
+ int rmtype; // received message type
+ string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
- std::string data_string = "{\"CellID\": \"310-680-200-555001\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 2000000, \"PDCPBytesUL\": 1200000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 50 }";
-
- DataMap dmap;
- // char key[4]="abc";
- char key[] = "310-680-200-555001";
- std::cout << "KEY: "<< key << std::endl;
- Key k = key;
- Data d;
- // uint8_t num = 101;
- d.assign(data_string.begin(), data_string.end());
- // d.push_back(num);
- dmap.insert({k,d});
+ cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length="<< len << "\n";
+ cout << "[INFO] Payload is " << arg << endl;
- sdl->set(nsc, dmap);
+ PolicyHandler handler;
+ Reader reader;
+ StringStream ss(arg.c_str());
+ reader.Parse(ss,handler);
- data_string = "{ \"CellID\": \"310-680-200-555002\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45 }";
+ //Set the threshold value
+ if (handler.found_threshold) {
+ cout << "[INFO] Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
+ rsrp_threshold = handler.threshold;
+ }
- Data d2;
- DataMap dmap2;
- char key2[] = "310-680-200-555002";
- std::cout << "KEY: "<< key2 << std::endl;
- Key k2 = key2;
- d2.assign(data_string.begin(), data_string.end());
- // d.push_back(num);
- dmap2.insert({k2,d});
+ mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
+ mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
+}
- sdl->set(nsc, dmap2);
+// callback to handle handover reply (json http response)
+size_t handoff_reply_callback( const char *in, size_t size, size_t num, string *out ) {
+ const size_t totalBytes( size * num );
+ out->append( in, totalBytes );
+ return totalBytes;
+}
+// sends a handover message through REST
+void send_handoff_request( string msg ) {
+ CURL *curl = curl_easy_init();
+ curl_easy_setopt( curl, CURLOPT_URL, ts_control_url );
+ curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
+ curl_easy_setopt( curl, CURLOPT_POST, 1L );
+ // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
+
+ // response information
+ long httpCode( 0 );
+ unique_ptr<string> httpData( new string() );
+
+ curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, handoff_reply_callback );
+ curl_easy_setopt( curl, CURLOPT_WRITEDATA, httpData.get());
+ curl_easy_setopt( curl, CURLOPT_POSTFIELDS, msg.c_str() );
+
+ struct curl_slist *headers = NULL; // needs to free this after easy perform
+ headers = curl_slist_append( headers, "Accept: application/json" );
+ headers = curl_slist_append( headers, "Content-Type: application/json" );
+ curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
+
+ cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_url << "\"\n";
+ cout << "[INFO] HandOff request is " << msg << endl;
+
+ // sending request
+ CURLcode res = curl_easy_perform( curl );
+ if( res != CURLE_OK ) {
+ cout << "[ERROR] curl_easy_perform() failed: " << curl_easy_strerror( res ) << endl;
+
+ } else {
+
+ curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
+ if( httpCode == 200 ) {
+ // ============== DO SOMETHING USEFUL HERE ===============
+ // Currently, we only print out the HandOff reply
+ rapidjson::Document document;
+ document.Parse( httpData.get()->c_str() );
+ rapidjson::StringBuffer s;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+ document.Accept( writer );
+ cout << "[INFO] HandOff reply is " << s.GetString() << endl;
+
+
+ } else if ( httpCode == 404 ) {
+ cout << "[ERROR] HTTP 404 Not Found: " << ts_control_url << endl;
+ } else {
+ cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_url << \
+ "\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
+ }
+ }
- std::string data_string = "{ \"CellID\": \"310-680-200-555003\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45 }";
+ curl_slist_free_all( headers );
+ curl_easy_cleanup( curl );
+}
- Data d3;
- DataMap dmap3;
- char key3[] = "310-680-200-555003";
- std::cout << "KEY: "<< key3 << std::endl;
- Key k3 = key3;
- d3.assign(data_string.begin(), data_string.end());
- // d.push_back(num);
- dmap3.insert({k3,d3});
+void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
- sdl->set(nsc, dmap3);
+ time_t now;
+ string str_now;
+ static unsigned int seq_number = 0; // static counter, not thread-safe
+ int response_to = 0; // max timeout wating for a response
+ int send_mtype = 0;
+ int rmtype; // received message type
+ int delay = 1000000; // mu-sec delay; default 1s
- data_string = "{ \"UEID\": 12345, \"ServingCellID\": \"310-680-200-555002\", \"MeasTimestampUEPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPDCPBytes\": 20,\"UEPDCPBytesDL\": 250000,\"UEPDCPBytesUL\": 100000, \"MeasTimestampUEPRBUsage\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPRBUsage\": 20, \"UEPRBUsageDL\": 10, \"UEPRBUsageUL\": 30, \"MeasTimestampRF\": \"2020-03-18 02:23:18.210\",\"MeasPeriodRF\": 40, \"ServingCellRF\": [-115,-16,-5], \"NeighborCellRF\": [ {\"CID\": \"310-680-200-555001\",\"Cell-RF\": [-90,-13,-2.5 ] }, {\"CID\": \"310-680-200-555003\", \"Cell-RF\": [-140,-17,-6 ] } ] }";
-
- Data d4;
- DataMap dmap4;
- char key4[] = "12345";
- std::cout << "KEY: "<< key << std::endl;
- d4.assign(data_string.begin(), data_string.end());
- Key k4 = key4;
- // d.push_back(num);
- dmap4.insert({k4,d4});
+ string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
- sdl->set(nsu, dmap4);
+ cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
+ cout << "[INFO] Payload is " << json << endl;
-
+ PredictionHandler handler;
+ try {
+ Reader reader;
+ StringStream ss(json.c_str());
+ reader.Parse(ss,handler);
+ } catch (...) {
+ cout << "[ERROR] Got an exception on stringstream read parse\n";
}
- catch(...){
- fprintf(stderr,"SDL Error in Set Data for Namespace");
- return false;
+
+ // We are only considering download throughput
+ unordered_map<string, int> throughput_map = handler.cell_pred_down;
+
+ // Decision about CONTROL message
+ // (1) Identify UE Id in Prediction message
+ // (2) Iterate through Prediction message.
+ // If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
+ // We assume the first cell in the prediction message is the serving cell
+
+ int serving_cell_throughput = 0;
+ int highest_throughput = 0;
+ string highest_throughput_cell_id;
+
+ // Getting the current serving cell throughput prediction
+ auto cell = throughput_map.find( handler.serving_cell_id );
+ serving_cell_throughput = cell->second;
+
+ // Iterating to identify the highest throughput prediction
+ for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
+
+ string curr_cellid = map_iter->first;
+ int curr_throughput = map_iter->second;
+
+ if ( highest_throughput < curr_throughput ) {
+ highest_throughput = curr_throughput;
+ highest_throughput_cell_id = curr_cellid;
+ }
+
}
-
- fprintf(stderr, "after sdl set\n");
- */
+ if ( highest_throughput > serving_cell_throughput ) {
+ // building a handoff control message
+ now = time( nullptr );
+ str_now = ctime( &now );
+ str_now.pop_back(); // removing the \n character
+
+ seq_number++; // static counter, not thread-safe
+
+ rapidjson::StringBuffer s;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+ writer.StartObject();
+ writer.Key( "command" );
+ writer.String( "HandOff" );
+ writer.Key( "seqNo" );
+ writer.Int( seq_number );
+ writer.Key( "ue" );
+ writer.String( handler.ue_id.c_str() );
+ writer.Key( "fromCell" );
+ writer.String( handler.serving_cell_id.c_str() );
+ writer.Key( "toCell" );
+ writer.String( highest_throughput_cell_id.c_str() );
+ writer.Key( "timestamp" );
+ writer.String( str_now.c_str() );
+ writer.Key( "reason" );
+ writer.String( "HandOff Control Request from TS xApp" );
+ writer.Key( "ttl" );
+ writer.Int( 10 );
+ writer.EndObject();
+ // creates a message like
+ /* {
+ "command": "HandOff",
+ "seqNo": 1,
+ "ue": "ueid-here",
+ "fromCell": "CID1",
+ "toCell": "CID3",
+ "timestamp": "Sat May 22 10:35:33 2021",
+ "reason": "HandOff Control Request from TS xApp",
+ "ttl": 10
+ } */
+
+ // sending a control request message
+ send_handoff_request( s.GetString() );
+
+ } else {
+ cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
+ }
- fprintf(stderr, "before sdl get\n");
+ // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
+ // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
+}
+void send_prediction_request( vector<string> ues_to_predict ) {
- std::string prefix2="310";
- Keys K = sdl->findKeys(nsc, prefix2); // just the prefix
- DataMap Dk = sdl->get(nsc, K);
+ std::unique_ptr<Message> msg;
+ Msg_component payload; // special type of unique pointer to the payload
- std::cout << "K contains " << K.size() << " elements.\n";
+ int sz;
+ int i;
+ size_t plen;
+ Msg_component send_payload;
- fprintf(stderr, "before forloop\n");
+ msg = xfw->Alloc_msg( 2048 );
- for(auto si=K.begin();si!=K.end();++si){
- std::vector<uint8_t> val_v = Dk[(*si)]; // 4 lines to unpack a string
- char val[val_v.size()+1]; // from Data
- int i;
- for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
- val[i]='\0';
- fprintf(stderr, "KEYS and Values %s = %s\n",(*si).c_str(), val);
+ sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
+ if( sz < 2048 ) {
+ fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
+ exit( 1 );
}
+ string ues_list = "[";
- std::string prefix3="12";
- Keys K2 = sdl->findKeys(nsu, prefix3); // just the prefix
- DataMap Dk2 = sdl->get(nsu, K2);
+ for (int i = 0; i < ues_to_predict.size(); i++) {
+ if (i == ues_to_predict.size() - 1) {
+ ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
+ } else {
+ ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
+ }
+ }
- std::cout << "K contains " << K2.size() << " elements.\n";
+ string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
- fprintf(stderr, "before forloop\n");
+ send_payload = msg->Get_payload(); // direct access to payload
+ snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
- for(auto si=K2.begin();si!=K2.end();++si){
- std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
- char val[val_v.size()+1]; // from Data
- int i;
- for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
- val[i]='\0';
- fprintf(stderr, "KEYS and Values %s = %s\n",(*si).c_str(), val);
- }
-
+ plen = strlen( (char *)send_payload.get() );
- fprintf(stderr, "after sdl get\n");
+ cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
- xfw->Run( nthreads );
+ // payload updated in place, nothing to copy from, so payload parm is nil
+ if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
+ fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
+ }
- fprintf(stderr, "code3\n");
-
- msg = xfw->Alloc_msg( 2048 );
+}
+
+/* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
+ * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
+ * sends a prediction request to the QP Driver xApp.
+ */
+void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
+ string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
+
+ cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
+ cout << "[INFO] Payload is " << json << "\n";
- fprintf(stderr, "code4\n");
+ AnomalyHandler handler;
+ Reader reader;
+ StringStream ss(json.c_str());
+ reader.Parse(ss,handler);
+
+ // just sending ACK to the AD xApp
+ mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr ); // msg type 30004
+
+ // TODO should we use the threshold received in the A1_POLICY_REQ message and compare with Degradation in TS_ANOMALY_UPDATE?
+ // if( handler.degradation < rsrp_threshold )
+ send_prediction_request(handler.prediction_ues);
+}
+
+extern int main( int argc, char** argv ) {
+
+ int nthreads = 1;
+ char* port = (char *) "4560";
+
+ // ts_control_url = "http://127.0.0.1:5000/api/echo"; // echo-server in test/app/ directory
+ if ( ( ts_control_url = getenv( ENV_CONTROL_URL ) ) == nullptr ) {
+ cout << "[ERROR] TS_CONTROL_URL is not defined to POST handoff control messages" << endl;
+ return 1;
+ }
+
+ fprintf( stderr, "[TS xApp] listening on port %s\n", port );
+ xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
+
+ xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL ); // msg type 20010
+ xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL ); // msg type 30002
+ xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
+
+ xfw->Run( nthreads );
-
}