3 ==================================================================================
4 Copyright (c) 2020 Nokia
5 Copyright (c) 2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: Traffic Steering xApp;
25 2. Queries SDL to decide which UE to attempt Traffic Steering for
26 3. Requests prediction for UE throughput on current and neighbor cells
27 4. Receives prediction
28 5. Optionally exercises Traffic Steering action over E2
43 #include <sdl/syncstorage.hpp>
48 #include <unordered_map>
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
57 #include "ricxfcpp/xapp.hpp"
// rapidjson names (BaseReaderHandler, UTF8, SizeType, StringStream, ...) are
// used unqualified throughout this file.
59 using namespace rapidjson;
// Aliases for the values exchanged with the shared data layer: the namespace
// and key set passed to sdl->findKeys()/sdl->get(), and the key -> byte-vector
// map returned by sdl->get().
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
68 // ----------------------------------------------------------
// xApp framework handle; created in main() and used to allocate, send and
// receive messages and to register the message callbacks.
70 std::unique_ptr<Xapp> xfw;
// SDL namespace names; main() copies them into nsu / nsc.
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
// UEs whose serving-cell RSRP is below this value are selected for a
// prediction request in run_loop(); policy_callback() overwrites it when a
// policy message carries a threshold.
// NOTE(review): run_loop() runs on its own std::thread while the policy
// callback presumably runs on a framework thread; no synchronization is
// applied to this variable -- confirm whether that is acceptable.
75 int rsrp_threshold = 0;
// Shared data layer client; created in main() via SyncStorage::create().
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
// NOTE(review): presumably the RSRP member of the UEData record that
// get_sdl_ue_data() fills and run_loop() compares against rsrp_threshold --
// confirm against the enclosing declaration.
84 int serving_cell_rsrp;
// SAX-style rapidjson handler used by policy_callback() to scan a policy
// message. Every parse event is traced to stdout; integer events are matched
// against the keys "policy_type_id", "policy_instance_id" and "threshold".
88 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
89 unordered_map<string, string> cell_pred;
91 bool ue_id_found = false;
93 string curr_value = "";
// Set from the "policy_instance_id" member by the integer handlers.
95 int policy_instance_id;
97 std::string operation;
// Raised when a "threshold" member is seen; policy_callback() checks this
// flag before copying handler.threshold into the global rsrp_threshold.
98 bool found_threshold = false;
// Events that carry nothing of interest are only traced.
101 bool Null() { cout << "Null()" << endl; return true; }
102 bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
// Signed-integer event: trace the value, then dispatch on curr_key.
104 cout << "Int(" << i << ")" << endl;
105 if (curr_key.compare("policy_type_id") == 0) {
107 } else if (curr_key.compare("policy_instance_id") == 0) {
108 policy_instance_id = i;
109 } else if (curr_key.compare("threshold") == 0) {
110 found_threshold = true;
// Unsigned-integer event: same key dispatch as the signed case.
// NOTE(review): the trace label below reads "Int(" although this is Uint().
116 bool Uint(unsigned u) {
117 cout << "Int(" << u << ")" << endl;
118 if (curr_key.compare("policy_type_id") == 0) {
120 } else if (curr_key.compare("policy_instance_id") == 0) {
121 policy_instance_id = u;
122 } else if (curr_key.compare("threshold") == 0) {
123 found_threshold = true;
// Wider integer and floating-point events are only traced.
129 bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
130 bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
131 bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
// String event: trace the text, its length and the copy flag.
132 bool String(const char* str, SizeType length, bool copy) {
133 cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
// Taken for every string whose current key is NOT "operation".
// NOTE(review): the integer handlers match keys with `== 0`; confirm that
// `!= 0` is the intended test here.
134 if (curr_key.compare("operation") != 0) {
141 cout << "StartObject()" << endl;
// Member-name event: trace the key.
// NOTE(review): presumably this is also where curr_key is updated for the
// value handlers above -- confirm.
144 bool Key(const char* str, SizeType length, bool copy) {
145 cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
// Structural events are only traced.
150 bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
151 bool StartArray() { cout << "StartArray()" << endl; return true; }
152 bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
// SAX-style rapidjson handler used by prediction_callback() to scan a
// prediction message. Unsigned values are collected into two per-key maps
// ("down" and "up"); every event is traced to stdout.
156 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
// Values recorded by Uint(), keyed by the current JSON key.
// prediction_callback() reads cell_pred_down as its throughput map.
157 unordered_map<string, int> cell_pred_down;
158 unordered_map<string, int> cell_pred_up;
160 bool ue_id_found = false;
// Key under which the next value is stored.
161 string curr_key = "";
162 string curr_value = "";
// NOTE(review): presumably selects whether the next unsigned value goes to
// the "down" or the "up" map -- confirm against Uint().
163 bool down_val = true;
// Events that carry nothing of interest are only traced.
164 bool Null() { cout << "Null()" << endl; return true; }
165 bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
166 bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
// Unsigned-integer event: the value is stored under curr_key in one of the
// two maps and the assignment is traced.
// NOTE(review): the maps hold int, so the unsigned value is converted on
// assignment.
167 bool Uint(unsigned u) {
168 cout << "Uint(" << u << ")" << endl;
170 cell_pred_down[curr_key] = u;
171 cout << "Setting xput down val for " << curr_key << " to " << u << endl;
174 cell_pred_up[curr_key] = u;
175 cout << "Setting xput up val for " << curr_key << " to " << u << endl;
// Wider integer and floating-point events are only traced.
182 bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
183 bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
184 bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
// String event: trace the text, its length and the copy flag.
185 bool String(const char* str, SizeType length, bool copy) {
186 cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
190 bool StartObject() { cout << "StartObject()" << endl; return true; }
// Member-name event: trace the key; one branch reports that the UE id member
// was found.
191 bool Key(const char* str, SizeType length, bool copy) {
192 cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
194 cout << "Found UE ID\n";
// Structural events are only traced.
202 bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
203 bool StartArray() { cout << "StartArray()" << endl; return true; }
204 bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
// SAX-style rapidjson handler used by get_sdl_ue_data() to scan one UE
// metrics document. It captures the "ServingCellID" string and the signed
// integers found while inside the "ServingCellRF" array.
208 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
209 unordered_map<string, string> cell_pred;
// Value of the "ServingCellID" member.
210 std::string serving_cell_id;
// Serving-cell measurements filled by Int() while in_serving_array is set.
// NOTE(review): these have no initializer; if the document supplies no
// value, readers such as get_sdl_ue_data() see an indeterminate int unless
// they are set elsewhere -- confirm.
211 int serving_cell_rsrp;
212 int serving_cell_rsrq;
213 int serving_cell_sinr;
// True between StartArray() and EndArray() of the serving-cell array.
214 bool in_serving_array = false;
// Selects which measurement the next integer is stored into.
215 int rf_meas_index = 0;
217 string curr_key = "";
218 string curr_value = "";
// Events that carry nothing of interest are only traced.
219 bool Null() { cout << "Null()" << endl; return true; }
220 bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
// Signed-integer event: trace to stderr; inside the serving-cell array the
// value is stored as RSRP, RSRQ or SINR depending on rf_meas_index.
222 fprintf(stderr, "Int(%d)\n", i);
223 if (in_serving_array) {
224 fprintf(stderr, "we are in serving array\n");
225 switch(rf_meas_index) {
227 serving_cell_rsrp = i;
230 serving_cell_rsrq = i;
233 serving_cell_sinr = i;
// Unsigned-integer event: traced only, the value is not stored.
// NOTE(review): "%d" is used with an unsigned argument. Also, rapidjson
// presumably reports non-negative integers through Uint() rather than Int(),
// in which case non-negative measurements would be dropped here -- confirm.
240 bool Uint(unsigned u) {
241 fprintf(stderr, "Int(%d)\n", u); return true; }
// Wider integer and floating-point events are only traced.
242 bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
243 bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
244 bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
// String event: remember the serving cell id when the current key is
// "ServingCellID".
245 bool String(const char* str, SizeType length, bool copy) {
246 fprintf(stderr,"String(%s)\n", str);
247 if (curr_key.compare("ServingCellID") == 0) {
248 serving_cell_id = str;
253 bool StartObject() { cout << "StartObject()" << endl; return true; }
// Member-name event: trace the key to stderr.
254 bool Key(const char* str, SizeType length, bool copy) {
255 fprintf(stderr,"Key(%s)\n", str);
259 bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
// Array start: entering the "ServingCellRF" array switches Int() into
// measurement-capture mode.
261 fprintf(stderr,"StartArray()");
262 if (curr_key.compare("ServingCellRF") == 0) {
263 in_serving_array = true;
// Array end: leaves measurement-capture mode.
// NOTE(review): this compares against "servingCellRF" (lower-case 's') while
// StartArray() matches "ServingCellRF"; with that key the flag is never
// cleared by this test -- confirm which spelling is intended.
268 bool EndArray(SizeType elementCount) {
269 fprintf(stderr, "EndArray()\n");
270 if (curr_key.compare("servingCellRF") == 0) {
271 in_serving_array = false;
// Reads UE records from SDL (namespace nsu) and returns a map from UE id to
// UEData {serving cell id, serving cell RSRP}.
// Two passes: first every stored value is unpacked into a JSON string keyed
// by its SDL key, then each JSON string is parsed with UEDataHandler.
// Progress is traced to stderr.
279 unordered_map<string, UEData> get_sdl_ue_data() {
281 fprintf(stderr, "In get_sdl_ue_data()\n");
// UE id -> raw JSON text as stored in SDL.
283 unordered_map<string, string> ue_data;
// UE id -> parsed result handed back to the caller.
285 unordered_map<string, UEData> return_ue_data_map;
// Key prefix handed to findKeys(); the matching keys are then fetched in one
// get() call.
287 std::string prefix3="12";
288 Keys K2 = sdl->findKeys(nsu, prefix3);
289 DataMap Dk2 = sdl->get(nsu, K2);
// Pass 1: turn each byte vector into text.
294 for(auto si=K2.begin();si!=K2.end();++si){
295 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
// NOTE(review): an array sized by a run-time value is a compiler extension
// in C++, not standard.
296 char val[val_v.size()+1]; // from Data
// NOTE(review): "%d" is used with a size_t argument.
298 fprintf(stderr, "val size %d\n", val_v.size());
299 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
// The SDL key itself serves as the UE id.
301 ue_id.assign((std::string)*si);
304 ue_data[ue_id] = ue_json;
307 fprintf(stderr, "after sdl get of ue data\n");
309 fprintf(stderr, "From UE data map\n");
// Pass 2: parse each JSON document and keep the serving cell id and RSRP.
311 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
312 UEDataHandler handler;
314 StringStream ss(map_iter->second.c_str());
315 reader.Parse(ss,handler);
317 string ueID = map_iter->first;
318 string serving_cell_id = handler.serving_cell_id;
319 int serv_rsrp = handler.serving_cell_rsrp;
321 fprintf(stderr,"UE data for %s\n", ueID.c_str());
322 fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str());
323 fprintf(stderr,"RSRP for UE %d\n", serv_rsrp);
325 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
329 fprintf(stderr, "\n");
330 return return_ue_data_map;
// Message callback registered in main() for message type 20010.
// Parses the payload as JSON with PolicyHandler and, when the message carried
// a threshold, stores it in the global rsrp_threshold. Two responses are then
// sent back on the received buffer.
333 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
335 int response_to = 0; // max timeout waiting for a response
336 int rmtype; // received message type
339 fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
// NOTE(review): the payload is printed and parsed as a NUL-terminated C
// string; len is not used to bound it -- confirm the sender terminates it.
340 fprintf(stderr, "payload is %s\n", payload.get());
343 const char *arg = (const char*)payload.get();
345 PolicyHandler handler;
347 StringStream ss(arg);
348 reader.Parse(ss,handler);
350 //Set the threshold value
// NOTE(review): rsrp_threshold is also read by run_loop() on another thread
// without synchronization.
352 if (handler.found_threshold) {
353 rsrp_threshold = handler.threshold;
// Both replies are sent with the arguments 101, -1 and 5 followed by a
// four-character text (5 bytes including its terminator).
356 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
357 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
// Allocates a 2048-byte message, fills its payload with a prediction request,
// sends it, then receives one message from the framework and traces it.
// ues_to_predict: ids of the UEs selected by run_loop().
// NOTE(review): the JSON built from ues_to_predict (message_body / body) is
// not what is transmitted; the payload is written from a fixed literal that
// names UE "12345". The snprintf that would use `body` is commented out --
// confirm whether the hard-coded request is still intended.
362 void send_prediction_request(vector<string> ues_to_predict) {
364 std::unique_ptr<Message> msg;
365 Msg_component payload; // special type of unique pointer to the payload
368 int response_to = 0; // max timeout waiting for a response
372 Msg_component send_payload;
374 fprintf(stderr, "cb 1\n");
376 msg = xfw->Alloc_msg( 2048 );
378 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
380 fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
384 fprintf(stderr, "cb 2");
// Build a JSON array of quoted UE ids: every element except the last is
// followed by a comma.
386 string ues_list = "[";
388 for (int i = 0; i < ues_to_predict.size(); i++) {
389 if (i == ues_to_predict.size() - 1) {
390 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
392 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
396 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
398 const char *body = message_body.c_str();
400 // char *body = "{\"UEPredictionSet\": [\"12345\"]}";
402 send_payload = msg->Get_payload(); // direct access to payload
403 // snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
404 // snprintf( (char *) send_payload.get(), 2048, body);
// The payload actually sent is this fixed text, not message_body.
405 snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
407 fprintf(stderr, "message body %s\n", send_payload.get());
409 fprintf(stderr, "cb 3");
// NOTE(review): "%d" is used with the size_t returned by strlen().
410 fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
412 // payload updated in place, nothing to copy from, so payload parm is nil
413 if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
414 fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
417 fprintf(stderr, "cb 4");
// Receive with the timeout value response_to (0) and trace what came back.
420 msg = xfw->Receive( response_to );
422 rmtype = msg->Get_mtype();
423 send_payload = msg->Get_payload();
424 fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
// Message callback registered in main() for message type 30002.
// Parses the prediction payload with PredictionHandler, looks the predicted
// UE up in the SDL data, compares the predicted throughput of every cell with
// that of the UE's serving cell, and logs a would-be CONTROL request when
// another cell is predicted to do better. Two responses are then sent back on
// the received buffer.
430 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
438 int response_to = 0; // max timeout waiting for a response
441 int rmtype; // received message type
442 int delay = 1000000; // mu-sec delay; default 1s
444 cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
445 cout << "payload is " << payload.get() << "\n";
449 cout << "prediction callback 1" << endl;
451 const char* arg = (const char*)payload.get();
453 cout << "ready to parse " << arg << endl;
455 PredictionHandler handler;
460 StringStream ss(arg);
461 reader.Parse(ss,handler);
// Diagnostic printed when parsing the payload fails with an exception.
463 cout << "got an exception on stringstream read parse\n";
466 std::string pred_ue_id = handler.ue_id;
468 cout << "Prediction for " << pred_ue_id << endl;
// Only the "down" values collected by the handler are used for the decision.
470 unordered_map<string, int> throughput_map = handler.cell_pred_down;
475 unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
477 //Decision about CONTROL message
478 //(1) Identify UE Id in Prediction message
479 //(2) Get UEData struct for this UE Id
480 //(3) Identify the UE's service cell ID
481 //(4) Iterate through Prediction message.
482 // If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
// NOTE(review): operator[] default-inserts an empty UEData when the UE id is
// not present in the SDL data, leaving serving_cell_id empty.
484 UEData pred_ue_data = sdl_data[pred_ue_id];
485 std::string serving_cell_id = pred_ue_data.serving_cell;
// NOTE(review): neither value has an initializer; both are assigned only
// when the serving cell appears in throughput_map. If it does not, the
// comparisons further down read indeterminate values -- confirm.
487 int serving_cell_throughput;
488 int highest_throughput;
489 std::string highest_throughput_cell_id;
490 std::string::size_type str_size;
492 cout << "Going through throughtput map:" << endl;
// First pass: find the serving cell's predicted throughput and use it as the
// starting value for the highest throughput.
494 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
495 cout << map_iter->first << " : " << map_iter->second << endl;
496 std::string curr_cellid = map_iter->first;
497 cout << "Cell ID is " << curr_cellid;
498 int curr_throughput = map_iter->second;
499 cout << "Throughput is " << curr_throughput << endl;
501 if (curr_cellid.compare(serving_cell_id) == 0) {
502 serving_cell_throughput = curr_throughput;
503 highest_throughput = serving_cell_throughput;
508 //Iterating again to identify the highest throughput prediction
510 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
511 cout << map_iter->first << " : " << map_iter->second << endl;
512 std::string curr_cellid = map_iter->first;
513 cout << "Cell ID is " << curr_cellid;
514 int curr_throughput = map_iter->second;
515 cout << "Throughput is " << curr_throughput << endl;
// NOTE(review): each entry is compared with the serving cell's value, not
// with the running highest_throughput, so the cell remembered is the last
// map entry that beats the serving cell rather than necessarily the best
// one; the iteration order of unordered_map is unspecified -- confirm.
517 if (curr_throughput > serving_cell_throughput) {
518 highest_throughput = curr_throughput;
519 highest_throughput_cell_id = curr_cellid;
// A better cell was found: log the handover that would be requested.
523 if (highest_throughput > serving_cell_throughput) {
524 cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
525 cout << "UE ID: " << pred_ue_id << endl;
526 cout << "Source cell " << serving_cell_id << endl;
527 cout << "Target cell " << highest_throughput_cell_id << endl;
530 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
531 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
// Started on its own thread by main(). Each pass reads the UE data from SDL,
// collects the ids of UEs whose serving-cell RSRP is below rsrp_threshold and,
// if any were collected, sends a prediction request for them.
537 //This function runs a loop that continuously checks SDL for any UE
541 cout << "in run_loop()\n";
543 unordered_map<string, UEData> uemap;
// NOTE(review): declared before the loop begins; confirm it is cleared on
// each pass, otherwise ids accumulate and are re-sent.
545 vector<string> prediction_ues;
549 cout << "in while loop\n";
551 uemap = get_sdl_ue_data();
553 for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
554 string ueid = map_iter->first;
555 UEData data = map_iter->second;
// Select UEs whose serving-cell RSRP is strictly below the threshold.
556 if (data.serving_cell_rsrp < rsrp_threshold) {
557 prediction_ues.push_back(ueid);
561 if (prediction_ues.size() > 0) {
562 send_prediction_request(prediction_ues);
// Program entry point: creates the SDL client, initialises the SDL
// namespaces, creates the xApp framework object on port "4560", registers the
// policy (20010) and prediction (30002) callbacks, starts run_loop() on a
// separate thread and then hands control to the framework via Run().
571 extern int main( int argc, char** argv ) {
575 char* port = (char *) "4560";
577 sdl = shareddatalayer::SyncStorage::create();
579 nsu = Namespace(sdl_namespace_u);
580 nsc = Namespace(sdl_namespace_c);
583 fprintf( stderr, "<XAPP> listening on port: %s\n", port );
584 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
585 fprintf(stderr, "code1\n");
// Route incoming messages by type to their handlers; no user data is passed.
588 xfw->Add_msg_cb( 20010, policy_callback, NULL );
589 xfw->Add_msg_cb( 30002, prediction_callback, NULL );
591 fprintf(stderr, "code2\n");
// NOTE(review): a std::thread that is still joinable when destroyed
// terminates the program; confirm loop_thread is joined or detached once
// Run() returns.
593 std::thread loop_thread;
595 loop_thread = std::thread(&run_loop);
597 xfw->Run( nthreads );