};
-struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
+struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
unordered_map<string, string> cell_pred;
std::string ue_id;
bool ue_id_found = false;
string curr_key = "";
string curr_value = "";
- bool Null() { cout << "Null()" << endl; return true; }
- bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
- bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
- bool Uint(unsigned u) { cout << "Uint(" << u << ")" << endl; return true; }
- bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
- bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
- bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
+ int policy_type_id;
+ int policy_instance_id;
+ int threshold;
+ std::string operation;
+ bool found_threshold = false;
+
+
+ bool Null() { return true; }
+ bool Bool(bool b) { return true; }
+ bool Int(int i) {
+
+ if (curr_key.compare("policy_type_id") == 0) {
+ policy_type_id = i;
+ } else if (curr_key.compare("policy_instance_id") == 0) {
+ policy_instance_id = i;
+ } else if (curr_key.compare("threshold") == 0) {
+ found_threshold = true;
+ threshold = i;
+ }
+
+ return true;
+ }
+ bool Uint(unsigned u) {
+
+ if (curr_key.compare("policy_type_id") == 0) {
+ policy_type_id = u;
+ } else if (curr_key.compare("policy_instance_id") == 0) {
+ policy_instance_id = u;
+ } else if (curr_key.compare("threshold") == 0) {
+ found_threshold = true;
+ threshold = u;
+ }
+
+ return true;
+ }
+ bool Int64(int64_t i) { return true; }
+ bool Uint64(uint64_t u) { return true; }
+ bool Double(double d) { return true; }
bool String(const char* str, SizeType length, bool copy) {
- cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
- if (curr_key.compare("") != 0) {
- cout << "Found throughput\n";
- curr_value = str;
- cell_pred[curr_key] = curr_value;
- curr_key = "";
- curr_value = "";
+
+ if (curr_key.compare("operation") != 0) {
+ operation = str;
}
return true;
}
- bool StartObject() { cout << "StartObject()" << endl; return true; }
+ bool StartObject() {
+
+ return true;
+ }
+ bool Key(const char* str, SizeType length, bool copy) {
+
+ curr_key = str;
+
+ return true;
+ }
+ bool EndObject(SizeType memberCount) { return true; }
+ bool StartArray() { return true; }
+ bool EndArray(SizeType elementCount) { return true; }
+
+};
+
+struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
+ unordered_map<string, int> cell_pred_down;
+ unordered_map<string, int> cell_pred_up;
+ std::string ue_id;
+ bool ue_id_found = false;
+ string curr_key = "";
+ string curr_value = "";
+ bool down_val = true;
+ bool Null() { return true; }
+ bool Bool(bool b) { return true; }
+ bool Int(int i) { return true; }
+ bool Uint(unsigned u) {
+
+ if (down_val) {
+ cell_pred_down[curr_key] = u;
+ down_val = false;
+ } else {
+ cell_pred_up[curr_key] = u;
+ down_val = true;
+ }
+
+ return true;
+
+ }
+ bool Int64(int64_t i) { return true; }
+ bool Uint64(uint64_t u) { return true; }
+ bool Double(double d) { return true; }
+ bool String(const char* str, SizeType length, bool copy) {
+
+ return true;
+ }
+ bool StartObject() { return true; }
bool Key(const char* str, SizeType length, bool copy) {
- cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
if (!ue_id_found) {
- cout << "Found UE ID\n";
+
ue_id = str;
ue_id_found = true;
} else {
}
return true;
}
- bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
- bool StartArray() { cout << "StartArray()" << endl; return true; }
- bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
+ bool EndObject(SizeType memberCount) { return true; }
+ bool StartArray() { return true; }
+ bool EndArray(SizeType elementCount) { return true; }
};
bool in_serving_array = false;
int rf_meas_index = 0;
+ bool in_serving_report_object = false;
+
string curr_key = "";
string curr_value = "";
- bool Null() { cout << "Null()" << endl; return true; }
- bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
+ bool Null() { return true; }
+ bool Bool(bool b) { return true; }
bool Int(int i) {
- fprintf(stderr, "Int(%d)\n", i);
- if (in_serving_array) {
- fprintf(stderr, "we are in serving array\n");
- switch(rf_meas_index) {
- case 0:
+
+ return true;
+ }
+
+ bool Uint(unsigned i) {
+
+ if (in_serving_report_object) {
+ if (curr_key.compare("rsrp") == 0) {
serving_cell_rsrp = i;
- break;
- case 1:
+ } else if (curr_key.compare("rsrq") == 0) {
serving_cell_rsrq = i;
- break;
- case 2:
+ } else if (curr_key.compare("rssinr") == 0) {
serving_cell_sinr = i;
- break;
}
- rf_meas_index++;
- }
- return true;
- }
- bool Uint(unsigned u) {
- fprintf(stderr, "Int(%d)\n", u); return true; }
- bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
- bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
- bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
+ }
+
+ return true; }
+ bool Int64(int64_t i) {
+
+ return true; }
+ bool Uint64(uint64_t i) {
+
+ return true; }
+ bool Double(double d) { return true; }
bool String(const char* str, SizeType length, bool copy) {
- fprintf(stderr,"String(%s)\n", str);
+
if (curr_key.compare("ServingCellID") == 0) {
serving_cell_id = str;
}
return true;
}
- bool StartObject() { cout << "StartObject()" << endl; return true; }
+ bool StartObject() {
+ if (curr_key.compare("ServingCellRF") == 0) {
+ in_serving_report_object = true;
+ }
+
+ return true; }
bool Key(const char* str, SizeType length, bool copy) {
- fprintf(stderr,"Key(%s)\n", str);
+
curr_key = str;
return true;
}
- bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
+ bool EndObject(SizeType memberCount) {
+ if (curr_key.compare("ServingCellRF") == 0) {
+ in_serving_report_object = false;
+ }
+ return true; }
bool StartArray() {
- fprintf(stderr,"StartArray()");
+
if (curr_key.compare("ServingCellRF") == 0) {
in_serving_array = true;
}
return true;
}
bool EndArray(SizeType elementCount) {
- fprintf(stderr, "EndArray()\n");
+
if (curr_key.compare("servingCellRF") == 0) {
in_serving_array = false;
rf_meas_index = 0;
unordered_map<string, UEData> return_ue_data_map;
- std::string prefix3="12";
+ std::string prefix3="";
Keys K2 = sdl->findKeys(nsu, prefix3);
DataMap Dk2 = sdl->get(nsu, K2);
std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
char val[val_v.size()+1]; // from Data
int i;
- fprintf(stderr, "val size %d\n", val_v.size());
+
for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
val[i]='\0';
ue_id.assign((std::string)*si);
ue_data[ue_id] = ue_json;
}
- fprintf(stderr, "after sdl get of ue data\n");
-
- fprintf(stderr, "From UE data map\n");
-
for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
UEDataHandler handler;
Reader reader;
string serving_cell_id = handler.serving_cell_id;
int serv_rsrp = handler.serving_cell_rsrp;
- fprintf(stderr,"UE data for %s\n", ueID.c_str());
- fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str());
- fprintf(stderr,"RSRP for UE %d\n", serv_rsrp);
-
return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
- }
-
- fprintf(stderr, "\n");
+ }
+
return return_ue_data_map;
}
int rmtype; // received message type
- fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
+ fprintf(stderr, "Policy Callback got a message, type=%d, length=%d\n", mtype, len);
+
+ const char *arg = (const char*)payload.get();
+
fprintf(stderr, "payload is %s\n", payload.get());
-
- //fprintf( stderr, "callback 1 got a message type = %d len = %d\n", mtype, len );
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
+
+ PolicyHandler handler;
+ Reader reader;
+ StringStream ss(arg);
+ reader.Parse(ss,handler);
//Set the threshold value
+ if (handler.found_threshold) {
+ fprintf(stderr, "Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
+ rsrp_threshold = handler.threshold;
+ }
+
+ mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
+ mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
+
}
int i;
Msg_component send_payload;
- fprintf(stderr, "cb 1\n");
-
msg = xfw->Alloc_msg( 2048 );
sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
exit( 1 );
}
- fprintf(stderr, "cb 2");
-
string ues_list = "[";
for (int i = 0; i < ues_to_predict.size(); i++) {
if (i == ues_to_predict.size() - 1) {
- ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
+ ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"]";
} else {
ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
}
send_payload = msg->Get_payload(); // direct access to payload
// snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
- // snprintf( (char *) send_payload.get(), 2048, body);
- snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
+ snprintf( (char *) send_payload.get(), 2048, body);
+ //snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
- fprintf(stderr, "message body %s\n", send_payload.get());
-
- fprintf(stderr, "cb 3");
+ fprintf(stderr, "message body %s\n", send_payload.get());
fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
// payload updated in place, nothing to copy from, so payload parm is nil
fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
}
- fprintf(stderr, "cb 4");
-
/*
msg = xfw->Receive( response_to );
if( msg != NULL ) {
int rmtype; // received message type
int delay = 1000000; // mu-sec delay; default 1s
- fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len);
- fprintf(stderr, "payload is %s\n", payload.get());
-
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
+ cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
+ cout << "payload is " << payload.get() << "\n";
mtype = 0;
- fprintf(stderr, "cb 1\n");
-
- char *incoming_msg = "{\"12345\": {\"222\": \"20000\", \"333\" : \"50000\"} }";
-
+ const char* arg = (const char*)payload.get();
PredictionHandler handler;
- Reader reader;
- StringStream ss(incoming_msg);
- reader.Parse(ss,handler);
- std::string pred_ue_id = handler.ue_id;
+ try {
+ Reader reader;
+ StringStream ss(arg);
+ reader.Parse(ss,handler);
+ } catch (...) {
+ cout << "got an exception on stringstream read parse\n";
+ }
+
+ std::string pred_ue_id = handler.ue_id;
+
cout << "Prediction for " << pred_ue_id << endl;
-
- unordered_map<string, string> throughput_map = handler.cell_pred;
-
+
+ unordered_map<string, int> throughput_map = handler.cell_pred_down;
cout << endl;
std::string highest_throughput_cell_id;
std::string::size_type str_size;
- cout << "Going through throughtput map:" << endl;
-
for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
- cout << map_iter->first << " : " << map_iter->second << endl;
+
std::string curr_cellid = map_iter->first;
- cout << "Cell ID is " << curr_cellid;
- int curr_throughput = stoi(map_iter->second, &str_size);
- cout << "Throughput is " << curr_throughput << endl;
+ int curr_throughput = map_iter->second;
if (curr_cellid.compare(serving_cell_id) == 0) {
serving_cell_throughput = curr_throughput;
//Iterating again to identify the highest throughput prediction
for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
- cout << map_iter->first << " : " << map_iter->second << endl;
+
std::string curr_cellid = map_iter->first;
- cout << "Cell ID is " << curr_cellid;
- int curr_throughput = stoi(map_iter->second, &str_size);
- cout << "Throughput is " << curr_throughput << endl;
+ int curr_throughput = map_iter->second;
if (curr_throughput > serving_cell_throughput) {
highest_throughput = curr_throughput;
cout << "Source cell " << serving_cell_id << endl;
cout << "Target cell " << highest_throughput_cell_id << endl;
}
+
+ mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
+ mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
}
void run_loop() {
- fprintf(stderr, "in run_loop()\n");
+ cout << "in Traffic Steering run_loop()\n";
unordered_map<string, UEData> uemap;
- vector<string> prediction_ues;
-
while (1) {
- fprintf(stderr, "in while loop\n");
-
uemap = get_sdl_ue_data();
+ vector<string> prediction_ues;
+
for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
string ueid = map_iter->first;
+ fprintf(stderr,"found a ueid %s\n", ueid.c_str());
UEData data = map_iter->second;
+
+ fprintf(stderr, "current rsrp is %d\n", data.serving_cell_rsrp);
+
if (data.serving_cell_rsrp < rsrp_threshold) {
+ fprintf(stderr,"it is less than the rsrp threshold\n");
prediction_ues.push_back(ueid);
+ } else {
+ fprintf(stderr,"it is not less than the rsrp threshold\n");
}
}
+ fprintf(stderr, "the size of pred ues is %d\n", prediction_ues.size());
+
if (prediction_ues.size() > 0) {
send_prediction_request(prediction_ues);
}
}
}
-
+/* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
+ * It just print the payload received from AD xApp and send an ACK with same UEID as payload to AD xApp.
+ */
+void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
+ cout << "payload is " << payload.get() << "\n";
+ mbuf.Send_response(30004, -1, strlen((char *) payload.get()), (unsigned char *) payload.get());
+}
extern int main( int argc, char** argv ) {
nsu = Namespace(sdl_namespace_u);
nsc = Namespace(sdl_namespace_c);
-
+
fprintf( stderr, "<XAPP> listening on port: %s\n", port );
- xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
- fprintf(stderr, "code1\n");
-
+ xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
xfw->Add_msg_cb( 20010, policy_callback, NULL );
xfw->Add_msg_cb( 30002, prediction_callback, NULL );
-
- fprintf(stderr, "code2\n");
+ xfw->Add_msg_cb( 30003, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
std::thread loop_thread;