3 ==================================================================================
4 Copyright (c) 2020 Nokia
5 Copyright (c) 2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: Traffic Steering xApp;
25 2. Queries SDL to decide which UE to attempt Traffic Steering for
26 3. Requests prediction for UE throughput on current and neighbor cells
27 4. Receives prediction
28 5. Optionally exercises Traffic Steering action over E2
43 #include <sdl/syncstorage.hpp>
48 #include <unordered_map>
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
57 #include "ricxfcpp/xapp.hpp"
// Bring the RapidJSON names used below (BaseReaderHandler, UTF8, SizeType,
// StringStream) into file scope.
59 using namespace rapidjson;
// Aliases for the key/value containers exchanged with SDL in get_sdl_ue_data()
// (findKeys returns Keys, get returns DataMap).
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
68 // ----------------------------------------------------------
// Framework handle; constructed in main() and used to allocate, receive and dispatch messages.
70 std::unique_ptr<Xapp> xfw;
// Names from which main() builds the UE and cell SDL namespaces.
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
// RSRP threshold: overwritten by policy_callback when a policy carries a
// "threshold" value, and compared against each UE's serving-cell RSRP in the
// polling loop.
75 int rsrp_threshold = 0;
// SDL storage handle; created in main().
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
// NOTE(review): this line looks like a member of a struct (used later as UEData)
// whose opening and closing lines are not present in this excerpt — confirm.
84 int serving_cell_rsrp;
// RapidJSON reader handler that policy_callback feeds with the policy payload.
// Every callback logs the event to stdout; the integer callbacks additionally
// record the policy instance id and flag that a "threshold" value was seen.
// NOTE(review): several lines of this struct are not present in this excerpt
// (e.g. the Int(int) signature, the curr_key and threshold members, and the
// closing braces). The comments below describe only the visible statements.
88 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
89 unordered_map<string, string> cell_pred;
91 bool ue_id_found = false;
93 string curr_value = "";
95 int policy_instance_id;
97 std::string operation;
// Set once a "threshold" integer has been parsed; policy_callback checks this
// flag before applying the value.
98 bool found_threshold = false;
101 bool Null() { cout << "Null()" << endl; return true; }
102 bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
// Signed-integer event: dispatch on curr_key (presumably the most recently
// parsed key — its assignment is not visible here).
104 cout << "Int(" << i << ")" << endl;
105 if (curr_key.compare("policy_type_id") == 0) {
107 } else if (curr_key.compare("policy_instance_id") == 0) {
108 policy_instance_id = i;
109 } else if (curr_key.compare("threshold") == 0) {
110 found_threshold = true;
// Unsigned-integer event: same dispatch as the signed case.
// NOTE(review): the log label below says "Int(" although this is Uint().
116 bool Uint(unsigned u) {
117 cout << "Int(" << u << ")" << endl;
118 if (curr_key.compare("policy_type_id") == 0) {
120 } else if (curr_key.compare("policy_instance_id") == 0) {
121 policy_instance_id = u;
122 } else if (curr_key.compare("threshold") == 0) {
123 found_threshold = true;
129 bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
130 bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
131 bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
// String event: logs the value, then branches when the current key is NOT
// "operation" (the body of that branch is not visible in this excerpt).
132 bool String(const char* str, SizeType length, bool copy) {
133 cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
134 if (curr_key.compare("operation") != 0) {
141 cout << "StartObject()" << endl;
// Key event: logs the key name.
144 bool Key(const char* str, SizeType length, bool copy) {
145 cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
150 bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
151 bool StartArray() { cout << "StartArray()" << endl; return true; }
152 bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
// RapidJSON reader handler that prediction_callback feeds with a prediction
// payload. Unsigned values are stored per key into cell_pred_down or
// cell_pred_up; every callback logs the event to stdout.
// NOTE(review): some lines of this struct are not present in this excerpt (the
// ue_id member read by prediction_callback, the conditions inside Uint(), and
// the bodies of String()/Key()). Comments describe only what is visible.
156 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
// Values recorded by Uint(), keyed by curr_key (presumably a cell id — confirm
// against the message schema).
157 unordered_map<string, int> cell_pred_down;
158 unordered_map<string, int> cell_pred_up;
160 bool ue_id_found = false;
161 string curr_key = "";
162 string curr_value = "";
// Presumably selects whether the next unsigned value goes to the "down" or the
// "up" map; the code that reads or toggles it is not visible here.
163 bool down_val = true;
164 bool Null() { cout << "Null()" << endl; return true; }
165 bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
166 bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
// Unsigned-integer event: stores u under curr_key in one of the two maps and
// logs which one was written. The unsigned value is stored into an int map.
167 bool Uint(unsigned u) {
168 cout << "Uint(" << u << ")" << endl;
170 cell_pred_down[curr_key] = u;
171 cout << "Setting xput down val for " << curr_key << " to " << u << endl;
174 cell_pred_up[curr_key] = u;
175 cout << "Setting xput up val for " << curr_key << " to " << u << endl;
182 bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
183 bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
184 bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
// String event: logs the value (any further handling is not visible here).
185 bool String(const char* str, SizeType length, bool copy) {
186 cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
190 bool StartObject() { cout << "StartObject()" << endl; return true; }
// Key event: logs the key and reports when a UE id has been found (the test
// that triggers the message is not visible here).
191 bool Key(const char* str, SizeType length, bool copy) {
192 cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
194 cout << "Found UE ID\n";
202 bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
203 bool StartArray() { cout << "StartArray()" << endl; return true; }
204 bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
// RapidJSON reader handler used by get_sdl_ue_data() to pull the serving cell
// id and the serving-cell RF measurements (RSRP, RSRQ, SINR) out of one UE's
// JSON record. Events are logged to stderr or stdout.
// NOTE(review): several lines of this struct are not present in this excerpt
// (the Int(int) and StartArray() signatures, the case labels of the switch, the
// Key() body and the closing braces). Comments describe only what is visible.
208 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
209 unordered_map<string, string> cell_pred;
210 std::string serving_cell_id;
// NOTE(review): the three measurements have no initializer; they hold
// indeterminate values if the parsed record never supplies them.
211 int serving_cell_rsrp;
212 int serving_cell_rsrq;
213 int serving_cell_sinr;
// True while the parser is inside the "ServingCellRF" array (set in StartArray).
214 bool in_serving_array = false;
// Selects which measurement the next integer is stored into (see the switch below).
215 int rf_meas_index = 0;
217 string curr_key = "";
218 string curr_value = "";
219 bool Null() { cout << "Null()" << endl; return true; }
220 bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
// Signed-integer event: while inside the serving array, store the value into
// rsrp, rsrq or sinr depending on rf_meas_index.
222 fprintf(stderr, "Int(%d)\n", i);
223 if (in_serving_array) {
224 fprintf(stderr, "we are in serving array\n");
225 switch(rf_meas_index) {
227 serving_cell_rsrp = i;
230 serving_cell_rsrq = i;
233 serving_cell_sinr = i;
// Unsigned-integer event: logged only; the value is not stored.
// NOTE(review): prints an unsigned with %d and labels it "Int(".
240 bool Uint(unsigned u) {
241 fprintf(stderr, "Int(%d)\n", u); return true; }
242 bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
243 bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
244 bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
// String event: captures the value of the "ServingCellID" key.
245 bool String(const char* str, SizeType length, bool copy) {
246 fprintf(stderr,"String(%s)\n", str);
247 if (curr_key.compare("ServingCellID") == 0) {
248 serving_cell_id = str;
253 bool StartObject() { cout << "StartObject()" << endl; return true; }
254 bool Key(const char* str, SizeType length, bool copy) {
255 fprintf(stderr,"Key(%s)\n", str);
259 bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
// Array start: entering the "ServingCellRF" array switches integer capture on.
261 fprintf(stderr,"StartArray()");
262 if (curr_key.compare("ServingCellRF") == 0) {
263 in_serving_array = true;
// Array end: switches integer capture off again.
// NOTE(review): this compares against "servingCellRF" (lower-case s) while
// StartArray() tests "ServingCellRF"; with that spelling the flag would not be
// cleared for the key that set it — confirm which spelling is intended.
268 bool EndArray(SizeType elementCount) {
269 fprintf(stderr, "EndArray()\n");
270 if (curr_key.compare("servingCellRF") == 0) {
271 in_serving_array = false;
// Reads the UE records from SDL and returns a map from UE id to UEData
// (serving cell id and serving-cell RSRP).
// Step 1: list the keys in namespace nsu that match prefix "12" and fetch them.
// Step 2: copy each value's bytes into a character buffer and keep the JSON
//         text per UE id in ue_data.
// Step 3: parse each JSON text with UEDataHandler and build the result map.
// NOTE(review): some lines of this function are not present in this excerpt
// (declarations of i, ue_id, ue_json and reader, and the closing braces).
279 unordered_map<string, UEData> get_sdl_ue_data() {
281 fprintf(stderr, "In get_sdl_ue_data()\n");
283 unordered_map<string, string> ue_data;
285 unordered_map<string, UEData> return_ue_data_map;
287 std::string prefix3="12";
288 Keys K2 = sdl->findKeys(nsu, prefix3);
289 DataMap Dk2 = sdl->get(nsu, K2);
294 for(auto si=K2.begin();si!=K2.end();++si){
295 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
// NOTE(review): a runtime-sized array is a compiler extension, not standard C++.
296 char val[val_v.size()+1]; // from Data
// NOTE(review): val_v.size() is a size_t but is printed with %d.
298 fprintf(stderr, "val size %d\n", val_v.size());
299 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
// The SDL key itself is used as the UE id.
301 ue_id.assign((std::string)*si);
304 ue_data[ue_id] = ue_json;
307 fprintf(stderr, "after sdl get of ue data\n");
309 fprintf(stderr, "From UE data map\n");
// A fresh handler per record, so values do not carry over between UEs.
311 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
312 UEDataHandler handler;
314 StringStream ss(map_iter->second.c_str());
315 reader.Parse(ss,handler);
317 string ueID = map_iter->first;
318 string serving_cell_id = handler.serving_cell_id;
319 int serv_rsrp = handler.serving_cell_rsrp;
321 fprintf(stderr,"UE data for %s\n", ueID.c_str());
322 fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str());
323 fprintf(stderr,"RSRP for UE %d\n", serv_rsrp);
325 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
329 fprintf(stderr, "\n");
330 return return_ue_data_map;
// Message callback registered in main() for message type 20010.
// Logs the message, parses the payload as JSON with PolicyHandler and, when the
// handler saw a "threshold" value, stores it in the global rsrp_threshold.
// Finally sends two responses on the same message buffer.
//   mbuf    - received message; also used to send the responses
//   mtype   - message type (logged)
//   subid   - not used in the visible lines
//   len     - payload length (logged)
//   payload - message payload, read as a C string
//   data    - not used in the visible lines
// NOTE(review): the payload is treated as NUL-terminated text without checking
// len — confirm the sender guarantees termination. The declaration of `reader`
// and the closing braces are not present in this excerpt.
333 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
335 int response_to = 0; // max timeout waiting for a response
336 int rmtype; // received message type
339 cout << "Policy Callback got a message, type=" << mtype << " , length=" << len << endl;
340 cout << "payload is " << payload.get() << endl;
343 const char *arg = (const char*)payload.get();
345 PolicyHandler handler;
347 StringStream ss(arg);
348 reader.Parse(ss,handler);
350 //Set the threshold value
352 if (handler.found_threshold) {
353 cout << "Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
354 rsrp_threshold = handler.threshold;
// Length 5 covers the four characters of the literal plus its terminating NUL.
357 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
358 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
// Sends a prediction request message and then waits for and logs one reply.
//   ues_to_predict - UE ids to request predictions for
// The function allocates a 2048-byte message, assembles a JSON body of the form
// {"UEPredictionSet": [ "id", ... ]} from ues_to_predict, writes a payload into
// the message, sends it, then receives a message and prints its type and payload.
// NOTE(review): the assembled message_body/body is never written to the
// payload; the active snprintf below sends the hard-coded UE "12345" instead
// (the line that would copy `body` is commented out) — confirm intent.
// NOTE(review): some lines are not present in this excerpt (declarations of sz,
// i, mtype and rmtype, the size check, and the closing braces).
363 void send_prediction_request(vector<string> ues_to_predict) {
365 std::unique_ptr<Message> msg;
366 Msg_component payload; // special type of unique pointer to the payload
369 int response_to = 0; // max timeout waiting for a response
373 Msg_component send_payload;
375 fprintf(stderr, "cb 1\n");
377 msg = xfw->Alloc_msg( 2048 );
379 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
381 fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
385 fprintf(stderr, "cb 2");
// Build the JSON array text: every element is quoted, and all but the last are
// followed by a comma.
387 string ues_list = "[";
389 for (int i = 0; i < ues_to_predict.size(); i++) {
390 if (i == ues_to_predict.size() - 1) {
391 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
393 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
397 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
399 const char *body = message_body.c_str();
401 // char *body = "{\"UEPredictionSet\": [\"12345\"]}";
403 send_payload = msg->Get_payload(); // direct access to payload
404 // snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
405 // snprintf( (char *) send_payload.get(), 2048, body);
406 snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
408 fprintf(stderr, "message body %s\n", send_payload.get());
410 fprintf(stderr, "cb 3");
// NOTE(review): strlen() returns size_t but is printed with %d.
411 fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
413 // payload updated in place, nothing to copy from, so payload parm is nil
414 if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
415 fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
418 fprintf(stderr, "cb 4");
// Wait for a message and log it.
// NOTE(review): msg is dereferenced without a visible null check.
421 msg = xfw->Receive( response_to );
423 rmtype = msg->Get_mtype();
424 send_payload = msg->Get_payload();
425 fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
// Message callback registered in main() for message type 30002.
// Parses the prediction payload with PredictionHandler, loads the UE data from
// SDL, looks up the predicted UE's serving cell, and compares the predicted
// throughput of each cell with that of the serving cell. When another cell is
// predicted to do better it logs the control request it would send (nothing is
// actually sent). Finally sends two responses on the same message buffer.
//   mbuf    - received message; also used to send the responses
//   mtype   - message type (logged)
//   subid   - not used in the visible lines
//   len     - payload length (logged)
//   payload - message payload, read as a C string
//   data    - not used in the visible lines
// NOTE(review): some lines are not present in this excerpt (the declaration of
// `reader`, the try/catch around the parse, and the closing braces).
431 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
439 int response_to = 0; // max timeout waiting for a response
442 int rmtype; // received message type
443 int delay = 1000000; // mu-sec delay; default 1s
445 cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
446 cout << "payload is " << payload.get() << "\n";
450 cout << "prediction callback 1" << endl;
452 const char* arg = (const char*)payload.get();
454 cout << "ready to parse " << arg << endl;
456 PredictionHandler handler;
461 StringStream ss(arg);
462 reader.Parse(ss,handler);
464 cout << "got an exception on stringstream read parse\n";
467 std::string pred_ue_id = handler.ue_id;
469 cout << "Prediction for " << pred_ue_id << endl;
// Only the "down" predictions are used for the decision.
471 unordered_map<string, int> throughput_map = handler.cell_pred_down;
476 unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
478 //Decision about CONTROL message
479 //(1) Identify UE Id in Prediction message
480 //(2) Get UEData struct for this UE Id
481 //(3) Identify the UE's service cell ID
482 //(4) Iterate through Prediction message.
483 // If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
// NOTE(review): operator[] inserts a default UEData when pred_ue_id is not in
// sdl_data, so an unknown UE yields an empty serving cell id rather than an error.
485 UEData pred_ue_data = sdl_data[pred_ue_id];
486 std::string serving_cell_id = pred_ue_data.serving_cell;
// NOTE(review): both throughput values are assigned only when the serving cell
// appears in throughput_map; otherwise they are read uninitialized below.
488 int serving_cell_throughput;
489 int highest_throughput;
490 std::string highest_throughput_cell_id;
491 std::string::size_type str_size;
493 cout << "Going through throughtput map:" << endl;
// First pass: find the serving cell's predicted throughput and use it as the
// starting value for the maximum.
495 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
496 cout << map_iter->first << " : " << map_iter->second << endl;
497 std::string curr_cellid = map_iter->first;
498 cout << "Cell ID is " << curr_cellid;
499 int curr_throughput = map_iter->second;
500 cout << "Throughput is " << curr_throughput << endl;
502 if (curr_cellid.compare(serving_cell_id) == 0) {
503 serving_cell_throughput = curr_throughput;
504 highest_throughput = serving_cell_throughput;
509 //Iterating again to identify the highest throughput prediction
// NOTE(review): the comparison is against serving_cell_throughput, not
// highest_throughput, so the last cell visited that beats the serving cell is
// selected — not necessarily the cell with the highest prediction.
511 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
512 cout << map_iter->first << " : " << map_iter->second << endl;
513 std::string curr_cellid = map_iter->first;
514 cout << "Cell ID is " << curr_cellid;
515 int curr_throughput = map_iter->second;
516 cout << "Throughput is " << curr_throughput << endl;
518 if (curr_throughput > serving_cell_throughput) {
519 highest_throughput = curr_throughput;
520 highest_throughput_cell_id = curr_cellid;
// Log the control request that would be issued; no message is sent here.
524 if (highest_throughput > serving_cell_throughput) {
525 cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
526 cout << "UE ID: " << pred_ue_id << endl;
527 cout << "Source cell " << serving_cell_id << endl;
528 cout << "Target cell " << highest_throughput_cell_id << endl;
// Length 5 covers the four characters of the literal plus its terminating NUL.
531 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
532 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
538 //This function runs a loop that continuously checks SDL for any UE
// On each pass it reloads the UE data from SDL, collects the ids of UEs whose
// serving-cell RSRP is below rsrp_threshold, and sends one prediction request
// for them when the list is non-empty.
// NOTE(review): the function signature, the loop header and any delay between
// passes are not present in this excerpt. prediction_ues is declared before the
// "in while loop" message and no clear() is visible, so ids may accumulate
// across passes — confirm against the full source.
542 cout << "in run_loop()\n";
544 unordered_map<string, UEData> uemap;
546 vector<string> prediction_ues;
550 cout << "in while loop\n";
552 uemap = get_sdl_ue_data();
554 for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
555 string ueid = map_iter->first;
556 UEData data = map_iter->second;
// NOTE(review): rsrp_threshold is also written by policy_callback; no
// synchronization is visible in this excerpt.
557 if (data.serving_cell_rsrp < rsrp_threshold) {
558 prediction_ues.push_back(ueid);
562 if (prediction_ues.size() > 0) {
563 send_prediction_request(prediction_ues);
// Program entry point: creates the SDL handle and namespaces, constructs the
// Xapp on port "4560", registers the policy and prediction callbacks, starts
// run_loop on its own thread and then hands control to the framework.
// NOTE(review): the declarations of nsu, nsc and nthreads, and everything after
// xfw->Run() (including any join of loop_thread and the closing brace), are not
// present in this excerpt.
572 extern int main( int argc, char** argv ) {
576 char* port = (char *) "4560";
578 sdl = shareddatalayer::SyncStorage::create();
580 nsu = Namespace(sdl_namespace_u);
581 nsc = Namespace(sdl_namespace_c);
584 fprintf( stderr, "<XAPP> listening on port: %s\n", port );
585 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
586 fprintf(stderr, "code1\n");
// Route message type 20010 to policy_callback and 30002 to prediction_callback.
589 xfw->Add_msg_cb( 20010, policy_callback, NULL );
590 xfw->Add_msg_cb( 30002, prediction_callback, NULL );
592 fprintf(stderr, "code2\n");
// Run the SDL polling loop on its own thread, alongside the framework's message handling.
594 std::thread loop_thread;
596 loop_thread = std::thread(&run_loop);
598 xfw->Run( nthreads );