3 ==================================================================================
4 Copyright (c) 2020 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
22 Abstract: Traffic Steering xApp
24 2. Receives anomaly detection
25 3. Requests prediction for UE throughput on current and neighbor cells
26 4. Receives prediction
27 5. Optionally exercises Traffic Steering action over E2
32 Modified: 21 May 2021 (Alexandre Huff)
33 Update for traffic steering use case in release D.
48 #include <unordered_map>
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
55 #include <rapidjson/prettywriter.h>
57 #include <curl/curl.h>
58 #include <rmr/RIC_message_types.h>
59 #include "ricxfcpp/xapp.hpp"
62 // Defines env name for the endpoint to POST handoff control messages
63 #define ENV_CONTROL_URL "TS_CONTROL_URL"
66 using namespace rapidjson;
70 using Namespace = std::string;
71 using Key = std::string;
72 using Data = std::vector<uint8_t>;
73 using DataMap = std::map<Key, Data>;
74 using Keys = std::set<Key>;
77 // ----------------------------------------------------------
79 // Stores the the URL to POST handoff control messages
80 const char *ts_control_url;
82 std::unique_ptr<Xapp> xfw;
84 int rsrp_threshold = 0;
88 int serving_cell_rsrp;
91 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
92 unordered_map<string, string> cell_pred;
94 bool ue_id_found = false;
96 string curr_value = "";
98 int policy_instance_id;
100 std::string operation;
101 bool found_threshold = false;
103 bool Null() { return true; }
104 bool Bool(bool b) { return true; }
107 if (curr_key.compare("policy_type_id") == 0) {
109 } else if (curr_key.compare("policy_instance_id") == 0) {
110 policy_instance_id = i;
111 } else if (curr_key.compare("threshold") == 0) {
112 found_threshold = true;
118 bool Uint(unsigned u) {
120 if (curr_key.compare("policy_type_id") == 0) {
122 } else if (curr_key.compare("policy_instance_id") == 0) {
123 policy_instance_id = u;
124 } else if (curr_key.compare("threshold") == 0) {
125 found_threshold = true;
131 bool Int64(int64_t i) { return true; }
132 bool Uint64(uint64_t u) { return true; }
133 bool Double(double d) { return true; }
134 bool String(const char* str, SizeType length, bool copy) {
136 if (curr_key.compare("operation") != 0) {
146 bool Key(const char* str, SizeType length, bool copy) {
152 bool EndObject(SizeType memberCount) { return true; }
153 bool StartArray() { return true; }
154 bool EndArray(SizeType elementCount) { return true; }
158 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
159 unordered_map<string, int> cell_pred_down;
160 unordered_map<string, int> cell_pred_up;
162 bool ue_id_found = false;
163 string curr_key = "";
164 string curr_value = "";
165 string serving_cell_id;
166 bool down_val = true;
167 bool Null() { return true; }
168 bool Bool(bool b) { return true; }
169 bool Int(int i) { return true; }
170 bool Uint(unsigned u) {
171 // Currently, we assume the first cell in the prediction message is the serving cell
172 if ( serving_cell_id.empty() ) {
173 serving_cell_id = curr_key;
177 cell_pred_down[curr_key] = u;
180 cell_pred_up[curr_key] = u;
187 bool Int64(int64_t i) { return true; }
188 bool Uint64(uint64_t u) { return true; }
189 bool Double(double d) { return true; }
190 bool String(const char* str, SizeType length, bool copy) {
194 bool StartObject() { return true; }
195 bool Key(const char* str, SizeType length, bool copy) {
205 bool EndObject(SizeType memberCount) { return true; }
206 bool StartArray() { return true; }
207 bool EndArray(SizeType elementCount) { return true; }
210 struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
212 Assuming we receive the following payload from AD
213 [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
215 vector<string> prediction_ues;
216 string curr_key = "";
218 bool Key(const Ch* str, SizeType len, bool copy) {
223 bool String(const Ch* str, SizeType len, bool copy) {
224 // We are only interested in the "ue-id"
225 if ( curr_key.compare( "ue-id") == 0 ) {
226 prediction_ues.push_back( str );
233 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
234 unordered_map<string, string> cell_pred;
235 std::string serving_cell_id;
236 int serving_cell_rsrp;
237 int serving_cell_rsrq;
238 int serving_cell_sinr;
239 bool in_serving_array = false;
240 int rf_meas_index = 0;
242 bool in_serving_report_object = false;
244 string curr_key = "";
245 string curr_value = "";
246 bool Null() { return true; }
247 bool Bool(bool b) { return true; }
253 bool Uint(unsigned i) {
255 if (in_serving_report_object) {
256 if (curr_key.compare("rsrp") == 0) {
257 serving_cell_rsrp = i;
258 } else if (curr_key.compare("rsrq") == 0) {
259 serving_cell_rsrq = i;
260 } else if (curr_key.compare("rssinr") == 0) {
261 serving_cell_sinr = i;
266 bool Int64(int64_t i) {
269 bool Uint64(uint64_t i) {
272 bool Double(double d) { return true; }
273 bool String(const char* str, SizeType length, bool copy) {
275 if (curr_key.compare("ServingCellID") == 0) {
276 serving_cell_id = str;
282 if (curr_key.compare("ServingCellRF") == 0) {
283 in_serving_report_object = true;
287 bool Key(const char* str, SizeType length, bool copy) {
292 bool EndObject(SizeType memberCount) {
293 if (curr_key.compare("ServingCellRF") == 0) {
294 in_serving_report_object = false;
299 if (curr_key.compare("ServingCellRF") == 0) {
300 in_serving_array = true;
305 bool EndArray(SizeType elementCount) {
307 if (curr_key.compare("servingCellRF") == 0) {
308 in_serving_array = false;
316 /* unordered_map<string, UEData> get_sdl_ue_data() {
318 fprintf(stderr, "In get_sdl_ue_data()\n");
320 unordered_map<string, string> ue_data;
322 unordered_map<string, UEData> return_ue_data_map;
324 std::string prefix3="";
325 Keys K2 = sdl->findKeys(nsu, prefix3);
326 DataMap Dk2 = sdl->get(nsu, K2);
331 for(auto si=K2.begin();si!=K2.end();++si){
332 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
333 char val[val_v.size()+1]; // from Data
336 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
338 ue_id.assign((std::string)*si);
341 ue_data[ue_id] = ue_json;
344 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
345 UEDataHandler handler;
347 StringStream ss(map_iter->second.c_str());
348 reader.Parse(ss,handler);
350 string ueID = map_iter->first;
351 string serving_cell_id = handler.serving_cell_id;
352 int serv_rsrp = handler.serving_cell_rsrp;
354 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
358 return return_ue_data_map;
361 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
363 int response_to = 0; // max timeout wating for a response
364 int rmtype; // received message type
366 string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
368 cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length="<< len << "\n";
369 cout << "[INFO] Payload is " << arg << endl;
371 PolicyHandler handler;
373 StringStream ss(arg.c_str());
374 reader.Parse(ss,handler);
376 //Set the threshold value
377 if (handler.found_threshold) {
378 cout << "[INFO] Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
379 rsrp_threshold = handler.threshold;
382 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
383 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
386 // callback to handle handover reply (json http response)
387 size_t handoff_reply_callback( const char *in, size_t size, size_t num, string *out ) {
388 const size_t totalBytes( size * num );
389 out->append( in, totalBytes );
393 // sends a handover message through REST
394 void send_handoff_request( string msg ) {
395 CURL *curl = curl_easy_init();
396 curl_easy_setopt( curl, CURLOPT_URL, ts_control_url );
397 curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
398 curl_easy_setopt( curl, CURLOPT_POST, 1L );
399 // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
401 // response information
403 unique_ptr<string> httpData( new string() );
405 curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, handoff_reply_callback );
406 curl_easy_setopt( curl, CURLOPT_WRITEDATA, httpData.get());
407 curl_easy_setopt( curl, CURLOPT_POSTFIELDS, msg.c_str() );
409 struct curl_slist *headers = NULL; // needs to free this after easy perform
410 headers = curl_slist_append( headers, "Accept: application/json" );
411 headers = curl_slist_append( headers, "Content-Type: application/json" );
412 curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
414 cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_url << "\"\n";
415 cout << "[INFO] HandOff request is " << msg << endl;
418 CURLcode res = curl_easy_perform( curl );
419 if( res != CURLE_OK ) {
420 cout << "[ERROR] curl_easy_perform() failed: " << curl_easy_strerror( res ) << endl;
424 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
425 if( httpCode == 200 ) {
426 // ============== DO SOMETHING USEFUL HERE ===============
427 // Currently, we only print out the HandOff reply
428 rapidjson::Document document;
429 document.Parse( httpData.get()->c_str() );
430 rapidjson::StringBuffer s;
431 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
432 document.Accept( writer );
433 cout << "[INFO] HandOff reply is " << s.GetString() << endl;
436 } else if ( httpCode == 404 ) {
437 cout << "[ERROR] HTTP 404 Not Found: " << ts_control_url << endl;
439 cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_url << \
440 "\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
445 curl_slist_free_all( headers );
446 curl_easy_cleanup( curl );
449 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
453 static unsigned int seq_number = 0; // static counter, not thread-safe
455 int response_to = 0; // max timeout wating for a response
458 int rmtype; // received message type
459 int delay = 1000000; // mu-sec delay; default 1s
461 string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
463 cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
464 cout << "[INFO] Payload is " << json << endl;
466 PredictionHandler handler;
469 StringStream ss(json.c_str());
470 reader.Parse(ss,handler);
472 cout << "[ERROR] Got an exception on stringstream read parse\n";
475 // We are only considering download throughput
476 unordered_map<string, int> throughput_map = handler.cell_pred_down;
478 // Decision about CONTROL message
479 // (1) Identify UE Id in Prediction message
480 // (2) Iterate through Prediction message.
481 // If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
482 // We assume the first cell in the prediction message is the serving cell
484 int serving_cell_throughput = 0;
485 int highest_throughput = 0;
486 string highest_throughput_cell_id;
488 // Getting the current serving cell throughput prediction
489 auto cell = throughput_map.find( handler.serving_cell_id );
490 serving_cell_throughput = cell->second;
492 // Iterating to identify the highest throughput prediction
493 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
495 string curr_cellid = map_iter->first;
496 int curr_throughput = map_iter->second;
498 if ( highest_throughput < curr_throughput ) {
499 highest_throughput = curr_throughput;
500 highest_throughput_cell_id = curr_cellid;
505 if ( highest_throughput > serving_cell_throughput ) {
506 // building a handoff control message
507 now = time( nullptr );
508 str_now = ctime( &now );
509 str_now.pop_back(); // removing the \n character
511 seq_number++; // static counter, not thread-safe
513 rapidjson::StringBuffer s;
514 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
515 writer.StartObject();
516 writer.Key( "command" );
517 writer.String( "HandOff" );
518 writer.Key( "seqNo" );
519 writer.Int( seq_number );
521 writer.String( handler.ue_id.c_str() );
522 writer.Key( "fromCell" );
523 writer.String( handler.serving_cell_id.c_str() );
524 writer.Key( "toCell" );
525 writer.String( highest_throughput_cell_id.c_str() );
526 writer.Key( "timestamp" );
527 writer.String( str_now.c_str() );
528 writer.Key( "reason" );
529 writer.String( "HandOff Control Request from TS xApp" );
533 // creates a message like
535 "command": "HandOff",
540 "timestamp": "Sat May 22 10:35:33 2021",
541 "reason": "HandOff Control Request from TS xApp",
545 // sending a control request message
546 send_handoff_request( s.GetString() );
549 cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
552 // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
553 // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
556 void send_prediction_request( vector<string> ues_to_predict ) {
558 std::unique_ptr<Message> msg;
559 Msg_component payload; // special type of unique pointer to the payload
564 Msg_component send_payload;
566 msg = xfw->Alloc_msg( 2048 );
568 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
570 fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
574 string ues_list = "[";
576 for (int i = 0; i < ues_to_predict.size(); i++) {
577 if (i == ues_to_predict.size() - 1) {
578 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
580 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
584 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
586 send_payload = msg->Get_payload(); // direct access to payload
587 snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
589 plen = strlen( (char *)send_payload.get() );
591 cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
593 // payload updated in place, nothing to copy from, so payload parm is nil
594 if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
595 fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
600 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
601 * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
602 * sends a prediction request to the QP Driver xApp.
604 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
605 string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
607 cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
608 cout << "[INFO] Payload is " << json << "\n";
610 AnomalyHandler handler;
612 StringStream ss(json.c_str());
613 reader.Parse(ss,handler);
615 // just sending ACK to the AD xApp
616 mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr ); // msg type 30004
618 // TODO should we use the threshold received in the A1_POLICY_REQ message and compare with Degradation in TS_ANOMALY_UPDATE?
619 // if( handler.degradation < rsrp_threshold )
620 send_prediction_request(handler.prediction_ues);
623 extern int main( int argc, char** argv ) {
626 char* port = (char *) "4560";
628 // ts_control_url = "http://127.0.0.1:5000/api/echo"; // echo-server in test/app/ directory
629 if ( ( ts_control_url = getenv( ENV_CONTROL_URL ) ) == nullptr ) {
630 cout << "[ERROR] TS_CONTROL_URL is not defined to POST handoff control messages" << endl;
634 fprintf( stderr, "[TS xApp] listening on port %s\n", port );
635 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
637 xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL ); // msg type 20010
638 xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL ); // msg type 30002
639 xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
641 xfw->Run( nthreads );