3 ==================================================================================
4 Copyright (c) 2020 Nokia
5 Copyright (c) 2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: Traffic Steering xApp;
25 2. Queries SDL to decide which UE to attempt Traffic Steering for
26 3. Requests prediction for UE throughput on current and neighbor cells
27 4. Receives prediction
28 5. Optionally exercises Traffic Steering action over E2
43 #include <sdl/syncstorage.hpp>
48 #include <unordered_map>
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
57 #include "ricxfcpp/xapp.hpp"
59 using namespace rapidjson;
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
68 // ----------------------------------------------------------
70 std::unique_ptr<Xapp> xfw;
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
75 int rsrp_threshold = 0;
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
84 int serving_cell_rsrp;
88 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
89 unordered_map<string, string> cell_pred;
91 bool ue_id_found = false;
93 string curr_value = "";
95 int policy_instance_id;
97 std::string operation;
98 bool found_threshold = false;
101 bool Null() { return true; }
102 bool Bool(bool b) { return true; }
105 if (curr_key.compare("policy_type_id") == 0) {
107 } else if (curr_key.compare("policy_instance_id") == 0) {
108 policy_instance_id = i;
109 } else if (curr_key.compare("threshold") == 0) {
110 found_threshold = true;
116 bool Uint(unsigned u) {
118 if (curr_key.compare("policy_type_id") == 0) {
120 } else if (curr_key.compare("policy_instance_id") == 0) {
121 policy_instance_id = u;
122 } else if (curr_key.compare("threshold") == 0) {
123 found_threshold = true;
129 bool Int64(int64_t i) { return true; }
130 bool Uint64(uint64_t u) { return true; }
131 bool Double(double d) { return true; }
132 bool String(const char* str, SizeType length, bool copy) {
134 if (curr_key.compare("operation") != 0) {
144 bool Key(const char* str, SizeType length, bool copy) {
150 bool EndObject(SizeType memberCount) { return true; }
151 bool StartArray() { return true; }
152 bool EndArray(SizeType elementCount) { return true; }
156 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
157 unordered_map<string, int> cell_pred_down;
158 unordered_map<string, int> cell_pred_up;
160 bool ue_id_found = false;
161 string curr_key = "";
162 string curr_value = "";
163 bool down_val = true;
164 bool Null() { return true; }
165 bool Bool(bool b) { return true; }
166 bool Int(int i) { return true; }
167 bool Uint(unsigned u) {
170 cell_pred_down[curr_key] = u;
173 cell_pred_up[curr_key] = u;
180 bool Int64(int64_t i) { return true; }
181 bool Uint64(uint64_t u) { return true; }
182 bool Double(double d) { return true; }
183 bool String(const char* str, SizeType length, bool copy) {
187 bool StartObject() { return true; }
188 bool Key(const char* str, SizeType length, bool copy) {
198 bool EndObject(SizeType memberCount) { return true; }
199 bool StartArray() { return true; }
200 bool EndArray(SizeType elementCount) { return true; }
204 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
205 unordered_map<string, string> cell_pred;
206 std::string serving_cell_id;
207 int serving_cell_rsrp;
208 int serving_cell_rsrq;
209 int serving_cell_sinr;
210 bool in_serving_array = false;
211 int rf_meas_index = 0;
213 string curr_key = "";
214 string curr_value = "";
215 bool Null() { return true; }
216 bool Bool(bool b) { return true; }
219 if (in_serving_array) {
221 switch(rf_meas_index) {
223 serving_cell_rsrp = i;
226 serving_cell_rsrq = i;
229 serving_cell_sinr = i;
236 bool Uint(unsigned u) {
238 bool Int64(int64_t i) { return true; }
239 bool Uint64(uint64_t u) { return true; }
240 bool Double(double d) { return true; }
241 bool String(const char* str, SizeType length, bool copy) {
243 if (curr_key.compare("ServingCellID") == 0) {
244 serving_cell_id = str;
249 bool StartObject() { return true; }
250 bool Key(const char* str, SizeType length, bool copy) {
255 bool EndObject(SizeType memberCount) { return true; }
258 if (curr_key.compare("ServingCellRF") == 0) {
259 in_serving_array = true;
264 bool EndArray(SizeType elementCount) {
266 if (curr_key.compare("servingCellRF") == 0) {
267 in_serving_array = false;
275 unordered_map<string, UEData> get_sdl_ue_data() {
277 fprintf(stderr, "In get_sdl_ue_data()\n");
279 unordered_map<string, string> ue_data;
281 unordered_map<string, UEData> return_ue_data_map;
283 std::string prefix3="12";
284 Keys K2 = sdl->findKeys(nsu, prefix3);
285 DataMap Dk2 = sdl->get(nsu, K2);
290 for(auto si=K2.begin();si!=K2.end();++si){
291 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
292 char val[val_v.size()+1]; // from Data
295 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
297 ue_id.assign((std::string)*si);
300 ue_data[ue_id] = ue_json;
303 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
304 UEDataHandler handler;
306 StringStream ss(map_iter->second.c_str());
307 reader.Parse(ss,handler);
309 string ueID = map_iter->first;
310 string serving_cell_id = handler.serving_cell_id;
311 int serv_rsrp = handler.serving_cell_rsrp;
313 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
317 return return_ue_data_map;
320 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
322 int response_to = 0; // max timeout wating for a response
323 int rmtype; // received message type
326 cout << "Policy Callback got a message, type=" << mtype << " , length=" << len << endl;
327 cout << "payload is " << payload.get() << endl;
330 const char *arg = (const char*)payload.get();
332 PolicyHandler handler;
334 StringStream ss(arg);
335 reader.Parse(ss,handler);
337 //Set the threshold value
339 if (handler.found_threshold) {
340 cout << "Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
341 rsrp_threshold = handler.threshold;
344 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
345 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
350 void send_prediction_request(vector<string> ues_to_predict) {
352 std::unique_ptr<Message> msg;
353 Msg_component payload; // special type of unique pointer to the payload
356 int response_to = 0; // max timeout wating for a response
360 Msg_component send_payload;
362 msg = xfw->Alloc_msg( 2048 );
364 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
366 fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
370 string ues_list = "[";
372 for (int i = 0; i < ues_to_predict.size(); i++) {
373 if (i == ues_to_predict.size() - 1) {
374 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
376 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
380 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
382 const char *body = message_body.c_str();
384 // char *body = "{\"UEPredictionSet\": [\"12345\"]}";
386 send_payload = msg->Get_payload(); // direct access to payload
387 // snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
388 // snprintf( (char *) send_payload.get(), 2048, body);
389 snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
391 fprintf(stderr, "message body %s\n", send_payload.get());
392 fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
394 // payload updated in place, nothing to copy from, so payload parm is nil
395 if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
396 fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
400 msg = xfw->Receive( response_to );
402 rmtype = msg->Get_mtype();
403 send_payload = msg->Get_payload();
404 fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
410 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
418 int response_to = 0; // max timeout wating for a response
421 int rmtype; // received message type
422 int delay = 1000000; // mu-sec delay; default 1s
424 cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
425 cout << "payload is " << payload.get() << "\n";
429 const char* arg = (const char*)payload.get();
430 PredictionHandler handler;
435 StringStream ss(arg);
436 reader.Parse(ss,handler);
438 cout << "got an exception on stringstream read parse\n";
441 std::string pred_ue_id = handler.ue_id;
443 cout << "Prediction for " << pred_ue_id << endl;
445 unordered_map<string, int> throughput_map = handler.cell_pred_down;
449 unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
451 //Decision about CONTROL message
452 //(1) Identify UE Id in Prediction message
453 //(2) Get UEData struct for this UE Id
454 //(3) Identify the UE's service cell ID
455 //(4) Iterate through Prediction message.
456 // If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
458 UEData pred_ue_data = sdl_data[pred_ue_id];
459 std::string serving_cell_id = pred_ue_data.serving_cell;
461 int serving_cell_throughput;
462 int highest_throughput;
463 std::string highest_throughput_cell_id;
464 std::string::size_type str_size;
466 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
468 std::string curr_cellid = map_iter->first;
469 int curr_throughput = map_iter->second;
471 if (curr_cellid.compare(serving_cell_id) == 0) {
472 serving_cell_throughput = curr_throughput;
473 highest_throughput = serving_cell_throughput;
478 //Iterating again to identify the highest throughput prediction
480 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
482 std::string curr_cellid = map_iter->first;
483 int curr_throughput = map_iter->second;
485 if (curr_throughput > serving_cell_throughput) {
486 highest_throughput = curr_throughput;
487 highest_throughput_cell_id = curr_cellid;
491 if (highest_throughput > serving_cell_throughput) {
492 cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
493 cout << "UE ID: " << pred_ue_id << endl;
494 cout << "Source cell " << serving_cell_id << endl;
495 cout << "Target cell " << highest_throughput_cell_id << endl;
498 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
499 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
505 //This function runs a loop that continuously checks SDL for any UE
509 cout << "in Traffic Steering run_loop()\n";
511 unordered_map<string, UEData> uemap;
513 vector<string> prediction_ues;
517 uemap = get_sdl_ue_data();
519 for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
520 string ueid = map_iter->first;
521 UEData data = map_iter->second;
522 if (data.serving_cell_rsrp < rsrp_threshold) {
523 prediction_ues.push_back(ueid);
527 if (prediction_ues.size() > 0) {
528 send_prediction_request(prediction_ues);
537 extern int main( int argc, char** argv ) {
541 char* port = (char *) "4560";
543 sdl = shareddatalayer::SyncStorage::create();
545 nsu = Namespace(sdl_namespace_u);
546 nsc = Namespace(sdl_namespace_c);
549 fprintf( stderr, "<XAPP> listening on port: %s\n", port );
550 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
552 xfw->Add_msg_cb( 20010, policy_callback, NULL );
553 xfw->Add_msg_cb( 30002, prediction_callback, NULL );
555 std::thread loop_thread;
557 loop_thread = std::thread(&run_loop);
559 xfw->Run( nthreads );