3 ==================================================================================
4 Copyright (c) 2020 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
22 Abstract: Traffic Steering xApp;
24 2. Queries SDL to decide which UE to attempt Traffic Steering for
25 3. Requests prediction for UE throughput on current and neighbor cells
26 4. Receives prediction
27 5. Optionally exercises Traffic Steering action over E2
42 #include <sdl/syncstorage.hpp>
47 #include <unordered_map>
49 #include <rapidjson/document.h>
50 #include <rapidjson/writer.h>
51 #include <rapidjson/stringbuffer.h>
52 #include <rapidjson/schema.h>
53 #include <rapidjson/reader.h>
56 #include "ricxfcpp/xapp.hpp"
// Pull the rapidjson names (BaseReaderHandler, Reader, StringStream, SizeType, UTF8) into scope.
58 using namespace rapidjson;
// Aliases for the types exchanged with the SDL storage object: the namespace
// argument, the key set returned by findKeys() and the key->bytes map returned by get().
60 using Namespace = std::string;
61 using Key = std::string;
62 using Data = std::vector<uint8_t>;
63 using DataMap = std::map<Key, Data>;
64 using Keys = std::set<Key>;
67 // ----------------------------------------------------------
// xApp framework instance; allocated in main() and used to allocate/send messages.
69 std::unique_ptr<Xapp> xfw;
// Names of the SDL namespaces; main() copies them into nsu / nsc.
71 std::string sdl_namespace_u = "TS-UE-metrics";
72 std::string sdl_namespace_c = "TS-cell-metrics";
// RSRP threshold: overwritten by policy_callback() when a policy carries a
// "threshold" value, and compared against each UE's serving-cell RSRP in the run loop.
// NOTE(review): written and read from different call paths with no synchronisation
// visible here -- confirm whether those paths run on different threads.
74 int rsrp_threshold = 0;
// SDL storage handle; created in main() via SyncStorage::create().
76 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
// NOTE(review): this looks like a field of the UEData record (code later reads
// data.serving_cell_rsrp); the enclosing struct header is not visible in this view.
83 int serving_cell_rsrp;
87 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
88 unordered_map<string, string> cell_pred;
90 bool ue_id_found = false;
92 string curr_value = "";
94 int policy_instance_id;
96 std::string operation;
97 bool found_threshold = false;
100 bool Null() { return true; }
101 bool Bool(bool b) { return true; }
104 if (curr_key.compare("policy_type_id") == 0) {
106 } else if (curr_key.compare("policy_instance_id") == 0) {
107 policy_instance_id = i;
108 } else if (curr_key.compare("threshold") == 0) {
109 found_threshold = true;
115 bool Uint(unsigned u) {
117 if (curr_key.compare("policy_type_id") == 0) {
119 } else if (curr_key.compare("policy_instance_id") == 0) {
120 policy_instance_id = u;
121 } else if (curr_key.compare("threshold") == 0) {
122 found_threshold = true;
128 bool Int64(int64_t i) { return true; }
129 bool Uint64(uint64_t u) { return true; }
130 bool Double(double d) { return true; }
131 bool String(const char* str, SizeType length, bool copy) {
133 if (curr_key.compare("operation") != 0) {
143 bool Key(const char* str, SizeType length, bool copy) {
149 bool EndObject(SizeType memberCount) { return true; }
150 bool StartArray() { return true; }
151 bool EndArray(SizeType elementCount) { return true; }
// rapidjson SAX handler for a prediction message. Unsigned integer values are
// recorded per key in cell_pred_down / cell_pred_up; every other scalar type
// is accepted and ignored.
155 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
// Values stored by Uint(), keyed by curr_key.
156 unordered_map<string, int> cell_pred_down;
157 unordered_map<string, int> cell_pred_up;
159 bool ue_id_found = false;
160 string curr_key = "";
161 string curr_value = "";
// NOTE(review): presumably selects which of the two maps the next Uint() value
// goes to -- the code that tests/toggles it is not visible in this view.
162 bool down_val = true;
163 bool Null() { return true; }
164 bool Bool(bool b) { return true; }
// Signed values are ignored; only Uint() stores predictions.
165 bool Int(int i) { return true; }
166 bool Uint(unsigned u) {
// Store the value under the current key in the "down" map ...
169 cell_pred_down[curr_key] = u;
// ... or in the "up" map (branch condition not visible here).
172 cell_pred_up[curr_key] = u;
179 bool Int64(int64_t i) { return true; }
180 bool Uint64(uint64_t u) { return true; }
181 bool Double(double d) { return true; }
182 bool String(const char* str, SizeType length, bool copy) {
186 bool StartObject() { return true; }
// NOTE(review): body not visible here; callers read handler.ue_id and the maps are
// keyed by curr_key, so this presumably fills one of those two -- confirm.
187 bool Key(const char* str, SizeType length, bool copy) {
197 bool EndObject(SizeType memberCount) { return true; }
198 bool StartArray() { return true; }
199 bool EndArray(SizeType elementCount) { return true; }
203 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
204 unordered_map<string, string> cell_pred;
205 std::string serving_cell_id;
206 int serving_cell_rsrp;
207 int serving_cell_rsrq;
208 int serving_cell_sinr;
209 bool in_serving_array = false;
210 int rf_meas_index = 0;
212 bool in_serving_report_object = false;
214 string curr_key = "";
215 string curr_value = "";
216 bool Null() { return true; }
217 bool Bool(bool b) { return true; }
223 bool Uint(unsigned i) {
225 if (in_serving_report_object) {
226 if (curr_key.compare("rsrp") == 0) {
227 serving_cell_rsrp = i;
228 } else if (curr_key.compare("rsrq") == 0) {
229 serving_cell_rsrq = i;
230 } else if (curr_key.compare("rssinr") == 0) {
231 serving_cell_sinr = i;
236 bool Int64(int64_t i) {
239 bool Uint64(uint64_t i) {
242 bool Double(double d) { return true; }
243 bool String(const char* str, SizeType length, bool copy) {
245 if (curr_key.compare("ServingCellID") == 0) {
246 serving_cell_id = str;
252 if (curr_key.compare("ServingCellRF") == 0) {
253 in_serving_report_object = true;
257 bool Key(const char* str, SizeType length, bool copy) {
262 bool EndObject(SizeType memberCount) {
263 if (curr_key.compare("ServingCellRF") == 0) {
264 in_serving_report_object = false;
269 if (curr_key.compare("ServingCellRF") == 0) {
270 in_serving_array = true;
275 bool EndArray(SizeType elementCount) {
277 if (curr_key.compare("servingCellRF") == 0) {
278 in_serving_array = false;
286 unordered_map<string, UEData> get_sdl_ue_data() {
288 fprintf(stderr, "In get_sdl_ue_data()\n");
290 unordered_map<string, string> ue_data;
292 unordered_map<string, UEData> return_ue_data_map;
294 std::string prefix3="";
295 Keys K2 = sdl->findKeys(nsu, prefix3);
296 DataMap Dk2 = sdl->get(nsu, K2);
301 for(auto si=K2.begin();si!=K2.end();++si){
302 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
303 char val[val_v.size()+1]; // from Data
306 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
308 ue_id.assign((std::string)*si);
311 ue_data[ue_id] = ue_json;
314 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
315 UEDataHandler handler;
317 StringStream ss(map_iter->second.c_str());
318 reader.Parse(ss,handler);
320 string ueID = map_iter->first;
321 string serving_cell_id = handler.serving_cell_id;
322 int serv_rsrp = handler.serving_cell_rsrp;
324 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
328 return return_ue_data_map;
331 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
333 int response_to = 0; // max timeout wating for a response
334 int rmtype; // received message type
337 fprintf(stderr, "Policy Callback got a message, type=%d, length=%d\n", mtype, len);
339 const char *arg = (const char*)payload.get();
341 fprintf(stderr, "payload is %s\n", payload.get());
343 PolicyHandler handler;
345 StringStream ss(arg);
346 reader.Parse(ss,handler);
348 //Set the threshold value
350 if (handler.found_threshold) {
351 fprintf(stderr, "Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
352 rsrp_threshold = handler.threshold;
355 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
356 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
// Build a {"UEPredictionSet": [ "<ue>", ... ]} JSON body from the given UE ids,
// write it into a freshly allocated framework message and send it.
// ues_to_predict: ids of the UEs to request a prediction for (callers in this
// file only pass a non-empty list).
361 void send_prediction_request(vector<string> ues_to_predict) {
363 std::unique_ptr<Message> msg;
364 Msg_component payload; // special type of unique pointer to the payload
367 int response_to = 0; // max timeout wating for a response
371 Msg_component send_payload;
// Allocate a message with a 2048-byte payload area.
373 msg = xfw->Alloc_msg( 2048 );
375 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
// NOTE(review): `i` is printed here but no assignment to it is visible in this view.
377 fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
// Assemble the JSON array text: each id is quoted, comma separated, and the
// last element closes the bracket.
// NOTE(review): with an empty input the loop never runs and the "]" is never appended.
381 string ues_list = "[";
// NOTE(review): signed int compared with the unsigned size().
383 for (int i = 0; i < ues_to_predict.size(); i++) {
384 if (i == ues_to_predict.size() - 1) {
385 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"]";
387 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
391 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
393 const char *body = message_body.c_str();
395 // char *body = "{\"UEPredictionSet\": [\"12345\"]}";
397 send_payload = msg->Get_payload(); // direct access to payload
398 // snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
// NOTE(review): `body` is used as the format string, so a '%' inside a UE id
// would be interpreted as a conversion; snprintf(dst, 2048, "%s", body) avoids that.
// Bodies longer than 2047 characters are truncated by the size limit.
399 snprintf( (char *) send_payload.get(), 2048, body);
400 //snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
402 fprintf(stderr, "message body %s\n", send_payload.get());
// NOTE(review): strlen() returns size_t but the format uses %d.
403 fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
405 // payload updated in place, nothing to copy from, so payload parm is nil
// `mtype` is declared on a line that is not visible in this view.
406 if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
407 fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
// Receive-and-log section. NOTE(review): the surrounding lines are not visible,
// so it cannot be told from here whether this section is active or commented out.
411 msg = xfw->Receive( response_to );
413 rmtype = msg->Get_mtype();
414 send_payload = msg->Get_payload();
415 fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
421 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
429 int response_to = 0; // max timeout wating for a response
432 int rmtype; // received message type
433 int delay = 1000000; // mu-sec delay; default 1s
435 cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
436 cout << "payload is " << payload.get() << "\n";
440 const char* arg = (const char*)payload.get();
441 PredictionHandler handler;
446 StringStream ss(arg);
447 reader.Parse(ss,handler);
449 cout << "got an exception on stringstream read parse\n";
452 std::string pred_ue_id = handler.ue_id;
454 cout << "Prediction for " << pred_ue_id << endl;
456 unordered_map<string, int> throughput_map = handler.cell_pred_down;
460 unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
462 //Decision about CONTROL message
463 //(1) Identify UE Id in Prediction message
464 //(2) Get UEData struct for this UE Id
465 //(3) Identify the UE's service cell ID
466 //(4) Iterate through Prediction message.
467 // If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
469 UEData pred_ue_data = sdl_data[pred_ue_id];
470 std::string serving_cell_id = pred_ue_data.serving_cell;
472 int serving_cell_throughput;
473 int highest_throughput;
474 std::string highest_throughput_cell_id;
475 std::string::size_type str_size;
477 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
479 std::string curr_cellid = map_iter->first;
480 int curr_throughput = map_iter->second;
482 if (curr_cellid.compare(serving_cell_id) == 0) {
483 serving_cell_throughput = curr_throughput;
484 highest_throughput = serving_cell_throughput;
489 //Iterating again to identify the highest throughput prediction
491 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
493 std::string curr_cellid = map_iter->first;
494 int curr_throughput = map_iter->second;
496 if (curr_throughput > serving_cell_throughput) {
497 highest_throughput = curr_throughput;
498 highest_throughput_cell_id = curr_cellid;
502 if (highest_throughput > serving_cell_throughput) {
503 cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
504 cout << "UE ID: " << pred_ue_id << endl;
505 cout << "Source cell " << serving_cell_id << endl;
506 cout << "Target cell " << highest_throughput_cell_id << endl;
509 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
510 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
516 //This function runs a loop that continuously checks SDL for any UE
// (the function header and the loop construct are on lines not visible in this view;
// main() starts this function on its own thread as run_loop)
520 cout << "in Traffic Steering run_loop()\n";
522 unordered_map<string, UEData> uemap;
// Refresh the UE id -> {serving cell, RSRP} map from SDL.
526 uemap = get_sdl_ue_data();
// UEs selected for a prediction request.
528 vector<string> prediction_ues;
530 for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
531 string ueid = map_iter->first;
532 fprintf(stderr,"found a ueid %s\n", ueid.c_str());
533 UEData data = map_iter->second;
535 fprintf(stderr, "current rsrp is %d\n", data.serving_cell_rsrp);
// A UE qualifies when its serving-cell RSRP is strictly below the global threshold.
537 if (data.serving_cell_rsrp < rsrp_threshold) {
538 fprintf(stderr,"it is less than the rsrp threshold\n");
539 prediction_ues.push_back(ueid);
541 fprintf(stderr,"it is not less than the rsrp threshold\n");
// NOTE(review): size() returns size_t but the format uses %d.
545 fprintf(stderr, "the size of pred ues is %d\n", prediction_ues.size());
// Only send a request when at least one UE qualified.
547 if (prediction_ues.size() > 0) {
548 send_prediction_request(prediction_ues);
555 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
556 * It just print the payload received from AD xApp and send an ACK with same UEID as payload to AD xApp.
558 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
559 cout << "payload is " << payload.get() << "\n";
560 mbuf.Send_response(30004, -1, strlen((char *) payload.get()), (unsigned char *) payload.get());
// Program entry point: create the SDL handle and the xApp framework instance,
// register the three message callbacks, start run_loop on its own thread and
// hand control to the framework.
563 extern int main( int argc, char** argv ) {
// Port passed to the Xapp constructor.
567 char* port = (char *) "4560";
569 sdl = shareddatalayer::SyncStorage::create();
// SDL namespaces for UE and cell metrics, built from the global name strings.
571 nsu = Namespace(sdl_namespace_u);
572 nsc = Namespace(sdl_namespace_c);
575 fprintf( stderr, "<XAPP> listening on port: %s\n", port );
576 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
// Message type -> callback: 20010 policy, 30002 prediction, 30003 anomaly detection.
578 xfw->Add_msg_cb( 20010, policy_callback, NULL );
579 xfw->Add_msg_cb( 30002, prediction_callback, NULL );
580 xfw->Add_msg_cb( 30003, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
// NOTE(review): no join()/detach() on loop_thread is visible in this view; if
// Run() ever returns, destroying a joinable std::thread terminates the process.
582 std::thread loop_thread;
584 loop_thread = std::thread(&run_loop);
// `nthreads` is declared on a line that is not visible in this view.
586 xfw->Run( nthreads );