/*
==================================================================================
        Copyright (c) 2020 Nokia
        Copyright (c) 2020 AT&T Intellectual Property.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
==================================================================================
*/

/*
   Abstract:  Traffic Steering xApp;
              1. Receives a policy message carrying the RSRP threshold
              2. Queries SDL to decide which UE to attempt Traffic Steering for
              3. Requests prediction for UE throughput on current and neighbor cells
              4. Receives prediction
              5. Optionally exercises Traffic Steering action over E2
*/
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <exception>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include <sdl/syncstorage.hpp>

#include <rapidjson/document.h>
#include <rapidjson/writer.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/schema.h>
#include <rapidjson/reader.h>

#include "ricxfcpp/xapp.hpp"
59 using namespace rapidjson;
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
68 // ----------------------------------------------------------
70 std::unique_ptr<Xapp> xfw;
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
75 int rsrp_threshold = 0;
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
84 int serving_cell_rsrp;
88 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
89 unordered_map<string, string> cell_pred;
91 bool ue_id_found = false;
93 string curr_value = "";
95 int policy_instance_id;
97 std::string operation;
98 bool found_threshold = false;
101 bool Null() { cout << "Null()" << endl; return true; }
102 bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
104 cout << "Int(" << i << ")" << endl;
105 if (curr_key.compare("policy_type_id") == 0) {
107 } else if (curr_key.compare("policy_instance_id") == 0) {
108 policy_instance_id = i;
109 } else if (curr_key.compare("threshold") == 0) {
110 found_threshold = true;
116 bool Uint(unsigned u) {
117 cout << "Int(" << u << ")" << endl;
118 if (curr_key.compare("policy_type_id") == 0) {
120 } else if (curr_key.compare("policy_instance_id") == 0) {
121 policy_instance_id = u;
122 } else if (curr_key.compare("threshold") == 0) {
123 found_threshold = true;
129 bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
130 bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
131 bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
132 bool String(const char* str, SizeType length, bool copy) {
133 cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
134 if (curr_key.compare("operation") != 0) {
141 cout << "StartObject()" << endl;
144 bool Key(const char* str, SizeType length, bool copy) {
145 cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
150 bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
151 bool StartArray() { cout << "StartArray()" << endl; return true; }
152 bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
156 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
157 unordered_map<string, string> cell_pred;
159 bool ue_id_found = false;
160 string curr_key = "";
161 string curr_value = "";
162 bool Null() { cout << "Null()" << endl; return true; }
163 bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
164 bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
165 bool Uint(unsigned u) { cout << "Uint(" << u << ")" << endl; return true; }
166 bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
167 bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
168 bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
169 bool String(const char* str, SizeType length, bool copy) {
170 cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
171 if (curr_key.compare("") != 0) {
172 cout << "Found throughput\n";
174 cell_pred[curr_key] = curr_value;
181 bool StartObject() { cout << "StartObject()" << endl; return true; }
182 bool Key(const char* str, SizeType length, bool copy) {
183 cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
185 cout << "Found UE ID\n";
193 bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
194 bool StartArray() { cout << "StartArray()" << endl; return true; }
195 bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
199 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
200 unordered_map<string, string> cell_pred;
201 std::string serving_cell_id;
202 int serving_cell_rsrp;
203 int serving_cell_rsrq;
204 int serving_cell_sinr;
205 bool in_serving_array = false;
206 int rf_meas_index = 0;
208 string curr_key = "";
209 string curr_value = "";
210 bool Null() { cout << "Null()" << endl; return true; }
211 bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
213 fprintf(stderr, "Int(%d)\n", i);
214 if (in_serving_array) {
215 fprintf(stderr, "we are in serving array\n");
216 switch(rf_meas_index) {
218 serving_cell_rsrp = i;
221 serving_cell_rsrq = i;
224 serving_cell_sinr = i;
231 bool Uint(unsigned u) {
232 fprintf(stderr, "Int(%d)\n", u); return true; }
233 bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
234 bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
235 bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
236 bool String(const char* str, SizeType length, bool copy) {
237 fprintf(stderr,"String(%s)\n", str);
238 if (curr_key.compare("ServingCellID") == 0) {
239 serving_cell_id = str;
244 bool StartObject() { cout << "StartObject()" << endl; return true; }
245 bool Key(const char* str, SizeType length, bool copy) {
246 fprintf(stderr,"Key(%s)\n", str);
250 bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
252 fprintf(stderr,"StartArray()");
253 if (curr_key.compare("ServingCellRF") == 0) {
254 in_serving_array = true;
259 bool EndArray(SizeType elementCount) {
260 fprintf(stderr, "EndArray()\n");
261 if (curr_key.compare("servingCellRF") == 0) {
262 in_serving_array = false;
270 unordered_map<string, UEData> get_sdl_ue_data() {
272 fprintf(stderr, "In get_sdl_ue_data()\n");
274 unordered_map<string, string> ue_data;
276 unordered_map<string, UEData> return_ue_data_map;
278 std::string prefix3="12";
279 Keys K2 = sdl->findKeys(nsu, prefix3);
280 DataMap Dk2 = sdl->get(nsu, K2);
285 for(auto si=K2.begin();si!=K2.end();++si){
286 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
287 char val[val_v.size()+1]; // from Data
289 fprintf(stderr, "val size %d\n", val_v.size());
290 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
292 ue_id.assign((std::string)*si);
295 ue_data[ue_id] = ue_json;
298 fprintf(stderr, "after sdl get of ue data\n");
300 fprintf(stderr, "From UE data map\n");
302 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
303 UEDataHandler handler;
305 StringStream ss(map_iter->second.c_str());
306 reader.Parse(ss,handler);
308 string ueID = map_iter->first;
309 string serving_cell_id = handler.serving_cell_id;
310 int serv_rsrp = handler.serving_cell_rsrp;
312 fprintf(stderr,"UE data for %s\n", ueID.c_str());
313 fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str());
314 fprintf(stderr,"RSRP for UE %d\n", serv_rsrp);
316 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
320 fprintf(stderr, "\n");
321 return return_ue_data_map;
324 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
326 int response_to = 0; // max timeout wating for a response
327 int rmtype; // received message type
330 fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
331 fprintf(stderr, "payload is %s\n", payload.get());
333 //fprintf( stderr, "callback 1 got a message type = %d len = %d\n", mtype, len );
334 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
335 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
337 const char *arg = (const char*)payload.get();
339 PolicyHandler handler;
341 StringStream ss(arg);
342 reader.Parse(ss,handler);
344 //Set the threshold value
346 if (handler.found_threshold) {
347 rsrp_threshold = handler.threshold;
352 void send_prediction_request(vector<string> ues_to_predict) {
354 std::unique_ptr<Message> msg;
355 Msg_component payload; // special type of unique pointer to the payload
358 int response_to = 0; // max timeout wating for a response
362 Msg_component send_payload;
364 fprintf(stderr, "cb 1\n");
366 msg = xfw->Alloc_msg( 2048 );
368 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
370 fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
374 fprintf(stderr, "cb 2");
376 string ues_list = "[";
378 for (int i = 0; i < ues_to_predict.size(); i++) {
379 if (i == ues_to_predict.size() - 1) {
380 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
382 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
386 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
388 const char *body = message_body.c_str();
390 // char *body = "{\"UEPredictionSet\": [\"12345\"]}";
392 send_payload = msg->Get_payload(); // direct access to payload
393 // snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
394 // snprintf( (char *) send_payload.get(), 2048, body);
395 snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
397 fprintf(stderr, "message body %s\n", send_payload.get());
399 fprintf(stderr, "cb 3");
400 fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
402 // payload updated in place, nothing to copy from, so payload parm is nil
403 if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
404 fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
407 fprintf(stderr, "cb 4");
410 msg = xfw->Receive( response_to );
412 rmtype = msg->Get_mtype();
413 send_payload = msg->Get_payload();
414 fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
420 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
428 int response_to = 0; // max timeout wating for a response
431 int rmtype; // received message type
432 int delay = 1000000; // mu-sec delay; default 1s
434 fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len);
435 fprintf(stderr, "payload is %s\n", payload.get());
437 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
438 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
442 fprintf(stderr, "cb 1\n");
444 const char* arg = (const char*)payload.get();
446 PredictionHandler handler;
448 StringStream ss(arg);
449 reader.Parse(ss,handler);
451 std::string pred_ue_id = handler.ue_id;
453 cout << "Prediction for " << pred_ue_id << endl;
455 unordered_map<string, string> throughput_map = handler.cell_pred;
460 unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
462 //Decision about CONTROL message
463 //(1) Identify UE Id in Prediction message
464 //(2) Get UEData struct for this UE Id
465 //(3) Identify the UE's service cell ID
466 //(4) Iterate through Prediction message.
467 // If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
469 UEData pred_ue_data = sdl_data[pred_ue_id];
470 std::string serving_cell_id = pred_ue_data.serving_cell;
472 int serving_cell_throughput;
473 int highest_throughput;
474 std::string highest_throughput_cell_id;
475 std::string::size_type str_size;
477 cout << "Going through throughtput map:" << endl;
479 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
480 cout << map_iter->first << " : " << map_iter->second << endl;
481 std::string curr_cellid = map_iter->first;
482 cout << "Cell ID is " << curr_cellid;
483 int curr_throughput = stoi(map_iter->second, &str_size);
484 cout << "Throughput is " << curr_throughput << endl;
486 if (curr_cellid.compare(serving_cell_id) == 0) {
487 serving_cell_throughput = curr_throughput;
488 highest_throughput = serving_cell_throughput;
493 //Iterating again to identify the highest throughput prediction
495 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
496 cout << map_iter->first << " : " << map_iter->second << endl;
497 std::string curr_cellid = map_iter->first;
498 cout << "Cell ID is " << curr_cellid;
499 int curr_throughput = stoi(map_iter->second, &str_size);
500 cout << "Throughput is " << curr_throughput << endl;
502 if (curr_throughput > serving_cell_throughput) {
503 highest_throughput = curr_throughput;
504 highest_throughput_cell_id = curr_cellid;
508 if (highest_throughput > serving_cell_throughput) {
509 cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
510 cout << "UE ID: " << pred_ue_id << endl;
511 cout << "Source cell " << serving_cell_id << endl;
512 cout << "Target cell " << highest_throughput_cell_id << endl;
519 //This function runs a loop that continuously checks SDL for any UE
523 fprintf(stderr, "in run_loop()\n");
525 unordered_map<string, UEData> uemap;
527 vector<string> prediction_ues;
531 fprintf(stderr, "in while loop\n");
533 uemap = get_sdl_ue_data();
535 for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
536 string ueid = map_iter->first;
537 UEData data = map_iter->second;
538 if (data.serving_cell_rsrp < rsrp_threshold) {
539 prediction_ues.push_back(ueid);
543 if (prediction_ues.size() > 0) {
544 send_prediction_request(prediction_ues);
553 extern int main( int argc, char** argv ) {
557 char* port = (char *) "4560";
559 sdl = shareddatalayer::SyncStorage::create();
561 nsu = Namespace(sdl_namespace_u);
562 nsc = Namespace(sdl_namespace_c);
565 fprintf( stderr, "<XAPP> listening on port: %s\n", port );
566 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
567 fprintf(stderr, "code1\n");
570 xfw->Add_msg_cb( 20010, policy_callback, NULL );
571 xfw->Add_msg_cb( 30002, prediction_callback, NULL );
573 fprintf(stderr, "code2\n");
575 std::thread loop_thread;
577 loop_thread = std::thread(&run_loop);
579 xfw->Run( nthreads );