Removed unnecessary logging messages
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       ts_xapp.cpp
23         Abstract:       Traffic Steering xApp;
24                                1. Receives A1 Policy
25                                2. Queries SDL to decide which UE to attempt Traffic Steering for
26                                3. Requests prediction for UE throughput on current and neighbor cells
27                                4. Receives prediction
28                                5. Optionally exercises Traffic Steering action over E2
29
30         Date:           22 April 2020
31         Author:         Ron Shacham
32                 
33 */
34
35 #include <stdio.h>
36 #include <string.h>
37 #include <unistd.h>
38
39 #include <thread>
40 #include <iostream>
41 #include <memory>
42
43 #include <sdl/syncstorage.hpp>
44 #include <set>
45 #include <map>
46 #include <vector>
47 #include <string>
48 #include <unordered_map>
49
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
55
56
57 #include "ricxfcpp/xapp.hpp"
58
59 using namespace rapidjson;
60 using namespace std;
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
66
67
68 // ----------------------------------------------------------
69
70 std::unique_ptr<Xapp> xfw;
71
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
74
75 int rsrp_threshold = 0;
76
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
78
79 Namespace nsu;
80 Namespace nsc;
81
82 struct UEData {
83   string serving_cell;
84   int serving_cell_rsrp;
85
86 };
87
88 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
89   unordered_map<string, string> cell_pred;
90   std::string ue_id;
91   bool ue_id_found = false;
92   string curr_key = "";
93   string curr_value = "";
94   int policy_type_id;
95   int policy_instance_id;
96   int threshold;
97   std::string operation;
98   bool found_threshold = false;
99
100   
101   bool Null() { return true; }
102   bool Bool(bool b) { return true; }
103   bool Int(int i) {
104
105     if (curr_key.compare("policy_type_id") == 0) {
106       policy_type_id = i;
107     } else if (curr_key.compare("policy_instance_id") == 0) {
108       policy_instance_id = i;
109     } else if (curr_key.compare("threshold") == 0) {
110       found_threshold = true;
111       threshold = i;
112     }
113
114     return true;
115   }
116   bool Uint(unsigned u) {
117
118     if (curr_key.compare("policy_type_id") == 0) {
119       policy_type_id = u;
120     } else if (curr_key.compare("policy_instance_id") == 0) {
121       policy_instance_id = u;
122     } else if (curr_key.compare("threshold") == 0) {
123       found_threshold = true;
124       threshold = u;
125     }
126
127     return true;
128   }    
129   bool Int64(int64_t i) {  return true; }
130   bool Uint64(uint64_t u) {  return true; }
131   bool Double(double d) {  return true; }
132   bool String(const char* str, SizeType length, bool copy) {
133     
134     if (curr_key.compare("operation") != 0) {
135       operation = str;
136     }
137
138     return true;
139   }
140   bool StartObject() {
141
142     return true;
143   }
144   bool Key(const char* str, SizeType length, bool copy) {
145
146     curr_key = str;
147
148     return true;
149   }
150   bool EndObject(SizeType memberCount) {  return true; }
151   bool StartArray() {  return true; }
152   bool EndArray(SizeType elementCount) {  return true; }
153
154 };
155
156 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
157   unordered_map<string, int> cell_pred_down;
158   unordered_map<string, int> cell_pred_up;
159   std::string ue_id;
160   bool ue_id_found = false;
161   string curr_key = "";
162   string curr_value = "";
163   bool down_val = true;
164   bool Null() {  return true; }
165   bool Bool(bool b) {  return true; }
166   bool Int(int i) {  return true; }
167   bool Uint(unsigned u) {    
168
169     if (down_val) {
170       cell_pred_down[curr_key] = u;
171       down_val = false;
172     } else {
173       cell_pred_up[curr_key] = u;
174       down_val = true;
175     }
176
177     return true;    
178
179   }
180   bool Int64(int64_t i) {  return true; }
181   bool Uint64(uint64_t u) {  return true; }
182   bool Double(double d) {  return true; }
183   bool String(const char* str, SizeType length, bool copy) {
184
185     return true;
186   }
187   bool StartObject() {  return true; }
188   bool Key(const char* str, SizeType length, bool copy) {
189     if (!ue_id_found) {
190
191       ue_id = str;
192       ue_id_found = true;
193     } else {
194       curr_key = str;
195     }
196     return true;
197   }
198   bool EndObject(SizeType memberCount) {  return true; }
199   bool StartArray() {  return true; }
200   bool EndArray(SizeType elementCount) {  return true; }
201 };
202
203
204 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
205   unordered_map<string, string> cell_pred;
206   std::string serving_cell_id;
207   int serving_cell_rsrp;
208   int serving_cell_rsrq;
209   int serving_cell_sinr;
210   bool in_serving_array = false;
211   int rf_meas_index = 0;
212
213   string curr_key = "";
214   string curr_value = "";
215   bool Null() { return true; }
216   bool Bool(bool b) { return true; }
217   bool Int(int i) {
218
219     if (in_serving_array) {
220
221       switch(rf_meas_index) {
222       case 0:
223         serving_cell_rsrp = i;
224         break;
225       case 1:
226         serving_cell_rsrq = i;
227         break;
228       case 2:
229         serving_cell_sinr = i;
230         break;
231       }
232       rf_meas_index++;
233     }
234     return true;
235   }
236   bool Uint(unsigned u) {
237     return true; }
238   bool Int64(int64_t i) { return true; }
239   bool Uint64(uint64_t u) { return true; }
240   bool Double(double d) { return true; }
241   bool String(const char* str, SizeType length, bool copy) {
242     
243     if (curr_key.compare("ServingCellID") == 0) {
244       serving_cell_id = str;
245     } 
246
247     return true;
248   }
249   bool StartObject() { return true; }
250   bool Key(const char* str, SizeType length, bool copy) {
251     
252     curr_key = str;
253     return true;
254   }
255   bool EndObject(SizeType memberCount) { return true; }
256   bool StartArray() {
257
258     if (curr_key.compare("ServingCellRF") == 0) {
259       in_serving_array = true;
260     }
261     
262     return true;
263   }
264   bool EndArray(SizeType elementCount) {
265
266     if (curr_key.compare("servingCellRF") == 0) {
267       in_serving_array = false;
268       rf_meas_index = 0;
269     }
270
271     return true; }
272 };
273
274
275 unordered_map<string, UEData> get_sdl_ue_data() {
276
277   fprintf(stderr, "In get_sdl_ue_data()\n");
278
279   unordered_map<string, string> ue_data;
280
281   unordered_map<string, UEData> return_ue_data_map;
282     
283   std::string prefix3="12";
284   Keys K2 = sdl->findKeys(nsu, prefix3);
285   DataMap Dk2 = sdl->get(nsu, K2);
286   
287   string ue_json;
288   string ue_id;
289   
290   for(auto si=K2.begin();si!=K2.end();++si){
291     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
292     char val[val_v.size()+1];                               // from Data
293     int i;
294
295     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
296     val[i]='\0';
297       ue_id.assign((std::string)*si);
298       
299       ue_json.assign(val);
300       ue_data[ue_id] =  ue_json;
301   }
302   
303   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
304     UEDataHandler handler;
305     Reader reader;
306     StringStream ss(map_iter->second.c_str());
307     reader.Parse(ss,handler);
308
309     string ueID = map_iter->first;
310     string serving_cell_id = handler.serving_cell_id;
311     int serv_rsrp = handler.serving_cell_rsrp;
312     
313     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
314     
315   }  
316
317   return return_ue_data_map;
318 }
319
320 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
321
322   int response_to = 0;   // max timeout wating for a response
323   int rmtype;           // received message type
324
325   
326   cout <<  "Policy Callback got a message, type=" << mtype << " , length=" << len << endl;
327   cout <<  "payload is " << payload.get() << endl;
328   
329
330   const char *arg = (const char*)payload.get();
331
332   PolicyHandler handler;
333   Reader reader;
334   StringStream ss(arg);
335   reader.Parse(ss,handler);
336
337   //Set the threshold value
338
339   if (handler.found_threshold) {
340     cout << "Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
341     rsrp_threshold = handler.threshold;
342   }
343
344   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
345   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
346   
347   
348 }
349
350 void send_prediction_request(vector<string> ues_to_predict) {
351
352   std::unique_ptr<Message> msg;
353   Msg_component payload;                                // special type of unique pointer to the payload
354   
355   int nthreads = 1;  
356   int response_to = 0;   // max timeout wating for a response
357   int mtype = 30000;
358   int sz;
359   int i;
360   Msg_component send_payload;
361   
362   msg = xfw->Alloc_msg( 2048 );
363   
364   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
365   if( sz < 2048 ) {
366     fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
367     exit( 1 );
368   }
369
370   string ues_list = "[";
371
372   for (int i = 0; i < ues_to_predict.size(); i++) {
373     if (i == ues_to_predict.size() - 1) {
374       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
375     } else {
376       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
377     }
378   }
379
380   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
381
382   const char *body = message_body.c_str();
383
384   //  char *body = "{\"UEPredictionSet\": [\"12345\"]}";
385   
386   send_payload = msg->Get_payload(); // direct access to payload
387   //  snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
388   //  snprintf( (char *) send_payload.get(), 2048, body);
389   snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
390
391   fprintf(stderr, "message body %s\n", send_payload.get());  
392   fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
393   
394   // payload updated in place, nothing to copy from, so payload parm is nil
395   if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
396     fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
397   }
398
399   /*
400   msg = xfw->Receive( response_to );
401   if( msg != NULL ) {
402     rmtype = msg->Get_mtype();
403     send_payload = msg->Get_payload();
404     fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
405   } 
406   */
407
408 }
409
410 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
411
412   long now;
413   long total_count;
414
415   int sz;
416   int i;
417
418   int response_to = 0;   // max timeout wating for a response
419
420   int send_mtype = 0;
421   int rmtype;                                                   // received message type
422   int delay = 1000000;                          // mu-sec delay; default 1s
423
424   cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
425   cout << "payload is " << payload.get() << "\n";
426
427   mtype = 0;
428
429   const char* arg = (const char*)payload.get();
430   PredictionHandler handler;
431
432   try {
433
434     Reader reader;
435     StringStream ss(arg);
436     reader.Parse(ss,handler);
437   } catch (...) {
438     cout << "got an exception on stringstream read parse\n";
439   }
440   
441   std::string pred_ue_id = handler.ue_id;
442   
443   cout << "Prediction for " << pred_ue_id << endl;
444   
445   unordered_map<string, int> throughput_map = handler.cell_pred_down;
446
447   cout << endl;
448  
449   unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
450
451   //Decision about CONTROL message
452   //(1) Identify UE Id in Prediction message
453   //(2) Get UEData struct for this UE Id
454   //(3) Identify the UE's service cell ID
455   //(4) Iterate through Prediction message.
456   //    If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
457
458   UEData pred_ue_data = sdl_data[pred_ue_id];
459   std::string serving_cell_id = pred_ue_data.serving_cell;
460
461   int serving_cell_throughput;
462   int highest_throughput;
463   std::string highest_throughput_cell_id;
464   std::string::size_type str_size;
465
466   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
467
468     std::string curr_cellid = map_iter->first;
469     int curr_throughput = map_iter->second;
470
471     if (curr_cellid.compare(serving_cell_id) == 0) {
472       serving_cell_throughput = curr_throughput;
473       highest_throughput = serving_cell_throughput;
474     }
475
476   }
477
478   //Iterating again to identify the highest throughput prediction
479
480   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
481
482     std::string curr_cellid = map_iter->first;
483     int curr_throughput = map_iter->second;
484
485     if (curr_throughput > serving_cell_throughput) {
486       highest_throughput = curr_throughput;
487       highest_throughput_cell_id = curr_cellid;
488     }
489   }
490
491   if (highest_throughput > serving_cell_throughput) {
492     cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
493     cout << "UE ID: " << pred_ue_id << endl;
494     cout << "Source cell " << serving_cell_id << endl;
495     cout << "Target cell " << highest_throughput_cell_id << endl;
496   }
497
498   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
499   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
500   
501   
502 }
503
504
505 //This function runs a loop that continuously checks SDL for any UE
506
507 void run_loop() {
508
509   cout << "in Traffic Steering run_loop()\n";
510
511   unordered_map<string, UEData> uemap;
512
513   vector<string> prediction_ues;
514
515   while (1) {
516
517     uemap = get_sdl_ue_data();
518
519     for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
520       string ueid = map_iter->first;
521       UEData data = map_iter->second;
522       if (data.serving_cell_rsrp < rsrp_threshold) {
523         prediction_ues.push_back(ueid);
524       }
525     }
526
527     if (prediction_ues.size() > 0) {
528       send_prediction_request(prediction_ues);
529     }
530
531     sleep(20);
532   }
533 }
534
535
536
537 extern int main( int argc, char** argv ) {
538
539   int nthreads = 1;
540
541   char* port = (char *) "4560";
542
543   sdl = shareddatalayer::SyncStorage::create();
544
545   nsu = Namespace(sdl_namespace_u);
546   nsc = Namespace(sdl_namespace_c);
547
548   
549   fprintf( stderr, "<XAPP> listening on port: %s\n", port );
550   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); 
551   
552   xfw->Add_msg_cb( 20010, policy_callback, NULL );
553   xfw->Add_msg_cb( 30002, prediction_callback, NULL );
554   
555   std::thread loop_thread;
556
557   loop_thread = std::thread(&run_loop);
558
559   xfw->Run( nthreads );
560   
561 }