3 ==================================================================================
4 Copyright (c) 2020 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
22 Abstract: Traffic Steering xApp
24 2. Receives anomaly detection
25 3. Requests prediction for UE throughput on current and neighbor cells
26 4. Receives prediction
27 5. Optionally exercises Traffic Steering action over E2
32 Modified: 21 May 2021 (Alexandre Huff)
33 Update for traffic steering use case in release D.
48 #include <unordered_map>
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
55 #include <rapidjson/prettywriter.h>
57 #include <curl/curl.h>
58 #include <rmr/RIC_message_types.h>
59 #include "ricxfcpp/xapp.hpp"
62 // Defines env name for the endpoint to POST handoff control messages
63 #define ENV_CONTROL_URL "TS_CONTROL_URL"
66 using namespace rapidjson;
// Key/value aliases. In the visible lines they are referenced only by the
// commented-out get_sdl_ue_data() code (Keys, DataMap).
70 using Namespace = std::string;
71 using Key = std::string;
72 using Data = std::vector<uint8_t>;
73 using DataMap = std::map<Key, Data>;
74 using Keys = std::set<Key>;
77 // ----------------------------------------------------------
79 // Stores the URL to POST handoff control messages
// Assigned in main() from getenv(ENV_CONTROL_URL); read by send_handoff_request().
80 const char *ts_control_url;
// xApp framework instance: created in main(), used by send_prediction_request()
// to allocate outgoing messages.
82 std::unique_ptr<Xapp> xfw;
// Overwritten by policy_callback() when a policy payload carries a "threshold"
// key. The only other visible reference is a commented-out TODO in ad_callback().
84 int rsrp_threshold = 0;
// NOTE(review): the lines around this declaration are not visible in this
// listing; confirm whether it is a live global or part of a commented-out struct.
88 int serving_cell_rsrp;
// RapidJSON reader handler used by policy_callback() to scan the payload of an
// A1_POLICY_REQ message. policy_callback() reads found_threshold and threshold
// from it after parsing.
// NOTE(review): several lines of this struct are not visible in this listing
// (for example the declarations of curr_key and threshold, the Int() header,
// and the statements that follow some of the key tests). The notes below cover
// only what is shown.
91 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
92 unordered_map<string, string> cell_pred;
94 bool ue_id_found = false;
96 string curr_value = "";
// No in-class initializer: holds an indeterminate value unless the payload
// supplies a "policy_instance_id" member.
98 int policy_instance_id;
100 std::string operation;
// Set to true by the integer callbacks when the current key is "threshold".
101 bool found_threshold = false;
103 bool Null() { return true; }
104 bool Bool(bool b) { return true; }
// Signed-integer branch: dispatches on the current key name (curr_key).
107 if (curr_key.compare("policy_type_id") == 0) {
109 } else if (curr_key.compare("policy_instance_id") == 0) {
110 policy_instance_id = i;
111 } else if (curr_key.compare("threshold") == 0) {
112 found_threshold = true;
// Unsigned-integer callback: same key dispatch as the signed branch above.
118 bool Uint(unsigned u) {
120 if (curr_key.compare("policy_type_id") == 0) {
122 } else if (curr_key.compare("policy_instance_id") == 0) {
// Implicit unsigned -> int conversion.
123 policy_instance_id = u;
124 } else if (curr_key.compare("threshold") == 0) {
125 found_threshold = true;
// 64-bit and floating-point values are accepted and ignored.
131 bool Int64(int64_t i) { return true; }
132 bool Uint64(uint64_t u) { return true; }
133 bool Double(double d) { return true; }
134 bool String(const char* str, SizeType length, bool copy) {
// NOTE(review): compare() != 0 is true for every key EXCEPT "operation". If the
// elided body stores the value into `operation`, this test is probably
// inverted - confirm against the full source.
136 if (curr_key.compare("operation") != 0) {
// Presumably records the member name in curr_key (body not visible) - TODO confirm.
146 bool Key(const char* str, SizeType length, bool copy) {
152 bool EndObject(SizeType memberCount) { return true; }
153 bool StartArray() { return true; }
154 bool EndArray(SizeType elementCount) { return true; }
// RapidJSON reader handler used by prediction_callback() to scan a throughput
// prediction payload. After parsing, prediction_callback() reads
// cell_pred_down, serving_cell_id and ue_id from it.
// NOTE(review): parts of this struct (e.g. the ue_id declaration, the
// conditions around the two map stores, and the Key() body) are not visible in
// this listing.
158 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
// Per-cell predictions keyed by the current key (curr_key) at the time the
// value is seen. Values arrive as unsigned and are stored as int.
159 unordered_map<string, int> cell_pred_down;
160 unordered_map<string, int> cell_pred_up;
162 bool ue_id_found = false;
163 string curr_key = "";
164 string curr_value = "";
// Set once, to the key of the first unsigned value encountered.
165 string serving_cell_id;
// Presumably selects which of the two maps receives the next value (the
// branching lines are not visible) - TODO confirm.
166 bool down_val = true;
167 bool Null() { return true; }
168 bool Bool(bool b) { return true; }
// Signed integers are accepted but their value is discarded.
169 bool Int(int i) { return true; }
170 bool Uint(unsigned u) {
171 // Currently, we assume the first cell in the prediction message is the serving cell
172 if ( serving_cell_id.empty() ) {
173 serving_cell_id = curr_key;
// unsigned -> int conversion: values above INT_MAX would not be preserved.
177 cell_pred_down[curr_key] = u;
180 cell_pred_up[curr_key] = u;
// 64-bit and floating-point values are accepted and ignored.
187 bool Int64(int64_t i) { return true; }
188 bool Uint64(uint64_t u) { return true; }
189 bool Double(double d) { return true; }
190 bool String(const char* str, SizeType length, bool copy) {
194 bool StartObject() { return true; }
// Body not visible; presumably records ue_id and/or curr_key - TODO confirm.
195 bool Key(const char* str, SizeType length, bool copy) {
205 bool EndObject(SizeType memberCount) { return true; }
206 bool StartArray() { return true; }
207 bool EndArray(SizeType elementCount) { return true; }
// RapidJSON reader handler used by ad_callback() to collect the "ue-id" string
// values of an anomaly-detection payload into prediction_ues.
// NOTE(review): some lines of this struct (including the Key() body) are not
// visible in this listing.
210 struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
212 Assuming we receive the following payload from AD
213 [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
// UE ids gathered while parsing; ad_callback() passes this vector to
// send_prediction_request().
215 vector<string> prediction_ues;
216 string curr_key = "";
// Presumably records the member name in curr_key (body not visible) - TODO confirm.
218 bool Key(const Ch* str, SizeType len, bool copy) {
223 bool String(const Ch* str, SizeType len, bool copy) {
224 // We are only interested in the "ue-id"
225 if ( curr_key.compare( "ue-id") == 0 ) {
// Builds the std::string from the NUL-terminated pointer; `len` is not used,
// so a value containing an embedded NUL would be cut short.
226 prediction_ues.push_back( str );
// NOTE(review): the struct below starts inside a block comment, i.e. it is
// disabled code. The end of that comment is not visible in this listing.
// It is referenced only by get_sdl_ue_data(), which is also commented out.
233 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
234 unordered_map<string, string> cell_pred;
235 std::string serving_cell_id;
236 int serving_cell_rsrp;
237 int serving_cell_rsrq;
238 int serving_cell_sinr;
239 bool in_serving_array = false;
240 int rf_meas_index = 0;
242 bool in_serving_report_object = false;
244 string curr_key = "";
245 string curr_value = "";
246 bool Null() { return true; }
247 bool Bool(bool b) { return true; }
253 bool Uint(unsigned i) {
255 if (in_serving_report_object) {
256 if (curr_key.compare("rsrp") == 0) {
257 serving_cell_rsrp = i;
258 } else if (curr_key.compare("rsrq") == 0) {
259 serving_cell_rsrq = i;
260 } else if (curr_key.compare("rssinr") == 0) {
261 serving_cell_sinr = i;
266 bool Int64(int64_t i) {
269 bool Uint64(uint64_t i) {
272 bool Double(double d) { return true; }
273 bool String(const char* str, SizeType length, bool copy) {
275 if (curr_key.compare("ServingCellID") == 0) {
276 serving_cell_id = str;
282 if (curr_key.compare("ServingCellRF") == 0) {
283 in_serving_report_object = true;
287 bool Key(const char* str, SizeType length, bool copy) {
292 bool EndObject(SizeType memberCount) {
293 if (curr_key.compare("ServingCellRF") == 0) {
294 in_serving_report_object = false;
299 if (curr_key.compare("ServingCellRF") == 0) {
300 in_serving_array = true;
305 bool EndArray(SizeType elementCount) {
// NOTE(review): the key is spelled "servingCellRF" (lower-case s) here but
// "ServingCellRF" where in_serving_array is set to true, so the flag would
// never be cleared for that key. Fix the spelling if this code is re-enabled.
307 if (curr_key.compare("servingCellRF") == 0) {
308 in_serving_array = false;
// NOTE(review): the function below starts inside a block comment, i.e. it is
// disabled code. The end of that comment is not visible in this listing.
// As written it lists keys via sdl->findKeys(), fetches their values, parses
// each value with UEDataHandler and returns a map from UE id to
// {serving cell id, serving cell RSRP}. `sdl`, `nsu` and the UEData type are
// not declared in the visible lines.
316 /* unordered_map<string, UEData> get_sdl_ue_data() {
318 fprintf(stderr, "In get_sdl_ue_data()\n");
320 unordered_map<string, string> ue_data;
322 unordered_map<string, UEData> return_ue_data_map;
324 std::string prefix3="";
325 Keys K2 = sdl->findKeys(nsu, prefix3);
326 DataMap Dk2 = sdl->get(nsu, K2);
331 for(auto si=K2.begin();si!=K2.end();++si){
332 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
// NOTE(review): runtime-sized array (a compiler extension, not standard C++).
// Prefer std::string(val_v.begin(), val_v.end()) if this code is revived.
333 char val[val_v.size()+1]; // from Data
336 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
338 ue_id.assign((std::string)*si);
341 ue_data[ue_id] = ue_json;
344 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
345 UEDataHandler handler;
347 StringStream ss(map_iter->second.c_str());
348 reader.Parse(ss,handler);
350 string ueID = map_iter->first;
351 string serving_cell_id = handler.serving_cell_id;
352 int serv_rsrp = handler.serving_cell_rsrp;
354 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
358 return return_ue_data_map;
// Message callback registered in main() for A1_POLICY_REQ.
// Logs the message, parses the payload with PolicyHandler, copies a received
// "threshold" into the global rsrp_threshold, then sends two fixed responses
// on the same message buffer.
//   mbuf    - received message; reused for the responses
//   mtype   - message type (logged only)
//   len     - payload length (logged only)
//   payload - message payload, treated as a C string
//   subid, data - not used in the visible lines
// NOTE(review): some lines of this function (e.g. the declaration of `reader`)
// are not visible in this listing.
361 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
// NOTE(review): response_to and rmtype are not referenced in the visible lines.
363 int response_to = 0; // max timeout waiting for a response
364 int rmtype; // received message type
366 fprintf(stderr, "[INFO] Policy Callback got a message, type=%d, length=%d\n", mtype, len);
// Assumes the payload is NUL-terminated text (it is printed with %s and handed
// to StringStream) - TODO confirm the sender guarantees this.
368 const char *arg = (const char*)payload.get();
370 fprintf(stderr, "[INFO] Payload is %s\n", arg);
372 PolicyHandler handler;
374 StringStream ss(arg);
// NOTE(review): the result of Parse() is not checked in the visible lines, so
// a malformed payload is handled the same as one without a threshold.
375 reader.Parse(ss,handler);
377 //Set the threshold value
378 if (handler.found_threshold) {
379 fprintf(stderr, "[INFO] Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
380 rsrp_threshold = handler.threshold;
// Two replies of message type 101, each 5 bytes (4 characters plus the NUL).
383 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
384 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
387 // callback to handle handover reply (json http response)
// Installed by send_handoff_request() as the CURLOPT_WRITEFUNCTION.
// Appends size * num bytes starting at `in` to the string `out` points to
// (the CURLOPT_WRITEDATA pointer).
// NOTE(review): the return statement is not visible in this listing; libcurl
// treats any return value other than the number of bytes passed in as an error.
388 size_t handoff_reply_callback( const char *in, size_t size, size_t num, string *out ) {
389 const size_t totalBytes( size * num );
390 out->append( in, totalBytes );
394 // sends a handover message through REST
// POSTs `msg` (a JSON document) to ts_control_url with libcurl, collects the
// response body through handoff_reply_callback() and logs the outcome:
// the pretty-printed reply for HTTP 200, an error line otherwise.
// Nothing is reported back to the caller in the visible lines.
// NOTE(review): several lines of this function (e.g. the declaration of
// httpCode and some else branches) are not visible in this listing.
395 void send_handoff_request( string msg ) {
// NOTE(review): curl_easy_init() can return NULL; no check is visible before
// the handle is used.
396 CURL *curl = curl_easy_init();
397 curl_easy_setopt( curl, CURLOPT_URL, ts_control_url );
// NOTE(review): libcurl documents CURLOPT_TIMEOUT (seconds) as taking a long;
// the literal here is an int passed through varargs. Should be 10L, as is
// done for CURLOPT_POST on the next line.
398 curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
399 curl_easy_setopt( curl, CURLOPT_POST, 1L );
400 // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
402 // response information
// Buffer that receives the HTTP response body.
404 unique_ptr<string> httpData( new string() );
406 curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, handoff_reply_callback );
407 curl_easy_setopt( curl, CURLOPT_WRITEDATA, httpData.get());
// CURLOPT_POSTFIELDS does not copy the data; `msg` is a by-value parameter and
// stays alive until after curl_easy_perform().
408 curl_easy_setopt( curl, CURLOPT_POSTFIELDS, msg.c_str() );
410 struct curl_slist *headers = NULL; // needs to free this after easy perform
411 headers = curl_slist_append( headers, "Accept: application/json" );
412 headers = curl_slist_append( headers, "Content-Type: application/json" );
413 curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
415 cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_url << "\"\n";
416 cout << "[INFO] HandOff request is " << msg << endl;
// Blocking call: returns once the transfer finishes or fails.
419 CURLcode res = curl_easy_perform( curl );
420 if( res != CURLE_OK ) {
421 cout << "[ERROR] curl_easy_perform() failed: " << curl_easy_strerror( res ) << endl;
425 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
426 if( httpCode == 200 ) {
427 // ============== DO SOMETHING USEFUL HERE ===============
428 // Currently, we only print out the HandOff reply
// NOTE(review): the reply is re-serialized without a HasParseError() check, so
// a non-JSON 200 body is not reported as such.
429 rapidjson::Document document;
430 document.Parse( httpData.get()->c_str() );
431 rapidjson::StringBuffer s;
432 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
433 document.Accept( writer );
434 cout << "[INFO] HandOff reply is " << s.GetString() << endl;
437 } else if ( httpCode == 404 ) {
438 cout << "[ERROR] HTTP 404 Not Found: " << ts_control_url << endl;
// Any other status code: log the code and the raw response body.
440 cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_url << \
441 "\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
// Release the header list and the easy handle.
446 curl_slist_free_all( headers );
447 curl_easy_cleanup( curl );
// Message callback registered in main() for TS_QOE_PREDICTION.
// Parses the prediction payload with PredictionHandler, compares the predicted
// downlink throughput of every cell with that of the serving cell and, when
// another cell is predicted to be strictly better, builds a "HandOff" JSON
// command and passes it to send_handoff_request(). Otherwise it logs that the
// serving cell is the best one.
//   mtype, len - logged only
//   payload    - message payload, treated as a C string
//   mbuf, subid, data - not used in the visible (non-commented) lines
// NOTE(review): several lines of this function are not visible in this listing
// (e.g. the declarations of reader, now and str_now, the try/catch around the
// parse, and at least one writer call before the UE id is written).
450 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
454 static unsigned int seq_number = 0; // static counter, not thread-safe
// NOTE(review): response_to, rmtype and delay are not referenced in the
// visible lines.
456 int response_to = 0; // max timeout waiting for a response
459 int rmtype; // received message type
460 int delay = 1000000; // mu-sec delay; default 1s
462 cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
463 cout << "[INFO] Payload is " << payload.get() << endl;
// Assumes the payload is NUL-terminated text - TODO confirm.
465 const char* arg = (const char*)payload.get();
466 PredictionHandler handler;
470 StringStream ss(arg);
471 reader.Parse(ss,handler);
473 cout << "[ERROR] Got an exception on stringstream read parse\n";
476 // We are only considering download throughput
// Copies the handler's downlink map; the uplink map is not consulted.
477 unordered_map<string, int> throughput_map = handler.cell_pred_down;
479 // Decision about CONTROL message
480 // (1) Identify UE Id in Prediction message
481 // (2) Iterate through Prediction message.
482 // If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
483 // We assume the first cell in the prediction message is the serving cell
485 int serving_cell_throughput = 0;
486 int highest_throughput = 0;
487 string highest_throughput_cell_id;
489 // Getting the current serving cell throughput prediction
// NOTE(review): the iterator is dereferenced on the very next line without
// being compared against throughput_map.end(). If the payload yields no entry
// for serving_cell_id (for example an empty or malformed prediction), this is
// undefined behavior. Guard with `cell != throughput_map.end()`.
490 auto cell = throughput_map.find( handler.serving_cell_id );
491 serving_cell_throughput = cell->second;
493 // Iterating to identify the highest throughput prediction
// highest_throughput starts at 0, so only cells with a strictly positive
// prediction can be selected.
494 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
496 string curr_cellid = map_iter->first;
497 int curr_throughput = map_iter->second;
499 if ( highest_throughput < curr_throughput ) {
500 highest_throughput = curr_throughput;
501 highest_throughput_cell_id = curr_cellid;
// Strict comparison: a tie with the serving cell does not trigger a handoff.
506 if ( highest_throughput > serving_cell_throughput ) {
507 // building a handoff control message
// NOTE(review): ctime() returns a pointer to shared static storage; like the
// seq_number counter it is not safe if callbacks run on several threads.
508 now = time( nullptr );
509 str_now = ctime( &now );
510 str_now.pop_back(); // removing the \n character
512 seq_number++; // static counter, not thread-safe
514 rapidjson::StringBuffer s;
515 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
516 writer.StartObject();
517 writer.Key( "command" );
518 writer.String( "HandOff" );
519 writer.Key( "seqNo" );
// seq_number is unsigned but is written through the signed Int() overload.
520 writer.Int( seq_number );
522 writer.String( handler.ue_id.c_str() );
523 writer.Key( "fromCell" );
524 writer.String( handler.serving_cell_id.c_str() );
525 writer.Key( "toCell" );
526 writer.String( highest_throughput_cell_id.c_str() );
527 writer.Key( "timestamp" );
528 writer.String( str_now.c_str() );
529 writer.Key( "reason" );
530 writer.String( "HandOff Control Request from TS xApp" );
534 // creates a message like
536 "command": "HandOff",
541 "timestamp": "Sat May 22 10:35:33 2021",
542 "reason": "HandOff Control Request from TS xApp",
546 // sending a control request message
547 send_handoff_request( s.GetString() );
550 cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
553 // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
554 // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
// Builds a JSON body of the form {"UEPredictionSet": ["ue1","ue2",...]} from
// `ues_to_predict`, writes it (NUL-terminated) into a freshly allocated
// 2048-byte RMR message and sends it with message type TS_UE_LIST.
// Failures are logged to stderr; nothing is returned to the caller.
// NOTE(review): several lines of this function are not visible in this listing
// (e.g. the declarations of sz, i and plen, and the size-check condition).
557 void send_prediction_request( vector<string> ues_to_predict ) {
559 std::unique_ptr<Message> msg;
// NOTE(review): `payload` is not referenced in the visible lines.
560 Msg_component payload; // special type of unique pointer to the payload
565 Msg_component send_payload;
567 msg = xfw->Alloc_msg( 2048 );
569 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
571 fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
// NOTE(review): the closing "]" is appended only while handling the last
// element inside the loop. For an empty vector the loop never runs and the
// body becomes {"UEPredictionSet": [} which is not valid JSON. UE ids are also
// inserted verbatim, so an id containing a quote or backslash breaks the
// document.
575 string ues_list = "[";
// Signed int index compared against the unsigned size().
577 for (int i = 0; i < ues_to_predict.size(); i++) {
578 if (i == ues_to_predict.size() - 1) {
579 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
581 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
585 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
587 const char *body = message_body.c_str();
589 send_payload = msg->Get_payload(); // direct access to payload
// snprintf() silently truncates bodies longer than 2047 characters; the limit
// repeats the literal passed to Alloc_msg() above.
590 snprintf( (char *) send_payload.get(), 2048, "%s", body );
593 we are sending a string, so we have to include the nil byte in the RMR message
594 to keep things simple in the receiver side
596 plen = strlen( (char *) send_payload.get() ) + 1;
598 cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
600 // payload updated in place, nothing to copy from, so payload parm is nil
601 if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
602 fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
607 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are sent by AD xApp.
608 * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
609 * sends a prediction request to the QP Driver xApp.
611 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
// Registered in main() for TS_ANOMALY_UPDATE. mtype and len are logged; subid
// and data are not used in the visible lines.
// Assumes the payload is NUL-terminated text (it is streamed to cout and handed
// to StringStream) - TODO confirm.
612 const char *json = (const char *) payload.get();
614 cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
615 cout << "[INFO] Payload is " << json << "\n";
// Collects every "ue-id" string of the payload into handler.prediction_ues.
// The declaration of `reader` is not visible in this listing.
617 AnomalyHandler handler;
619 StringStream ss(json);
620 reader.Parse(ss,handler);
622 // just sending ACK to the AD xApp
// The ACK uses the received length and a null payload pointer; presumably this
// echoes the payload already in the buffer - TODO confirm against the
// framework's Send_response() documentation.
623 mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr ); // msg type 30004
625 // TODO should we use the threshold received in the A1_POLICY_REQ message and compare with Degradation in TS_ANOMALY_UPDATE?
626 // if( handler.degradation < rsrp_threshold )
// NOTE(review): called even when parsing produced no UE ids; see the
// empty-list handling in send_prediction_request().
627 send_prediction_request(handler.prediction_ues);
// Program entry point: reads the handoff control URL from the environment,
// creates the xApp framework instance on a fixed port, registers the three
// message callbacks and starts the framework with xfw->Run().
// NOTE(review): some lines are not visible in this listing (e.g. the
// declaration of nthreads and whatever follows the error message below).
// argc and argv are not used in the visible lines.
630 extern int main( int argc, char** argv ) {
// RMR listen port, hard-coded.
633 char* port = (char *) "4560";
635 // ts_control_url = "http://127.0.0.1:5000/api/echo"; // echo-server in test/app/ directory
// TS_CONTROL_URL must be set; the handling after the error message is not
// visible here (presumably the program exits) - TODO confirm.
636 if ( ( ts_control_url = getenv( ENV_CONTROL_URL ) ) == nullptr ) {
637 cout << "[ERROR] TS_CONTROL_URL is not defined to POST handoff control messages" << endl;
641 fprintf( stderr, "[TS xApp] listening on port %s\n", port );
642 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
// NOTE(review): no curl_global_init() call is visible before callbacks that
// use libcurl are registered; confirm whether one is needed for the number of
// threads passed to Run().
644 xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL ); // msg type 20010
645 xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL ); // msg type 30002
646 xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
648 xfw->Run( nthreads );