/*
Mnemonic: ts_xapp.cpp
- Abstract: Traffic Steering xApp;
- 1. Receives A1 Policy
- 2. Queries SDL to decide which UE to attempt Traffic Steering for
+ Abstract: Traffic Steering xApp
+ 1. Receives A1 Policy
+ 2. Receives anomaly detection
3. Requests prediction for UE throughput on current and neighbor cells
4. Receives prediction
5. Optionally exercises Traffic Steering action over E2
- Date: 22 April 2020
+ Date: 22 April 2020
Author: Ron Shacham
+ Modified: 21 May 2021 (Alexandre Huff)
+ Update for traffic steering use case in release D.
+ 07 Dec 2021 (Alexandre Huff)
+ Update for traffic steering use case in release E.
*/
#include <stdio.h>
#include <iostream>
#include <memory>
-#include <sdl/syncstorage.hpp>
#include <set>
#include <map>
#include <vector>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/schema.h>
#include <rapidjson/reader.h>
+#include <rapidjson/prettywriter.h>
+#include <rmr/RIC_message_types.h>
+#include <ricxfcpp/xapp.hpp>
+#include <ricxfcpp/config.hpp>
+
+/*
+ FIXME unfortunately this RMR flag has to be disabled
+ due to name resolution conflicts.
+ RC xApp defines the same name for gRPC control messages.
+*/
+#undef RIC_CONTROL_ACK
+
+#include <grpc/grpc.h>
+#include <grpcpp/channel.h>
+#include <grpcpp/client_context.h>
+#include <grpcpp/create_channel.h>
+#include <grpcpp/security/credentials.h>
+#include "protobuf/api.grpc.pb.h"
+
+#include "utils/restclient.hpp"
-#include "ricxfcpp/xapp.hpp"
using namespace rapidjson;
using namespace std;
+using namespace xapp;
+
using Namespace = std::string;
using Key = std::string;
using Data = std::vector<uint8_t>;
// ----------------------------------------------------------
-
std::unique_ptr<Xapp> xfw;
+std::unique_ptr<api::MsgComm::Stub> rc_stub;
-std::string sdl_namespace_u = "TS-UE-metrics";
-std::string sdl_namespace_c = "TS-cell-metrics";
+int downlink_threshold = 0; // A1 policy type 20008 (in percentage)
-int rsrp_threshold = 0;
+// scoped enum to identify which API is used to send control messages
+enum class TsControlApi { REST, gRPC };
+TsControlApi ts_control_api; // api to send control messages
+string ts_control_ep; // api target endpoint
-std::unique_ptr<shareddatalayer::SyncStorage> sdl;
+typedef struct nodeb {
+ string ran_name;
+ struct {
+ string plmn_id;
+ string nb_id;
+ } global_nb_id;
+} nodeb_t;
-Namespace nsu;
-Namespace nsc;
+unordered_map<string, shared_ptr<nodeb_t>> cell_map; // maps each cell to its nodeb
-struct UEData {
+/* struct UEData {
string serving_cell;
int serving_cell_rsrp;
-
-};
+}; */
struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
+ /*
+ Assuming we receive the following payload from A1 Mediator
+ {"operation": "CREATE", "policy_type_id": 20008, "policy_instance_id": "tsapolicy145", "payload": {"threshold": 5}}
+ */
unordered_map<string, string> cell_pred;
std::string ue_id;
bool ue_id_found = false;
std::string operation;
bool found_threshold = false;
-
bool Null() { return true; }
bool Bool(bool b) { return true; }
bool Int(int i) {
}
return true;
- }
+ }
bool Int64(int64_t i) { return true; }
bool Uint64(uint64_t u) { return true; }
bool Double(double d) { return true; }
bool String(const char* str, SizeType length, bool copy) {
-
+
if (curr_key.compare("operation") != 0) {
operation = str;
}
bool ue_id_found = false;
string curr_key = "";
string curr_value = "";
+ string serving_cell_id;
bool down_val = true;
bool Null() { return true; }
bool Bool(bool b) { return true; }
bool Int(int i) { return true; }
- bool Uint(unsigned u) {
+ bool Uint(unsigned u) {
+ // Currently, we assume the first cell in the prediction message is the serving cell
+ if ( serving_cell_id.empty() ) {
+ serving_cell_id = curr_key;
+ }
if (down_val) {
cell_pred_down[curr_key] = u;
down_val = true;
}
- return true;
+ return true;
}
bool Int64(int64_t i) { return true; }
bool EndArray(SizeType elementCount) { return true; }
};
+struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
+ /*
+ Assuming we receive the following payload from AD
+ [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
+ */
+ vector<string> prediction_ues;
+ string curr_key = "";
+
+ bool Key(const Ch* str, SizeType len, bool copy) {
+ curr_key = str;
+ return true;
+ }
+
+ bool String(const Ch* str, SizeType len, bool copy) {
+ // We are only interested in the "ue-id"
+ if ( curr_key.compare( "ue-id") == 0 ) {
+ prediction_ues.push_back( str );
+ }
+ return true;
+ }
+};
+
+struct NodebListHandler : public BaseReaderHandler<UTF8<>, NodebListHandler> {
+ vector<string> nodeb_list;
+ string curr_key = "";
+
+ bool Key(const Ch* str, SizeType length, bool copy) {
+ curr_key = str;
+ return true;
+ }
+
+ bool String(const Ch* str, SizeType length, bool copy) {
+ if( curr_key.compare( "inventoryName" ) == 0 ) {
+ nodeb_list.push_back( str );
+ }
+ return true;
+ }
+};
+
+struct NodebHandler : public BaseReaderHandler<UTF8<>, NodebHandler> {
+ string curr_key = "";
+ shared_ptr<nodeb_t> nodeb = make_shared<nodeb_t>();
+
+ bool Key(const Ch* str, SizeType length, bool copy) {
+ curr_key = str;
+ return true;
+ }
+
+ bool String(const Ch* str, SizeType length, bool copy) {
+ if( curr_key.compare( "ranName" ) == 0 ) {
+ nodeb->ran_name = str;
+ } else if( curr_key.compare( "plmnId" ) == 0 ) {
+ nodeb->global_nb_id.plmn_id = str;
+ } else if( curr_key.compare( "nbId" ) == 0 ) {
+ nodeb->global_nb_id.nb_id = str;
+ } else if( curr_key.compare( "cellId" ) == 0 ) {
+ cell_map[str] = nodeb;
+ }
+ return true;
+ }
+
+};
+
-struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
+/* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
unordered_map<string, string> cell_pred;
std::string serving_cell_id;
int serving_cell_rsrp;
bool in_serving_array = false;
int rf_meas_index = 0;
- bool in_serving_report_object = false;
+ bool in_serving_report_object = false;
string curr_key = "";
string curr_value = "";
return true;
}
-
+
bool Uint(unsigned i) {
if (in_serving_report_object) {
} else if (curr_key.compare("rssinr") == 0) {
serving_cell_sinr = i;
}
- }
-
+ }
+
return true; }
bool Int64(int64_t i) {
return true; }
bool Double(double d) { return true; }
bool String(const char* str, SizeType length, bool copy) {
-
+
if (curr_key.compare("ServingCellID") == 0) {
serving_cell_id = str;
- }
+ }
return true;
}
return true; }
bool Key(const char* str, SizeType length, bool copy) {
-
+
curr_key = str;
return true;
}
if (curr_key.compare("ServingCellRF") == 0) {
in_serving_array = true;
}
-
+
return true;
}
bool EndArray(SizeType elementCount) {
}
return true; }
-};
+}; */
-unordered_map<string, UEData> get_sdl_ue_data() {
+/* unordered_map<string, UEData> get_sdl_ue_data() {
fprintf(stderr, "In get_sdl_ue_data()\n");
unordered_map<string, string> ue_data;
unordered_map<string, UEData> return_ue_data_map;
-
+
std::string prefix3="";
Keys K2 = sdl->findKeys(nsu, prefix3);
DataMap Dk2 = sdl->get(nsu, K2);
-
+
string ue_json;
string ue_id;
-
+
for(auto si=K2.begin();si!=K2.end();++si){
std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
char val[val_v.size()+1]; // from Data
for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
val[i]='\0';
ue_id.assign((std::string)*si);
-
+
ue_json.assign(val);
ue_data[ue_id] = ue_json;
}
-
+
for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
UEDataHandler handler;
Reader reader;
string ueID = map_iter->first;
string serving_cell_id = handler.serving_cell_id;
int serv_rsrp = handler.serving_cell_rsrp;
-
+
return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
-
- }
+
+ }
return return_ue_data_map;
-}
+} */
void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
+ string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
- int response_to = 0; // max timeout wating for a response
- int rmtype; // received message type
-
-
- fprintf(stderr, "Policy Callback got a message, type=%d, length=%d\n", mtype, len);
-
- const char *arg = (const char*)payload.get();
-
- fprintf(stderr, "payload is %s\n", payload.get());
+ cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length=" << len << "\n";
+ cout << "[INFO] Payload is " << arg << endl;
PolicyHandler handler;
Reader reader;
- StringStream ss(arg);
+ StringStream ss(arg.c_str());
reader.Parse(ss,handler);
//Set the threshold value
-
if (handler.found_threshold) {
- fprintf(stderr, "Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
- rsrp_threshold = handler.threshold;
+ cout << "[INFO] Setting Threshold for A1-P value: " << handler.threshold << "%\n";
+ downlink_threshold = handler.threshold;
}
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-
-
}
-void send_prediction_request(vector<string> ues_to_predict) {
-
- std::unique_ptr<Message> msg;
- Msg_component payload; // special type of unique pointer to the payload
-
- int nthreads = 1;
- int response_to = 0; // max timeout wating for a response
- int mtype = 30000;
- int sz;
- int i;
- Msg_component send_payload;
-
- msg = xfw->Alloc_msg( 2048 );
-
- sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
- if( sz < 2048 ) {
- fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
- exit( 1 );
+// sends a handover message through REST
+void send_rest_control_request( string ue_id, string serving_cell_id, string target_cell_id ) {
+ time_t now;
+ string str_now;
+ static unsigned int seq_number = 0; // static counter, not thread-safe
+
+ // building a handoff control message
+ now = time( nullptr );
+ str_now = ctime( &now );
+ str_now.pop_back(); // removing the \n character
+
+ seq_number++; // static counter, not thread-safe
+
+ rapidjson::StringBuffer s;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+ writer.StartObject();
+ writer.Key( "command" );
+ writer.String( "HandOff" );
+ writer.Key( "seqNo" );
+ writer.Int( seq_number );
+ writer.Key( "ue" );
+ writer.String( ue_id.c_str() );
+ writer.Key( "fromCell" );
+ writer.String( serving_cell_id.c_str() );
+ writer.Key( "toCell" );
+ writer.String( target_cell_id.c_str() );
+ writer.Key( "timestamp" );
+ writer.String( str_now.c_str() );
+ writer.Key( "reason" );
+ writer.String( "HandOff Control Request from TS xApp" );
+ writer.Key( "ttl" );
+ writer.Int( 10 );
+ writer.EndObject();
+ // creates a message like
+ /* {
+ "command": "HandOff",
+ "seqNo": 1,
+ "ue": "ueid-here",
+ "fromCell": "CID1",
+ "toCell": "CID3",
+ "timestamp": "Sat May 22 10:35:33 2021",
+ "reason": "HandOff Control Request from TS xApp",
+ "ttl": 10
+ } */
+
+ string msg = s.GetString();
+
+ cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_ep << "\"\n";
+ cout << "[INFO] HandOff request is " << msg << endl;
+
+ // sending request
+ restclient::RestClient client( ts_control_ep );
+ restclient::response_t resp = client.do_post( "", msg ); // we already have the full path in ts_control_ep
+
+ if( resp.status_code == 200 ) {
+ // ============== DO SOMETHING USEFUL HERE ===============
+ // Currently, we only print out the HandOff reply
+ rapidjson::Document document;
+ document.Parse( resp.body.c_str() );
+ rapidjson::StringBuffer s;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+ document.Accept( writer );
+ cout << "[INFO] HandOff reply is " << s.GetString() << endl;
+
+ } else {
+ cout << "[ERROR] Unexpected HTTP code " << resp.status_code << " from " << \
+ client.getBaseUrl() << \
+ "\n[ERROR] HTTP payload is " << resp.body.c_str() << endl;
}
- string ues_list = "[";
+}
- for (int i = 0; i < ues_to_predict.size(); i++) {
- if (i == ues_to_predict.size() - 1) {
- ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"]";
- } else {
- ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
- }
+// sends a handover message to RC xApp through gRPC
+void send_grpc_control_request( string ue_id, string target_cell_id ) {
+ grpc::ClientContext context;
+
+ api::RicControlGrpcRsp response;
+ shared_ptr<api::RicControlGrpcReq> request = make_shared<api::RicControlGrpcReq>();
+
+ api::RICE2APHeader *apHeader = request->mutable_rice2apheaderdata();
+ apHeader->set_ranfuncid( 300 );
+ apHeader->set_ricrequestorid( 1001 );
+
+ api::RICControlHeader *ctrlHeader = request->mutable_riccontrolheaderdata();
+ ctrlHeader->set_controlstyle( 3 );
+ ctrlHeader->set_controlactionid( 1 );
+ ctrlHeader->set_ueid( ue_id );
+
+ api::RICControlMessage *ctrlMsg = request->mutable_riccontrolmessagedata();
+ ctrlMsg->set_riccontrolcelltypeval( api::RIC_CONTROL_CELL_UNKWON );
+ ctrlMsg->set_targetcellid( target_cell_id );
+
+ auto data = cell_map.find( target_cell_id );
+ if( data != cell_map.end() ) {
+ request->set_e2nodeid( data->second->global_nb_id.nb_id );
+ request->set_plmnid( data->second->global_nb_id.plmn_id );
+ request->set_ranname( data->second->ran_name );
+ } else {
+ request->set_e2nodeid( "unknown_e2nodeid" );
+ request->set_plmnid( "unknown_plmnid" );
+ request->set_ranname( "unknown_ranname" );
}
+ request->set_riccontrolackreqval( api::RIC_CONTROL_ACK_UNKWON ); // not yet used in api.proto
- string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
-
- const char *body = message_body.c_str();
+ grpc::Status status = rc_stub->SendRICControlReqServiceGrpc( &context, *request, &response );
- // char *body = "{\"UEPredictionSet\": [\"12345\"]}";
-
- send_payload = msg->Get_payload(); // direct access to payload
- // snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
- snprintf( (char *) send_payload.get(), 2048, body);
- //snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
+ if( status.ok() ) {
+ if( response.rspcode() == 0 ) {
+ cout << "[INFO] Control Request succeeded with code=0, description=" << response.description() << endl;
+ } else {
+ cout << "[ERROR] Control Request failed with code=" << response.rspcode()
+ << ", description=" << response.description() << endl;
+ }
- fprintf(stderr, "message body %s\n", send_payload.get());
- fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
-
- // payload updated in place, nothing to copy from, so payload parm is nil
- if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
- fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
+ } else {
+ cout << "[ERROR] failed to send a RIC Control Request message to RC xApp, error_code="
+ << status.error_code() << ", error_msg=" << status.error_message() << endl;
}
- /*
- msg = xfw->Receive( response_to );
- if( msg != NULL ) {
- rmtype = msg->Get_mtype();
- send_payload = msg->Get_payload();
- fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
- }
- */
-
}
void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
+ string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
- long now;
- long total_count;
+ cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
+ cout << "[INFO] Payload is " << json << endl;
- int sz;
- int i;
-
- int response_to = 0; // max timeout wating for a response
-
- int send_mtype = 0;
- int rmtype; // received message type
- int delay = 1000000; // mu-sec delay; default 1s
-
- cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
- cout << "payload is " << payload.get() << "\n";
-
- mtype = 0;
-
- const char* arg = (const char*)payload.get();
PredictionHandler handler;
-
try {
-
Reader reader;
- StringStream ss(arg);
+ StringStream ss(json.c_str());
reader.Parse(ss,handler);
} catch (...) {
- cout << "got an exception on stringstream read parse\n";
+ cout << "[ERROR] Got an exception on stringstream read parse\n";
}
-
- std::string pred_ue_id = handler.ue_id;
-
- cout << "Prediction for " << pred_ue_id << endl;
-
- unordered_map<string, int> throughput_map = handler.cell_pred_down;
- cout << endl;
-
- unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
+ // We are only considering download throughput
+ unordered_map<string, int> throughput_map = handler.cell_pred_down;
- //Decision about CONTROL message
- //(1) Identify UE Id in Prediction message
- //(2) Get UEData struct for this UE Id
- //(3) Identify the UE's service cell ID
- //(4) Iterate through Prediction message.
- // If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
+ // Decision about CONTROL message
+ // (1) Identify UE Id in Prediction message
+ // (2) Iterate through Prediction message.
+ // If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
+ // We assume the first cell in the prediction message is the serving cell
- UEData pred_ue_data = sdl_data[pred_ue_id];
- std::string serving_cell_id = pred_ue_data.serving_cell;
+ int serving_cell_throughput = 0;
+ int highest_throughput = 0;
+ string highest_throughput_cell_id;
- int serving_cell_throughput;
- int highest_throughput;
- std::string highest_throughput_cell_id;
- std::string::size_type str_size;
+ // Getting the current serving cell throughput prediction
+ auto cell = throughput_map.find( handler.serving_cell_id );
+ serving_cell_throughput = cell->second;
+ // Iterating to identify the highest throughput prediction
for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
- std::string curr_cellid = map_iter->first;
+ string curr_cellid = map_iter->first;
int curr_throughput = map_iter->second;
- if (curr_cellid.compare(serving_cell_id) == 0) {
- serving_cell_throughput = curr_throughput;
- highest_throughput = serving_cell_throughput;
+ if ( highest_throughput < curr_throughput ) {
+ highest_throughput = curr_throughput;
+ highest_throughput_cell_id = curr_cellid;
}
}
- //Iterating again to identify the highest throughput prediction
+ float thresh = 0;
+ if( downlink_threshold > 0 ) { // we also take into account the threshold in A1 policy type 20008
+ thresh = serving_cell_throughput * (downlink_threshold / 100.0);
+ }
- for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
+ if ( highest_throughput > ( serving_cell_throughput + thresh ) ) {
- std::string curr_cellid = map_iter->first;
- int curr_throughput = map_iter->second;
+ // sending a control request message
+ if ( ts_control_api == TsControlApi::REST ) {
+ send_rest_control_request( handler.ue_id, handler.serving_cell_id, highest_throughput_cell_id );
+ } else {
+ send_grpc_control_request( handler.ue_id, highest_throughput_cell_id );
+ }
- if (curr_throughput > serving_cell_throughput) {
- highest_throughput = curr_throughput;
- highest_throughput_cell_id = curr_cellid;
+ } else {
+ cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
+ }
+
+}
+
+void send_prediction_request( vector<string> ues_to_predict ) {
+ std::unique_ptr<Message> msg;
+ Msg_component payload; // special type of unique pointer to the payload
+
+ int sz;
+ int i;
+ size_t plen;
+ Msg_component send_payload;
+
+ msg = xfw->Alloc_msg( 2048 );
+
+ sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
+ if( sz < 2048 ) {
+ fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
+ exit( 1 );
+ }
+
+ string ues_list = "[";
+
+ for (int i = 0; i < ues_to_predict.size(); i++) {
+ if (i == ues_to_predict.size() - 1) {
+ ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
+ } else {
+ ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
}
}
- if (highest_throughput > serving_cell_throughput) {
- cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
- cout << "UE ID: " << pred_ue_id << endl;
- cout << "Source cell " << serving_cell_id << endl;
- cout << "Target cell " << highest_throughput_cell_id << endl;
+ string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
+
+ send_payload = msg->Get_payload(); // direct access to payload
+ snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
+
+ plen = strlen( (char *)send_payload.get() );
+
+ cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
+
+ // payload updated in place, nothing to copy from, so payload parm is nil
+ if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
+ fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
}
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-
-
}
+/* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
+ * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
+ * sends a prediction request to the QP Driver xApp.
+ */
+void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
+ string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
-//This function runs a loop that continuously checks SDL for any UE
+ cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
+ cout << "[INFO] Payload is " << json << "\n";
-void run_loop() {
+ AnomalyHandler handler;
+ Reader reader;
+ StringStream ss(json.c_str());
+ reader.Parse(ss,handler);
- cout << "in Traffic Steering run_loop()\n";
+ // just sending ACK to the AD xApp
+ mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr ); // msg type 30004
- unordered_map<string, UEData> uemap;
+ send_prediction_request(handler.prediction_ues);
+}
- while (1) {
+vector<string> get_nodeb_list( restclient::RestClient& client ) {
- uemap = get_sdl_ue_data();
+ restclient::response_t response = client.do_get( "/v1/nodeb/states" );
- vector<string> prediction_ues;
+ NodebListHandler handler;
+ if( response.status_code == 200 ) {
+ Reader reader;
+ StringStream ss( response.body.c_str() );
+ reader.Parse( ss, handler );
+
+ cout << "[INFO] nodeb list is " << response.body.c_str() << endl;
- for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
- string ueid = map_iter->first;
- fprintf(stderr,"found a ueid %s\n", ueid.c_str());
- UEData data = map_iter->second;
+ } else {
+ if( response.body.empty() ) {
+ cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() << endl;
+ } else {
+ cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() <<
+ ". HTTP payload is " << response.body.c_str() << endl;
+ }
+ }
- fprintf(stderr, "current rsrp is %d\n", data.serving_cell_rsrp);
+ return handler.nodeb_list;
+}
- if (data.serving_cell_rsrp < rsrp_threshold) {
- fprintf(stderr,"it is less than the rsrp threshold\n");
- prediction_ues.push_back(ueid);
+bool build_cell_mapping() {
+ string base_url;
+ char *data = getenv( "SERVICE_E2MGR_HTTP_BASE_URL" );
+ if ( data == NULL ) {
+ base_url = "http://service-ricplt-e2mgr-http.ricplt:3800";
+ } else {
+ base_url = string( data );
+ }
+
+ restclient::RestClient client( base_url );
+
+ vector<string> nb_list = get_nodeb_list( client );
+
+ for( string nb : nb_list ) {
+ string full_path = string("/v1/nodeb/") + nb;
+ restclient::response_t response = client.do_get( full_path );
+ if( response.status_code != 200 ) {
+ if( response.body.empty() ) {
+ cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
+ client.getBaseUrl() + full_path << endl;
} else {
- fprintf(stderr,"it is not less than the rsrp threshold\n");
+ cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
+ client.getBaseUrl() + full_path << ". HTTP payload is " << response.body.c_str() << endl;
}
+ return false;
}
- fprintf(stderr, "the size of pred ues is %d\n", prediction_ues.size());
-
- if (prediction_ues.size() > 0) {
- send_prediction_request(prediction_ues);
+ try {
+ NodebHandler handler;
+ Reader reader;
+ StringStream ss( response.body.c_str() );
+ reader.Parse( ss, handler );
+ } catch (...) {
+ cout << "[ERROR] Got an exception on parsing nodeb (stringstream read parse)\n";
+ return false;
}
-
- sleep(20);
}
-}
-/* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
- * It just print the payload received from AD xApp and send an ACK with same UEID as payload to AD xApp.
- */
-void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
- cout << "payload is " << payload.get() << "\n";
- mbuf.Send_response(30004, -1, strlen((char *) payload.get()), (unsigned char *) payload.get());
+ return true;
}
extern int main( int argc, char** argv ) {
-
int nthreads = 1;
-
char* port = (char *) "4560";
+ shared_ptr<grpc::Channel> channel;
+
+ Config *config = new Config();
+ string api = config->Get_control_str("ts_control_api");
+ ts_control_ep = config->Get_control_str("ts_control_ep");
+ if ( api.empty() ) {
+ cout << "[ERROR] a control api (rest/grpc) is required in xApp descriptor\n";
+ exit(1);
+ }
+ if ( api.compare("rest") == 0 ) {
+ ts_control_api = TsControlApi::REST;
+ } else {
+ ts_control_api = TsControlApi::gRPC;
- sdl = shareddatalayer::SyncStorage::create();
+ if( !build_cell_mapping() ) {
+ cout << "[ERROR] unable to map cells to nodeb\n";
+ }
- nsu = Namespace(sdl_namespace_u);
- nsc = Namespace(sdl_namespace_c);
+ channel = grpc::CreateChannel(ts_control_ep, grpc::InsecureChannelCredentials());
+ rc_stub = api::MsgComm::NewStub(channel, grpc::StubOptions());
+ }
-
- fprintf( stderr, "<XAPP> listening on port: %s\n", port );
- xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
-
- xfw->Add_msg_cb( 20010, policy_callback, NULL );
- xfw->Add_msg_cb( 30002, prediction_callback, NULL );
- xfw->Add_msg_cb( 30003, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
-
- std::thread loop_thread;
+ fprintf( stderr, "[TS xApp] listening on port %s\n", port );
+ xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
- loop_thread = std::thread(&run_loop);
+ xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL ); // msg type 20010
+ xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL ); // msg type 30002
+ xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
xfw->Run( nthreads );
-
+
}