3 ==================================================================================
4 Copyright (c) 2020 Nokia
5 Copyright (c) 2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: Traffic Steering xApp;
25 2. Queries SDL to decide which UE to attempt Traffic Steering for
26 3. Requests prediction for UE throughput on current and neighbor cells
27 4. Receives prediction
28 5. Optionally exercises Traffic Steering action over E2
43 #include <sdl/syncstorage.hpp>
48 #include <unordered_map>
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
57 #include "ricxfcpp/xapp.hpp"
// Pull the rapidjson names (BaseReaderHandler, UTF8, StringStream, SizeType, ...) into the
// global scope; the handler structs in this file use them unqualified.
59 using namespace rapidjson;
// Aliases for the values exchanged with the SDL storage object: get_sdl_ue_data() stores the
// result of sdl->findKeys() in a Keys and the result of sdl->get() in a DataMap.
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
68 // ----------------------------------------------------------
// xApp framework instance; created in main() and used to allocate messages, register the
// message callbacks and run the framework.
70 std::unique_ptr<Xapp> xfw;
// SDL namespace names; main() copies them into nsu and nsc.
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
// RSRP threshold: run_loop() queues a prediction request for every UE whose serving-cell RSRP
// is below this value; policy_callback() overwrites it when a policy carries a threshold.
75 int rsrp_threshold = 0;
// SDL storage handle; created in main() and queried in get_sdl_ue_data().
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
// NOTE(review): the declaration that encloses this member is not visible in this excerpt.
// Presumably it is a field of UEData (run_loop() reads data.serving_cell_rsrp) -- confirm.
84 int serving_cell_rsrp;
// rapidjson SAX handler that policy_callback() passes to reader.Parse() to scan a policy payload.
// Visible behaviour: an integer value under the key "policy_instance_id" is stored in
// policy_instance_id, and an integer value under "threshold" sets found_threshold. The other
// events shown with one-line bodies are accepted (return true) and ignored.
// NOTE(review): several lines of this struct are not visible in this excerpt, including the
// declarations of curr_key and threshold and the bodies of String() and Key().
88 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
89 unordered_map<string, string> cell_pred;
91 bool ue_id_found = false;
93 string curr_value = "";
// NOTE(review): no initializer, so this is indeterminate unless the payload contains a
// matching key -- consider value-initializing it.
95 int policy_instance_id;
97 std::string operation;
// Set once a "threshold" key with an integer value has been seen; policy_callback() tests it
// before copying handler.threshold into the global rsrp_threshold.
98 bool found_threshold = false;
101 bool Null() { return true; }
102 bool Bool(bool b) { return true; }
// Dispatch on curr_key for a signed integer value (presumably the body of Int(int i); its
// signature line is not visible here).
105 if (curr_key.compare("policy_type_id") == 0) {
107 } else if (curr_key.compare("policy_instance_id") == 0) {
108 policy_instance_id = i;
109 } else if (curr_key.compare("threshold") == 0) {
110 found_threshold = true;
// Unsigned integer values go through the same key dispatch as the signed branch above.
116 bool Uint(unsigned u) {
118 if (curr_key.compare("policy_type_id") == 0) {
120 } else if (curr_key.compare("policy_instance_id") == 0) {
121 policy_instance_id = u;
122 } else if (curr_key.compare("threshold") == 0) {
123 found_threshold = true;
// 64-bit and floating point values are accepted but not recorded.
129 bool Int64(int64_t i) { return true; }
130 bool Uint64(uint64_t u) { return true; }
131 bool Double(double d) { return true; }
132 bool String(const char* str, SizeType length, bool copy) {
// Taken for every string whose key is not "operation"; the guarded statements are not visible.
134 if (curr_key.compare("operation") != 0) {
144 bool Key(const char* str, SizeType length, bool copy) {
150 bool EndObject(SizeType memberCount) { return true; }
151 bool StartArray() { return true; }
152 bool EndArray(SizeType elementCount) { return true; }
// rapidjson SAX handler that prediction_callback() passes to reader.Parse() to scan a
// prediction payload. Unsigned integer values are stored per key into cell_pred_down or
// cell_pred_up; prediction_callback() afterwards reads handler.ue_id and handler.cell_pred_down.
// NOTE(review): the ue_id member and the bodies of String() and Key() are not visible in this
// excerpt.
156 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
// Values recorded by Uint(), keyed by curr_key (the callback treats the keys as cell ids).
157 unordered_map<string, int> cell_pred_down;
158 unordered_map<string, int> cell_pred_up;
160 bool ue_id_found = false;
161 string curr_key = "";
162 string curr_value = "";
// Presumably selects which of the two maps receives the next unsigned value; the condition
// inside Uint() is not visible -- confirm.
163 bool down_val = true;
164 bool Null() { return true; }
165 bool Bool(bool b) { return true; }
166 bool Int(int i) { return true; }
167 bool Uint(unsigned u) {
// Implicit unsigned -> int conversion on store.
170 cell_pred_down[curr_key] = u;
173 cell_pred_up[curr_key] = u;
// 64-bit and floating point values are accepted but not recorded.
180 bool Int64(int64_t i) { return true; }
181 bool Uint64(uint64_t u) { return true; }
182 bool Double(double d) { return true; }
183 bool String(const char* str, SizeType length, bool copy) {
187 bool StartObject() { return true; }
188 bool Key(const char* str, SizeType length, bool copy) {
198 bool EndObject(SizeType memberCount) { return true; }
199 bool StartArray() { return true; }
200 bool EndArray(SizeType elementCount) { return true; }
// rapidjson SAX handler that get_sdl_ue_data() uses to pull the serving cell id and the
// serving-cell RF measurements (rsrp, rsrq, rssinr) out of one UE JSON document.
// NOTE(review): several lines of this struct are not visible in this excerpt (for example the
// Int() handler, the body of Key() and the signatures around the ServingCellRF checks).
204 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
205 unordered_map<string, string> cell_pred;
206 std::string serving_cell_id;
// NOTE(review): these three members have no initializer. get_sdl_ue_data() reads
// serving_cell_rsrp after parsing, which is indeterminate when the document carried no
// rsrp value inside the serving report object -- consider initializing them.
207 int serving_cell_rsrp;
208 int serving_cell_rsrq;
209 int serving_cell_sinr;
210 bool in_serving_array = false;
211 int rf_meas_index = 0;
// True while the parser is inside the ServingCellRF object; gates the captures in Uint().
213 bool in_serving_report_object = false;
215 string curr_key = "";
216 string curr_value = "";
217 bool Null() { return true; }
218 bool Bool(bool b) { return true; }
224 bool Uint(unsigned i) {
// Record the measurement named by curr_key, but only inside the serving report object.
226 if (in_serving_report_object) {
227 if (curr_key.compare("rsrp") == 0) {
228 serving_cell_rsrp = i;
229 } else if (curr_key.compare("rsrq") == 0) {
230 serving_cell_rsrq = i;
231 } else if (curr_key.compare("rssinr") == 0) {
232 serving_cell_sinr = i;
237 bool Int64(int64_t i) {
240 bool Uint64(uint64_t i) {
243 bool Double(double d) { return true; }
244 bool String(const char* str, SizeType length, bool copy) {
// The string value under "ServingCellID" becomes the serving cell id.
246 if (curr_key.compare("ServingCellID") == 0) {
247 serving_cell_id = str;
// Start capturing RF values once the ServingCellRF object is entered (presumably the body of
// StartObject(); its signature line is not visible).
253 if (curr_key.compare("ServingCellRF") == 0) {
254 in_serving_report_object = true;
258 bool Key(const char* str, SizeType length, bool copy) {
263 bool EndObject(SizeType memberCount) {
// Stop capturing when an object ends while curr_key is still "ServingCellRF".
264 if (curr_key.compare("ServingCellRF") == 0) {
265 in_serving_report_object = false;
// Presumably the body of StartArray(); its signature line is not visible.
270 if (curr_key.compare("ServingCellRF") == 0) {
271 in_serving_array = true;
276 bool EndArray(SizeType elementCount) {
// NOTE(review): this compares against "servingCellRF" (lower-case s) while every other check
// in this struct uses "ServingCellRF". If the difference is unintended, in_serving_array is
// never reset here -- confirm the expected key spelling.
278 if (curr_key.compare("servingCellRF") == 0) {
279 in_serving_array = false;
// Builds a map from UE id to UEData out of the SDL namespace nsu.
// Step 1: ask SDL for the keys matching an empty prefix and fetch their values.
// Step 2: copy each value (raw bytes) into a string keyed by the SDL key, used as the UE id.
// Step 3: SAX-parse every string with UEDataHandler and keep the serving cell id and the
//         serving-cell RSRP.
// Returns the resulting map by value; it is empty when SDL returned no keys.
// NOTE(review): some lines of this function (declarations of ue_id, ue_json, reader and the
// loop index, plus closing braces) are not visible in this excerpt.
287 unordered_map<string, UEData> get_sdl_ue_data() {
289 fprintf(stderr, "In get_sdl_ue_data()\n");
// Raw JSON text per UE id, filled by the first loop and parsed by the second.
291 unordered_map<string, string> ue_data;
293 unordered_map<string, UEData> return_ue_data_map;
// Empty prefix; presumably matches every key in the namespace -- confirm against the SDL API.
295 std::string prefix3="";
296 Keys K2 = sdl->findKeys(nsu, prefix3);
297 DataMap Dk2 = sdl->get(nsu, K2);
302 for(auto si=K2.begin();si!=K2.end();++si){
303 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
// NOTE(review): a variable-length array is a compiler extension, not standard C++; building a
// std::string directly from val_v would avoid it.
304 char val[val_v.size()+1]; // from Data
307 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
// The SDL key itself is used as the UE id.
309 ue_id.assign((std::string)*si);
312 ue_data[ue_id] = ue_json;
// Second pass: parse each JSON string and keep only the fields the callers need.
315 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
316 UEDataHandler handler;
318 StringStream ss(map_iter->second.c_str());
// NOTE(review): the result of Parse() is not checked in the visible code.
319 reader.Parse(ss,handler);
321 string ueID = map_iter->first;
322 string serving_cell_id = handler.serving_cell_id;
323 int serv_rsrp = handler.serving_cell_rsrp;
325 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
329 return return_ue_data_map;
// Message callback registered in main() for message type 20010.
// Logs the message, parses the payload with PolicyHandler and, when the policy carried a
// threshold, copies it into the global rsrp_threshold. Finally sends two responses of type 101
// on the received message buffer.
// Parameters: mbuf is the received message (used for the responses), mtype and len are logged,
// payload is the message body; subid and data are not used in the visible code.
// NOTE(review): the payload is handed to StringStream and to a %s conversion, so it is treated
// as a NUL-terminated string -- confirm the sender guarantees termination.
// NOTE(review): rsrp_threshold is written here and read in run_loop(), which main() starts on
// its own std::thread; no synchronization is visible in this excerpt.
332 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
334 int response_to = 0; // max timeout waiting for a response
335 int rmtype; // received message type
338 fprintf(stderr, "Policy Callback got a message, type=%d, length=%d\n", mtype, len);
340 const char *arg = (const char*)payload.get();
342 fprintf(stderr, "payload is %s\n", payload.get());
344 PolicyHandler handler;
346 StringStream ss(arg);
// NOTE(review): the result of Parse() is not checked in the visible code.
347 reader.Parse(ss,handler);
349 //Set the threshold value
351 if (handler.found_threshold) {
352 fprintf(stderr, "Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
353 rsrp_threshold = handler.threshold;
// Length 5 covers the four characters of each reply plus the terminating NUL.
356 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
357 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
// Builds a JSON body of the form {"UEPredictionSet": [ "id1", "id2"]} from ues_to_predict,
// writes it into the payload of a message obtained from xfw->Alloc_msg(2048) and sends it with
// message type mtype and no subscription id.
// Parameter ues_to_predict: UE ids to list in the request (taken by value).
// NOTE(review): several lines of this function are not visible in this excerpt (declarations of
// sz, i, mtype and rmtype, the size check guarding the first fprintf, and whatever surrounds
// the Receive() section at the end, which may or may not be active code).
362 void send_prediction_request(vector<string> ues_to_predict) {
364 std::unique_ptr<Message> msg;
365 Msg_component payload; // special type of unique pointer to the payload
368 int response_to = 0; // max timeout waiting for a response
372 Msg_component send_payload;
374 msg = xfw->Alloc_msg( 2048 );
376 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
378 fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
// Assemble the JSON array by hand; the closing bracket is appended together with the last id.
// NOTE(review): with an empty ues_to_predict the loop never runs and the list stays as an
// unclosed bracket, producing invalid JSON. run_loop() only calls this with a non-empty vector.
382 string ues_list = "[";
// NOTE(review): int index compared against the unsigned result of size().
384 for (int i = 0; i < ues_to_predict.size(); i++) {
385 if (i == ues_to_predict.size() - 1) {
386 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"]";
388 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
392 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
394 const char *body = message_body.c_str();
396 // char *body = "{\"UEPredictionSet\": [\"12345\"]}";
398 send_payload = msg->Get_payload(); // direct access to payload
399 // snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
// NOTE(review): body is passed as the format string, so a percent sign inside a UE id would be
// interpreted as a conversion specification; passing it through a %s conversion is safer.
// snprintf() truncates silently if the body exceeds 2047 characters.
400 snprintf( (char *) send_payload.get(), 2048, body);
401 //snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
403 fprintf(stderr, "message body %s\n", send_payload.get());
// NOTE(review): strlen() returns size_t but the conversion is %d; %zu is the matching one.
404 fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
406 // payload updated in place, nothing to copy from, so payload parm is nil
407 if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
408 fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
// Receive a message with timeout response_to and log its type and payload.
412 msg = xfw->Receive( response_to );
414 rmtype = msg->Get_mtype();
415 send_payload = msg->Get_payload();
416 fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
// Message callback registered in main() for message type 30002.
// Parses the prediction payload with PredictionHandler, looks up the UE in the data returned
// by get_sdl_ue_data(), compares the predicted throughput of the other cells with that of the
// serving cell and prints the control request it would send when another cell is better.
// Finally sends two responses of type 101 on the received message buffer.
// Parameters: mbuf is the received message, mtype and len are printed, payload is the message
// body; subid and data are not used in the visible code.
// NOTE(review): several lines of this function are not visible in this excerpt (the reader
// declaration, the construct that produces the exception message below, closing braces).
422 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
430 int response_to = 0; // max timeout waiting for a response
433 int rmtype; // received message type
434 int delay = 1000000; // mu-sec delay; default 1s
436 cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
437 cout << "payload is " << payload.get() << "\n";
// The payload is treated as a NUL-terminated JSON string.
441 const char* arg = (const char*)payload.get();
442 PredictionHandler handler;
447 StringStream ss(arg);
448 reader.Parse(ss,handler);
450 cout << "got an exception on stringstream read parse\n";
453 std::string pred_ue_id = handler.ue_id;
455 cout << "Prediction for " << pred_ue_id << endl;
// Only the cell_pred_down map is used for the decision; cell_pred_up is not read here.
457 unordered_map<string, int> throughput_map = handler.cell_pred_down;
461 unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
463 //Decision about CONTROL message
464 //(1) Identify UE Id in Prediction message
465 //(2) Get UEData struct for this UE Id
466 //(3) Identify the UE's service cell ID
467 //(4) Iterate through Prediction message.
468 // If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
// operator[] default-constructs a UEData when the predicted UE id is not present in SDL.
470 UEData pred_ue_data = sdl_data[pred_ue_id];
471 std::string serving_cell_id = pred_ue_data.serving_cell;
// NOTE(review): neither throughput variable is initialized. If the serving cell id does not
// appear in throughput_map, both are read uninitialized in the comparisons below.
473 int serving_cell_throughput;
474 int highest_throughput;
475 std::string highest_throughput_cell_id;
476 std::string::size_type str_size;
// First pass: find the serving cell entry and use it as the baseline.
478 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
480 std::string curr_cellid = map_iter->first;
481 int curr_throughput = map_iter->second;
483 if (curr_cellid.compare(serving_cell_id) == 0) {
484 serving_cell_throughput = curr_throughput;
485 highest_throughput = serving_cell_throughput;
490 //Iterating again to identify the highest throughput prediction
492 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
494 std::string curr_cellid = map_iter->first;
495 int curr_throughput = map_iter->second;
// NOTE(review): the comparison is against serving_cell_throughput, not highest_throughput, so
// the cell kept is the last one iterated that beats the serving cell, which is not necessarily
// the highest. Iteration order of an unordered_map is unspecified.
497 if (curr_throughput > serving_cell_throughput) {
498 highest_throughput = curr_throughput;
499 highest_throughput_cell_id = curr_cellid;
503 if (highest_throughput > serving_cell_throughput) {
504 cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
505 cout << "UE ID: " << pred_ue_id << endl;
506 cout << "Source cell " << serving_cell_id << endl;
507 cout << "Target cell " << highest_throughput_cell_id << endl;
// Length 5 covers the four characters of each reply plus the terminating NUL.
510 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
511 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
517 //This function runs a loop that continuously checks SDL for any UE
// Visible body: fetch the UE map from SDL, collect the ids of all UEs whose serving-cell RSRP
// is below the global rsrp_threshold, and send one prediction request for them if any exist.
// NOTE(review): the function signature and the loop construct itself are not visible in this
// excerpt; the name run_loop comes from the log line below and from main().
521 cout << "in Traffic Steering run_loop()\n";
523 unordered_map<string, UEData> uemap;
527 uemap = get_sdl_ue_data();
// UE ids selected for a prediction request.
529 vector<string> prediction_ues;
531 for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
532 string ueid = map_iter->first;
533 fprintf(stderr,"found a ueid %s\n", ueid.c_str());
534 UEData data = map_iter->second;
536 fprintf(stderr, "current rsrp is %d\n", data.serving_cell_rsrp);
// Strictly below the threshold qualifies; equal does not.
538 if (data.serving_cell_rsrp < rsrp_threshold) {
539 fprintf(stderr,"it is less than the rsrp threshold\n");
540 prediction_ues.push_back(ueid);
542 fprintf(stderr,"it is not less than the rsrp threshold\n");
// NOTE(review): size() returns size_t but the conversion is %d; %zu is the matching one.
546 fprintf(stderr, "the size of pred ues is %d\n", prediction_ues.size());
// Skip the request entirely when no UE qualified.
548 if (prediction_ues.size() > 0) {
549 send_prediction_request(prediction_ues);
556 /* This function works with the Anomaly Detection (AD) xApp. It is invoked when anomalous UEs are sent by the AD xApp.
557 * It just prints the payload received from the AD xApp and sends an ACK with the same UEID as payload to the AD xApp.
 * The ACK is sent with message type 30004 and a length taken from strlen() of the payload.
 * NOTE(review): strlen() and the stream insertion treat the payload as a NUL-terminated string
 * and the len argument is not consulted; confirm the sender always terminates the payload.
559 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
560 cout << "payload is " << payload.get() << "\n";
561 mbuf.Send_response(30004, -1, strlen((char *) payload.get()), (unsigned char *) payload.get());
// Program entry point. Creates the SDL storage handle, sets the two namespace variables,
// creates the xApp framework object on port 4560, registers the three message callbacks,
// starts run_loop() on its own thread and then hands control to the framework.
// NOTE(review): some lines of this function are not visible in this excerpt (for example the
// declarations of nsu, nsc and nthreads, and everything after the Run() call).
564 extern int main( int argc, char** argv ) {
// Listen port passed to the Xapp constructor; argc/argv are not used in the visible code.
568 char* port = (char *) "4560";
570 sdl = shareddatalayer::SyncStorage::create();
572 nsu = Namespace(sdl_namespace_u);
573 nsc = Namespace(sdl_namespace_c);
576 fprintf( stderr, "<XAPP> listening on port: %s\n", port );
577 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
// Message type 20010 -> policy, 30002 -> prediction, 30003 -> anomaly detection.
579 xfw->Add_msg_cb( 20010, policy_callback, NULL );
580 xfw->Add_msg_cb( 30002, prediction_callback, NULL );
581 xfw->Add_msg_cb( 30003, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
// NOTE(review): no join() or detach() on loop_thread is visible in this excerpt; destroying a
// joinable std::thread calls std::terminate, so confirm one of them follows Run().
583 std::thread loop_thread;
585 loop_thread = std::thread(&run_loop);
587 xfw->Run( nthreads );