3 ==================================================================================
4 Copyright (c) 2020 Nokia
5 Copyright (c) 2020 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: Traffic Steering xApp;
25 2. Queries SDL to decide which UE to attempt Traffic Steering for
26 3. Requests prediction for UE throughput on current and neighbor cells
27 4. Receives prediction
28 5. Optionally exercises Traffic Steering action over E2
43 #include <sdl/syncstorage.hpp>
48 #include <unordered_map>
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
57 #include "ricxfcpp/xapp.hpp"
// File-level aliases and shared state used by the callbacks and the polling loop below.
// Brings the rapidjson names used in this file (BaseReaderHandler, UTF8, StringStream, SizeType) into scope.
59 using namespace rapidjson;
// Aliases for the namespace/key/value types handed to the SDL calls (findKeys / get) further down.
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
68 // ----------------------------------------------------------
// xApp framework handle; constructed in main() and used to allocate, send and receive messages.
70 std::unique_ptr<Xapp> xfw;
// SDL namespace names; main() wraps these into the nsu / nsc Namespace values.
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
// UEs whose serving_cell_rsrp is below this value are queued for a prediction request (see the polling loop).
75 int rsrp_threshold = 0;
// SDL client; assigned in main() and read in get_sdl_ue_data().
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
// NOTE(review): this field appears to be a member of a struct whose declaration is not visible in this view —
// presumably UEData, which is brace-initialised elsewhere as {serving_cell_id, rsrp}. Confirm against the full file.
84 int serving_cell_rsrp;
// rapidjson reader handler used to parse a throughput-prediction message.
// Every callback logs the event it received to stdout and returns true so parsing continues.
// String values are stored into cell_pred under the current key; prediction_callback() later
// reads cell_pred and ue_id from the handler.
// NOTE(review): curr_key and ue_id are used by this struct / its caller but their declarations
// are not visible in this view — confirm they exist in the full file.
88 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
// Map filled by String(): key is the value of curr_key at the time the string was seen.
89 unordered_map<string, string> cell_pred;
91 bool ue_id_found = false;
93 string curr_value = "";
94 bool Null() { cout << "Null()" << endl; return true; }
95 bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
96 bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
97 bool Uint(unsigned u) { cout << "Uint(" << u << ")" << endl; return true; }
98 bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
99 bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
100 bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
// String value callback: logs the value, and when a key is pending records an entry in cell_pred.
101 bool String(const char* str, SizeType length, bool copy) {
102 cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
// Only record the value when curr_key is non-empty.
103 if (curr_key.compare("") != 0) {
104 cout << "Found throughput\n";
// NOTE(review): the assignment of curr_value from str is not visible here; presumably it happens just before this line — confirm.
106 cell_pred[curr_key] = curr_value;
113 bool StartObject() { cout << "StartObject()" << endl; return true; }
// Object key callback: logs the key. The condition guarding the "Found UE ID" branch is not visible in this view.
114 bool Key(const char* str, SizeType length, bool copy) {
115 cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
117 cout << "Found UE ID\n";
125 bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
126 bool StartArray() { cout << "StartArray()" << endl; return true; }
127 bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
// rapidjson reader handler used by get_sdl_ue_data() to pull the serving cell id and the
// serving-cell RF measurements out of one UE's JSON record.
// Callbacks log what they receive (stdout or stderr) and return true so parsing continues.
131 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
// Not referenced by any line of this struct visible in this view.
132 unordered_map<string, string> cell_pred;
// Set from the string value that follows the "ServingCellID" key.
133 std::string serving_cell_id;
// NOTE(review): these three have no initializer, so they hold indeterminate values unless the
// integer callback below assigns them; get_sdl_ue_data() reads serving_cell_rsrp unconditionally.
134 int serving_cell_rsrp;
135 int serving_cell_rsrq;
136 int serving_cell_sinr;
// True between StartArray for "ServingCellRF" and the matching EndArray (but see the note on EndArray).
137 bool in_serving_array = false;
// Selects which measurement the next integer inside the serving array is stored into.
138 int rf_meas_index = 0;
140 string curr_key = "";
141 string curr_value = "";
142 bool Null() { cout << "Null()" << endl; return true; }
143 bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
// The lines below are the body of an integer callback whose header is not visible (presumably Int(int i)).
// Inside the serving array the value is routed to rsrp / rsrq / sinr depending on rf_meas_index;
// the case labels and any increment of rf_meas_index are not visible in this view.
145 fprintf(stderr, "Int(%d)\n", i);
146 if (in_serving_array) {
147 fprintf(stderr, "we are in serving array\n");
148 switch(rf_meas_index) {
150 serving_cell_rsrp = i;
153 serving_cell_rsrq = i;
156 serving_cell_sinr = i;
// NOTE(review): logs with the "Int" label and %d for an unsigned argument, and does not record the value.
// rapidjson's reader delivers non-negative 32-bit integers through Uint(), so non-negative
// measurements in the serving array would be skipped — verify against the expected data.
163 bool Uint(unsigned u) {
164 fprintf(stderr, "Int(%d)\n", u); return true; }
165 bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
166 bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
167 bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
// String value callback: captures the serving cell id when the pending key is "ServingCellID".
168 bool String(const char* str, SizeType length, bool copy) {
169 fprintf(stderr,"String(%s)\n", str);
170 if (curr_key.compare("ServingCellID") == 0) {
171 serving_cell_id = str;
176 bool StartObject() { cout << "StartObject()" << endl; return true; }
// Object key callback: logs the key; the update of curr_key is not visible in this view.
177 bool Key(const char* str, SizeType length, bool copy) {
178 fprintf(stderr,"Key(%s)\n", str);
182 bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
// Body of the array-start callback (header not visible): entering the "ServingCellRF" array turns on capture.
184 fprintf(stderr,"StartArray()");
185 if (curr_key.compare("ServingCellRF") == 0) {
186 in_serving_array = true;
// Array-end callback.
// NOTE(review): this compares against "servingCellRF" (lower-case s) while the array-start check uses
// "ServingCellRF"; with that mismatch in_serving_array would never be reset here — confirm which key is intended.
191 bool EndArray(SizeType elementCount) {
192 fprintf(stderr, "EndArray()\n");
193 if (curr_key.compare("servingCellRF") == 0) {
194 in_serving_array = false;
// Reads UE records from SDL and returns a map from UE id to UEData.
// Steps visible here: find keys in namespace nsu with prefix "12", fetch their values, turn each value
// into a string keyed by UE id, parse each string with UEDataHandler, and build
// {serving_cell_id, serving RSRP} per UE. Progress is logged to stderr throughout.
202 unordered_map<string, UEData> get_sdl_ue_data() {
204 fprintf(stderr, "In get_sdl_ue_data()\n");
// Raw JSON text per UE id, filled by the first loop and consumed by the second.
206 unordered_map<string, string> ue_data;
208 unordered_map<string, UEData> return_ue_data_map;
// Only keys starting with this prefix are fetched.
210 std::string prefix3="12";
211 Keys K2 = sdl->findKeys(nsu, prefix3);
212 DataMap Dk2 = sdl->get(nsu, K2);
217 for(auto si=K2.begin();si!=K2.end();++si){
218 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
// NOTE(review): array bound is a runtime value; variable-length arrays are a compiler extension in C++.
219 char val[val_v.size()+1]; // from Data
// NOTE(review): %d is used with a size_t argument.
221 fprintf(stderr, "val size %d\n", val_v.size());
// Byte-by-byte copy of the stored value; the terminator write and the declaration of i are not visible in this view.
222 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
// The SDL key itself is used as the UE id.
224 ue_id.assign((std::string)*si);
// ue_json's assignment is not visible here; presumably it is built from val — confirm.
227 ue_data[ue_id] = ue_json;
230 fprintf(stderr, "after sdl get of ue data\n");
232 fprintf(stderr, "From UE data map\n");
// Second pass: parse each UE's JSON text with a fresh handler.
234 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
235 UEDataHandler handler;
237 StringStream ss(map_iter->second.c_str());
// The declaration of reader is not visible in this view; the result of Parse is not checked on this line.
238 reader.Parse(ss,handler);
240 string ueID = map_iter->first;
241 string serving_cell_id = handler.serving_cell_id;
// NOTE(review): handler.serving_cell_rsrp has no initializer, so this reads an indeterminate
// value when the record contained no serving-array integer.
242 int serv_rsrp = handler.serving_cell_rsrp;
244 fprintf(stderr,"UE data for %s\n", ueID.c_str());
245 fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str());
246 fprintf(stderr,"RSRP for UE %d\n", serv_rsrp);
248 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
252 fprintf(stderr, "\n");
253 return return_ue_data_map;
// Message callback registered in main() for message type 20010.
// Logs the message type, length and payload to stderr, then sends two 5-byte replies
// ("OK1\n" and "OK2\n", each including the terminating NUL) on the received buffer.
// Parameters: mbuf is the received message; mtype/subid/len describe it; payload is its body;
// data is the user pointer given at registration (NULL in main()).
256 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
// Neither of the next two locals is used on any line visible in this view.
258 int response_to = 0; // max timeout waiting for a response
259 int rmtype; // received message type
262 fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
// NOTE(review): printing with %s assumes the payload is NUL-terminated text — confirm.
263 fprintf(stderr, "payload is %s\n", payload.get());
265 //fprintf( stderr, "callback 1 got a message type = %d len = %d\n", mtype, len );
// Arguments appear to be (message type, sub id, length, bytes) — confirm against the ricxfcpp Message API.
266 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
267 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
// The code that applies the threshold is not visible in this view.
269 //Set the threshold value
// Sends a prediction request over the xApp framework and logs the reply.
// Visible flow: allocate a 2048-byte message, build a JSON body listing ues_to_predict,
// write a payload, send it, then receive one message and print its type and payload.
// Parameter: ues_to_predict — UE ids to include in the request (taken by value).
// NOTE(review): the JSON built from ues_to_predict (message_body / body) is not what gets sent;
// the payload written below is the fixed string {"UEPredictionSet": ["12345"]}.
274 void send_prediction_request(vector<string> ues_to_predict) {
276 std::unique_ptr<Message> msg;
277 Msg_component payload; // special type of unique pointer to the payload
280 int response_to = 0; // max timeout waiting for a response
284 Msg_component send_payload;
286 fprintf(stderr, "cb 1\n");
288 msg = xfw->Alloc_msg( 2048 );
290 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
// The size check guarding this message, and the declarations of sz and i, are not visible in this view.
292 fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
296 fprintf(stderr, "cb 2");
// Build a JSON array literal: each id quoted, comma after every element except the last.
298 string ues_list = "[";
// NOTE(review): int index compared against size_t; ues_to_predict.size() - 1 wraps if the vector were empty
// (the loop body is not entered in that case).
300 for (int i = 0; i < ues_to_predict.size(); i++) {
301 if (i == ues_to_predict.size() - 1) {
302 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
304 ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
// The closing "]" for ues_list is not added on any line visible in this view.
308 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
310 const char *body = message_body.c_str();
312 // char *body = "{\"UEPredictionSet\": [\"12345\"]}";
314 send_payload = msg->Get_payload(); // direct access to payload
315 // snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
316 // snprintf( (char *) send_payload.get(), 2048, body);
// Hard-coded request body; body computed above is unused by this call.
317 snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
319 fprintf(stderr, "message body %s\n", send_payload.get());
321 fprintf(stderr, "cb 3");
// NOTE(review): %d is used with the size_t returned by strlen.
322 fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
324 // payload updated in place, nothing to copy from, so payload parm is nil
// The declaration/value of mtype is not visible in this view.
325 if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
326 fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
329 fprintf(stderr, "cb 4");
// Wait for a message using response_to as the timeout argument.
// NOTE(review): no null check on msg is visible before it is dereferenced below — confirm against the full file.
332 msg = xfw->Receive( response_to );
334 rmtype = msg->Get_mtype();
335 send_payload = msg->Get_payload();
336 fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
// Message callback registered in main() for message type 30002.
// Logs the incoming message, replies twice on the same buffer, parses a prediction document with
// PredictionHandler, loads UE data from SDL, and prints a would-be CONTROL request when a cell's
// predicted throughput exceeds the serving cell's.
// NOTE(review): the document parsed below is a fixed sample string, not the received payload.
342 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
// None of the next three locals is used on any line visible in this view.
350 int response_to = 0; // max timeout waiting for a response
353 int rmtype; // received message type
354 int delay = 1000000; // mu-sec delay; default 1s
356 fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len);
// NOTE(review): printing with %s assumes the payload is NUL-terminated text — confirm.
357 fprintf(stderr, "payload is %s\n", payload.get());
359 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
360 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
364 fprintf(stderr, "cb 1\n");
// Sample prediction: UE "12345" with cells "222" and "333".
// NOTE(review): binding a string literal to a non-const char* is not valid in C++11 and later.
366 char *incoming_msg = "{\"12345\": {\"222\": \"20000\", \"333\" : \"50000\"} }";
368 PredictionHandler handler;
370 StringStream ss(incoming_msg);
// The declaration of reader is not visible in this view.
371 reader.Parse(ss,handler);
373 std::string pred_ue_id = handler.ue_id;
375 cout << "Prediction for " << pred_ue_id << endl;
// Copy of the cell id -> predicted throughput strings collected by the handler.
377 unordered_map<string, string> throughput_map = handler.cell_pred;
382 unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
384 //Decision about CONTROL message
385 //(1) Identify UE Id in Prediction message
386 //(2) Get UEData struct for this UE Id
387 //(3) Identify the UE's service cell ID
388 //(4) Iterate through Prediction message.
389 // If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
// operator[] default-constructs an entry when pred_ue_id is not present in sdl_data.
391 UEData pred_ue_data = sdl_data[pred_ue_id];
392 std::string serving_cell_id = pred_ue_data.serving_cell;
// NOTE(review): both ints are uninitialized; they are only assigned in the first loop when the serving
// cell id appears in throughput_map, yet they are read unconditionally afterwards.
394 int serving_cell_throughput;
395 int highest_throughput;
396 std::string highest_throughput_cell_id;
397 std::string::size_type str_size;
399 cout << "Going through throughtput map:" << endl;
// First pass: find the serving cell's predicted throughput and use it as the starting maximum.
401 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
402 cout << map_iter->first << " : " << map_iter->second << endl;
403 std::string curr_cellid = map_iter->first;
404 cout << "Cell ID is " << curr_cellid;
// stoi throws if the stored string does not start with a number.
405 int curr_throughput = stoi(map_iter->second, &str_size);
406 cout << "Throughput is " << curr_throughput << endl;
408 if (curr_cellid.compare(serving_cell_id) == 0) {
409 serving_cell_throughput = curr_throughput;
410 highest_throughput = serving_cell_throughput;
415 //Iterating again to identify the highest throughput prediction
417 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
418 cout << map_iter->first << " : " << map_iter->second << endl;
419 std::string curr_cellid = map_iter->first;
420 cout << "Cell ID is " << curr_cellid;
421 int curr_throughput = stoi(map_iter->second, &str_size);
422 cout << "Throughput is " << curr_throughput << endl;
// NOTE(review): the comparison is against serving_cell_throughput, not highest_throughput, so the
// values kept are those of the last iterated cell that beats the serving cell, which is not
// necessarily the maximum (iteration order of unordered_map is unspecified).
424 if (curr_throughput > serving_cell_throughput) {
425 highest_throughput = curr_throughput;
426 highest_throughput_cell_id = curr_cellid;
// Only logs the decision; no control message is sent on the lines visible here.
430 if (highest_throughput > serving_cell_throughput) {
431 cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
432 cout << "UE ID: " << pred_ue_id << endl;
433 cout << "Source cell " << serving_cell_id << endl;
434 cout << "Target cell " << highest_throughput_cell_id << endl;
// Body of the polling routine started on a separate thread from main() (run_loop; its header and
// loop statement are not visible in this view). Each pass reads UE data from SDL, collects the ids
// whose serving_cell_rsrp is below rsrp_threshold, and sends a prediction request if any were found.
441 //This function runs a loop that continuously checks SDL for any UE
445 fprintf(stderr, "in run_loop()\n");
447 unordered_map<string, UEData> uemap;
// NOTE(review): declared before the "in while loop" log line, and no clear() is visible, so ids
// may accumulate across iterations — confirm against the full file.
449 vector<string> prediction_ues;
453 fprintf(stderr, "in while loop\n");
455 uemap = get_sdl_ue_data();
457 for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
458 string ueid = map_iter->first;
459 UEData data = map_iter->second;
// Select UEs whose serving-cell RSRP is strictly below the threshold.
460 if (data.serving_cell_rsrp < rsrp_threshold) {
461 prediction_ues.push_back(ueid);
// Skip the request entirely when nothing was selected.
465 if (prediction_ues.size() > 0) {
466 send_prediction_request(prediction_ues);
// Program entry point. Visible steps: create the SDL client, set the two SDL namespaces, create the
// Xapp on port "4560", register policy_callback (20010) and prediction_callback (30002), start
// run_loop on its own thread, then hand control to the framework with xfw->Run().
// The declarations of nsu, nsc and nthreads, and the end of the function, are not visible in this view.
475 extern int main( int argc, char** argv ) {
// Listening port passed to the Xapp constructor.
479 char* port = (char *) "4560";
481 sdl = shareddatalayer::SyncStorage::create();
483 nsu = Namespace(sdl_namespace_u);
484 nsc = Namespace(sdl_namespace_c);
487 fprintf( stderr, "<XAPP> listening on port: %s\n", port );
488 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
489 fprintf(stderr, "code1\n");
// Route incoming messages by type to the two callbacks; no user data is passed.
492 xfw->Add_msg_cb( 20010, policy_callback, NULL );
493 xfw->Add_msg_cb( 30002, prediction_callback, NULL );
495 fprintf(stderr, "code2\n");
497 std::thread loop_thread;
// NOTE(review): no join()/detach() is visible here; destroying a joinable std::thread calls
// std::terminate — confirm the full file handles this after Run() returns.
499 loop_thread = std::thread(&run_loop);
501 xfw->Run( nthreads );