b32e71ee0ce662e906e974c27a1cf4998d59f22f
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 /*
21         Mnemonic:       ts_xapp.cpp
22         Abstract:       Traffic Steering xApp;
23                                1. Receives A1 Policy
24                                2. Queries SDL to decide which UE to attempt Traffic Steering for
25                                3. Requests prediction for UE throughput on current and neighbor cells
26                                4. Receives prediction
27                                5. Optionally exercises Traffic Steering action over E2
28
29         Date:           22 April 2020
30         Author:         Ron Shacham
31
32 */
33
34 #include <stdio.h>
35 #include <string.h>
36 #include <unistd.h>
37
38 #include <thread>
39 #include <iostream>
40 #include <memory>
41
42 #include <sdl/syncstorage.hpp>
43 #include <set>
44 #include <map>
45 #include <vector>
46 #include <string>
47 #include <unordered_map>
48
49 #include <rapidjson/document.h>
50 #include <rapidjson/writer.h>
51 #include <rapidjson/stringbuffer.h>
52 #include <rapidjson/schema.h>
53 #include <rapidjson/reader.h>
54
55
56 #include "ricxfcpp/xapp.hpp"
57
58 using namespace rapidjson;
59 using namespace std;
60 using Namespace = std::string;
61 using Key = std::string;
62 using Data = std::vector<uint8_t>;
63 using DataMap = std::map<Key, Data>;
64 using Keys = std::set<Key>;
65
66
67 // ----------------------------------------------------------
68
69 std::unique_ptr<Xapp> xfw;
70
71 std::string sdl_namespace_u = "TS-UE-metrics";
72 std::string sdl_namespace_c = "TS-cell-metrics";
73
74 int rsrp_threshold = 0;
75
76 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
77
78 Namespace nsu;
79 Namespace nsc;
80
81 struct UEData {
82   string serving_cell;
83   int serving_cell_rsrp;
84
85 };
86
87 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
88   unordered_map<string, string> cell_pred;
89   std::string ue_id;
90   bool ue_id_found = false;
91   string curr_key = "";
92   string curr_value = "";
93   int policy_type_id;
94   int policy_instance_id;
95   int threshold;
96   std::string operation;
97   bool found_threshold = false;
98
99   
100   bool Null() { return true; }
101   bool Bool(bool b) { return true; }
102   bool Int(int i) {
103
104     if (curr_key.compare("policy_type_id") == 0) {
105       policy_type_id = i;
106     } else if (curr_key.compare("policy_instance_id") == 0) {
107       policy_instance_id = i;
108     } else if (curr_key.compare("threshold") == 0) {
109       found_threshold = true;
110       threshold = i;
111     }
112
113     return true;
114   }
115   bool Uint(unsigned u) {
116
117     if (curr_key.compare("policy_type_id") == 0) {
118       policy_type_id = u;
119     } else if (curr_key.compare("policy_instance_id") == 0) {
120       policy_instance_id = u;
121     } else if (curr_key.compare("threshold") == 0) {
122       found_threshold = true;
123       threshold = u;
124     }
125
126     return true;
127   }    
128   bool Int64(int64_t i) {  return true; }
129   bool Uint64(uint64_t u) {  return true; }
130   bool Double(double d) {  return true; }
131   bool String(const char* str, SizeType length, bool copy) {
132     
133     if (curr_key.compare("operation") != 0) {
134       operation = str;
135     }
136
137     return true;
138   }
139   bool StartObject() {
140
141     return true;
142   }
143   bool Key(const char* str, SizeType length, bool copy) {
144
145     curr_key = str;
146
147     return true;
148   }
149   bool EndObject(SizeType memberCount) {  return true; }
150   bool StartArray() {  return true; }
151   bool EndArray(SizeType elementCount) {  return true; }
152
153 };
154
155 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
156   unordered_map<string, int> cell_pred_down;
157   unordered_map<string, int> cell_pred_up;
158   std::string ue_id;
159   bool ue_id_found = false;
160   string curr_key = "";
161   string curr_value = "";
162   bool down_val = true;
163   bool Null() {  return true; }
164   bool Bool(bool b) {  return true; }
165   bool Int(int i) {  return true; }
166   bool Uint(unsigned u) {    
167
168     if (down_val) {
169       cell_pred_down[curr_key] = u;
170       down_val = false;
171     } else {
172       cell_pred_up[curr_key] = u;
173       down_val = true;
174     }
175
176     return true;    
177
178   }
179   bool Int64(int64_t i) {  return true; }
180   bool Uint64(uint64_t u) {  return true; }
181   bool Double(double d) {  return true; }
182   bool String(const char* str, SizeType length, bool copy) {
183
184     return true;
185   }
186   bool StartObject() {  return true; }
187   bool Key(const char* str, SizeType length, bool copy) {
188     if (!ue_id_found) {
189
190       ue_id = str;
191       ue_id_found = true;
192     } else {
193       curr_key = str;
194     }
195     return true;
196   }
197   bool EndObject(SizeType memberCount) {  return true; }
198   bool StartArray() {  return true; }
199   bool EndArray(SizeType elementCount) {  return true; }
200 };
201
202
203 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
204   unordered_map<string, string> cell_pred;
205   std::string serving_cell_id;
206   int serving_cell_rsrp;
207   int serving_cell_rsrq;
208   int serving_cell_sinr;
209   bool in_serving_array = false;
210   int rf_meas_index = 0;
211
212   bool in_serving_report_object = false;  
213
214   string curr_key = "";
215   string curr_value = "";
216   bool Null() { return true; }
217   bool Bool(bool b) { return true; }
218   bool Int(int i) {
219
220     return true;
221   }
222   
223   bool Uint(unsigned i) {
224
225     if (in_serving_report_object) {
226       if (curr_key.compare("rsrp") == 0) {
227         serving_cell_rsrp = i;
228       } else if (curr_key.compare("rsrq") == 0) {
229         serving_cell_rsrq = i;
230       } else if (curr_key.compare("rssinr") == 0) {
231         serving_cell_sinr = i;
232       }
233     }          
234     
235     return true; }
236   bool Int64(int64_t i) {
237
238     return true; }
239   bool Uint64(uint64_t i) {
240
241     return true; }
242   bool Double(double d) { return true; }
243   bool String(const char* str, SizeType length, bool copy) {
244     
245     if (curr_key.compare("ServingCellID") == 0) {
246       serving_cell_id = str;
247     } 
248
249     return true;
250   }
251   bool StartObject() {
252     if (curr_key.compare("ServingCellRF") == 0) {
253       in_serving_report_object = true;
254     }
255
256     return true; }
257   bool Key(const char* str, SizeType length, bool copy) {
258     
259     curr_key = str;
260     return true;
261   }
262   bool EndObject(SizeType memberCount) {
263     if (curr_key.compare("ServingCellRF") == 0) {
264       in_serving_report_object = false;
265     }
266     return true; }
267   bool StartArray() {
268
269     if (curr_key.compare("ServingCellRF") == 0) {
270       in_serving_array = true;
271     }
272     
273     return true;
274   }
275   bool EndArray(SizeType elementCount) {
276
277     if (curr_key.compare("servingCellRF") == 0) {
278       in_serving_array = false;
279       rf_meas_index = 0;
280     }
281
282     return true; }
283 };
284
285
286 unordered_map<string, UEData> get_sdl_ue_data() {
287
288   fprintf(stderr, "In get_sdl_ue_data()\n");
289
290   unordered_map<string, string> ue_data;
291
292   unordered_map<string, UEData> return_ue_data_map;
293     
294   std::string prefix3="";
295   Keys K2 = sdl->findKeys(nsu, prefix3);
296   DataMap Dk2 = sdl->get(nsu, K2);
297   
298   string ue_json;
299   string ue_id;
300   
301   for(auto si=K2.begin();si!=K2.end();++si){
302     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
303     char val[val_v.size()+1];                               // from Data
304     int i;
305
306     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
307     val[i]='\0';
308       ue_id.assign((std::string)*si);
309       
310       ue_json.assign(val);
311       ue_data[ue_id] =  ue_json;
312   }
313   
314   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
315     UEDataHandler handler;
316     Reader reader;
317     StringStream ss(map_iter->second.c_str());
318     reader.Parse(ss,handler);
319
320     string ueID = map_iter->first;
321     string serving_cell_id = handler.serving_cell_id;
322     int serv_rsrp = handler.serving_cell_rsrp;
323     
324     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
325     
326   }  
327
328   return return_ue_data_map;
329 }
330
331 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
332
333   int response_to = 0;   // max timeout wating for a response
334   int rmtype;           // received message type
335
336   
337   fprintf(stderr, "Policy Callback got a message, type=%d, length=%d\n", mtype, len);
338
339   const char *arg = (const char*)payload.get();
340
341   fprintf(stderr, "payload is %s\n", payload.get());
342
343   PolicyHandler handler;
344   Reader reader;
345   StringStream ss(arg);
346   reader.Parse(ss,handler);
347
348   //Set the threshold value
349
350   if (handler.found_threshold) {
351     fprintf(stderr, "Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
352     rsrp_threshold = handler.threshold;
353   }
354
355   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
356   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
357   
358   
359 }
360
361 void send_prediction_request(vector<string> ues_to_predict) {
362
363   std::unique_ptr<Message> msg;
364   Msg_component payload;                                // special type of unique pointer to the payload
365   
366   int nthreads = 1;  
367   int response_to = 0;   // max timeout wating for a response
368   int mtype = 30000;
369   int sz;
370   int i;
371   Msg_component send_payload;
372   
373   msg = xfw->Alloc_msg( 2048 );
374   
375   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
376   if( sz < 2048 ) {
377     fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
378     exit( 1 );
379   }
380
381   string ues_list = "[";
382
383   for (int i = 0; i < ues_to_predict.size(); i++) {
384     if (i == ues_to_predict.size() - 1) {
385       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"]";
386     } else {
387       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
388     }
389   }
390
391   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
392
393   const char *body = message_body.c_str();
394
395   //  char *body = "{\"UEPredictionSet\": [\"12345\"]}";
396   
397   send_payload = msg->Get_payload(); // direct access to payload
398   //  snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
399   snprintf( (char *) send_payload.get(), 2048, body);
400   //snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
401
402   fprintf(stderr, "message body %s\n", send_payload.get());  
403   fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
404   
405   // payload updated in place, nothing to copy from, so payload parm is nil
406   if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
407     fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
408   }
409
410   /*
411   msg = xfw->Receive( response_to );
412   if( msg != NULL ) {
413     rmtype = msg->Get_mtype();
414     send_payload = msg->Get_payload();
415     fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
416   } 
417   */
418
419 }
420
421 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
422
423   long now;
424   long total_count;
425
426   int sz;
427   int i;
428
429   int response_to = 0;   // max timeout wating for a response
430
431   int send_mtype = 0;
432   int rmtype;                                                   // received message type
433   int delay = 1000000;                          // mu-sec delay; default 1s
434
435   cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
436   cout << "payload is " << payload.get() << "\n";
437
438   mtype = 0;
439
440   const char* arg = (const char*)payload.get();
441   PredictionHandler handler;
442
443   try {
444
445     Reader reader;
446     StringStream ss(arg);
447     reader.Parse(ss,handler);
448   } catch (...) {
449     cout << "got an exception on stringstream read parse\n";
450   }
451   
452   std::string pred_ue_id = handler.ue_id;
453   
454   cout << "Prediction for " << pred_ue_id << endl;
455   
456   unordered_map<string, int> throughput_map = handler.cell_pred_down;
457
458   cout << endl;
459  
460   unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
461
462   //Decision about CONTROL message
463   //(1) Identify UE Id in Prediction message
464   //(2) Get UEData struct for this UE Id
465   //(3) Identify the UE's service cell ID
466   //(4) Iterate through Prediction message.
467   //    If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
468
469   UEData pred_ue_data = sdl_data[pred_ue_id];
470   std::string serving_cell_id = pred_ue_data.serving_cell;
471
472   int serving_cell_throughput;
473   int highest_throughput;
474   std::string highest_throughput_cell_id;
475   std::string::size_type str_size;
476
477   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
478
479     std::string curr_cellid = map_iter->first;
480     int curr_throughput = map_iter->second;
481
482     if (curr_cellid.compare(serving_cell_id) == 0) {
483       serving_cell_throughput = curr_throughput;
484       highest_throughput = serving_cell_throughput;
485     }
486
487   }
488
489   //Iterating again to identify the highest throughput prediction
490
491   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
492
493     std::string curr_cellid = map_iter->first;
494     int curr_throughput = map_iter->second;
495
496     if (curr_throughput > serving_cell_throughput) {
497       highest_throughput = curr_throughput;
498       highest_throughput_cell_id = curr_cellid;
499     }
500   }
501
502   if (highest_throughput > serving_cell_throughput) {
503     cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
504     cout << "UE ID: " << pred_ue_id << endl;
505     cout << "Source cell " << serving_cell_id << endl;
506     cout << "Target cell " << highest_throughput_cell_id << endl;
507   }
508
509   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
510   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
511   
512   
513 }
514
515
516 //This function runs a loop that continuously checks SDL for any UE
517
518 void run_loop() {
519
520   cout << "in Traffic Steering run_loop()\n";
521
522   unordered_map<string, UEData> uemap;
523
524   while (1) {
525
526     uemap = get_sdl_ue_data();
527
528     vector<string> prediction_ues;
529
530     for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
531       string ueid = map_iter->first;
532       fprintf(stderr,"found a ueid %s\n", ueid.c_str());
533       UEData data = map_iter->second;
534
535       fprintf(stderr, "current rsrp is %d\n", data.serving_cell_rsrp);
536
537       if (data.serving_cell_rsrp < rsrp_threshold) {
538         fprintf(stderr,"it is less than the rsrp threshold\n");
539         prediction_ues.push_back(ueid);
540       } else {
541         fprintf(stderr,"it is not less than the rsrp threshold\n");
542       }
543     }
544
545     fprintf(stderr, "the size of pred ues is %d\n", prediction_ues.size());
546
547     if (prediction_ues.size() > 0) {
548       send_prediction_request(prediction_ues);
549     }
550
551     sleep(20);
552   }
553 }
554
555 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
556  * It just print the payload received from AD xApp and send an ACK with same UEID as payload to AD xApp.
557  */
558 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
559   cout << "payload is " << payload.get() << "\n";
560   mbuf.Send_response(30004, -1, strlen((char *) payload.get()), (unsigned char *) payload.get());
561 }
562
563 extern int main( int argc, char** argv ) {
564
565   int nthreads = 1;
566
567   char* port = (char *) "4560";
568
569   sdl = shareddatalayer::SyncStorage::create();
570
571   nsu = Namespace(sdl_namespace_u);
572   nsc = Namespace(sdl_namespace_c);
573
574   
575   fprintf( stderr, "<XAPP> listening on port: %s\n", port );
576   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); 
577   
578   xfw->Add_msg_cb( 20010, policy_callback, NULL );
579   xfw->Add_msg_cb( 30002, prediction_callback, NULL );
580   xfw->Add_msg_cb( 30003, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
581   
582   std::thread loop_thread;
583
584   loop_thread = std::thread(&run_loop);
585
586   xfw->Run( nthreads );
587   
588 }