Changes to SDL read to support new keys used by KPIMON
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       ts_xapp.cpp
23         Abstract:       Traffic Steering xApp;
24                                1. Receives A1 Policy
25                                2. Queries SDL to decide which UE to attempt Traffic Steering for
26                                3. Requests prediction for UE throughput on current and neighbor cells
27                                4. Receives prediction
28                                5. Optionally exercises Traffic Steering action over E2
29
30         Date:           22 April 2020
31         Author:         Ron Shacham
32                 
33 */
34
35 #include <stdio.h>
36 #include <string.h>
37 #include <unistd.h>
38
39 #include <thread>
40 #include <iostream>
41 #include <memory>
42
43 #include <sdl/syncstorage.hpp>
44 #include <set>
45 #include <map>
46 #include <vector>
47 #include <string>
48 #include <unordered_map>
49
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
55
56
57 #include "ricxfcpp/xapp.hpp"
58
59 using namespace rapidjson;
60 using namespace std;
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
66
67
68 // ----------------------------------------------------------
69
70 std::unique_ptr<Xapp> xfw;
71
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
74
75 int rsrp_threshold = 0;
76
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
78
79 Namespace nsu;
80 Namespace nsc;
81
82 struct UEData {
83   string serving_cell;
84   int serving_cell_rsrp;
85
86 };
87
88 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
89   unordered_map<string, string> cell_pred;
90   std::string ue_id;
91   bool ue_id_found = false;
92   string curr_key = "";
93   string curr_value = "";
94   int policy_type_id;
95   int policy_instance_id;
96   int threshold;
97   std::string operation;
98   bool found_threshold = false;
99
100   
101   bool Null() { return true; }
102   bool Bool(bool b) { return true; }
103   bool Int(int i) {
104
105     if (curr_key.compare("policy_type_id") == 0) {
106       policy_type_id = i;
107     } else if (curr_key.compare("policy_instance_id") == 0) {
108       policy_instance_id = i;
109     } else if (curr_key.compare("threshold") == 0) {
110       found_threshold = true;
111       threshold = i;
112     }
113
114     return true;
115   }
116   bool Uint(unsigned u) {
117
118     if (curr_key.compare("policy_type_id") == 0) {
119       policy_type_id = u;
120     } else if (curr_key.compare("policy_instance_id") == 0) {
121       policy_instance_id = u;
122     } else if (curr_key.compare("threshold") == 0) {
123       found_threshold = true;
124       threshold = u;
125     }
126
127     return true;
128   }    
129   bool Int64(int64_t i) {  return true; }
130   bool Uint64(uint64_t u) {  return true; }
131   bool Double(double d) {  return true; }
132   bool String(const char* str, SizeType length, bool copy) {
133     
134     if (curr_key.compare("operation") != 0) {
135       operation = str;
136     }
137
138     return true;
139   }
140   bool StartObject() {
141
142     return true;
143   }
144   bool Key(const char* str, SizeType length, bool copy) {
145
146     curr_key = str;
147
148     return true;
149   }
150   bool EndObject(SizeType memberCount) {  return true; }
151   bool StartArray() {  return true; }
152   bool EndArray(SizeType elementCount) {  return true; }
153
154 };
155
156 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
157   unordered_map<string, int> cell_pred_down;
158   unordered_map<string, int> cell_pred_up;
159   std::string ue_id;
160   bool ue_id_found = false;
161   string curr_key = "";
162   string curr_value = "";
163   bool down_val = true;
164   bool Null() {  return true; }
165   bool Bool(bool b) {  return true; }
166   bool Int(int i) {  return true; }
167   bool Uint(unsigned u) {    
168
169     if (down_val) {
170       cell_pred_down[curr_key] = u;
171       down_val = false;
172     } else {
173       cell_pred_up[curr_key] = u;
174       down_val = true;
175     }
176
177     return true;    
178
179   }
180   bool Int64(int64_t i) {  return true; }
181   bool Uint64(uint64_t u) {  return true; }
182   bool Double(double d) {  return true; }
183   bool String(const char* str, SizeType length, bool copy) {
184
185     return true;
186   }
187   bool StartObject() {  return true; }
188   bool Key(const char* str, SizeType length, bool copy) {
189     if (!ue_id_found) {
190
191       ue_id = str;
192       ue_id_found = true;
193     } else {
194       curr_key = str;
195     }
196     return true;
197   }
198   bool EndObject(SizeType memberCount) {  return true; }
199   bool StartArray() {  return true; }
200   bool EndArray(SizeType elementCount) {  return true; }
201 };
202
203
204 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
205   unordered_map<string, string> cell_pred;
206   std::string serving_cell_id;
207   int serving_cell_rsrp;
208   int serving_cell_rsrq;
209   int serving_cell_sinr;
210   bool in_serving_array = false;
211   int rf_meas_index = 0;
212
213   bool in_serving_report_object = false;  
214
215   string curr_key = "";
216   string curr_value = "";
217   bool Null() { return true; }
218   bool Bool(bool b) { return true; }
219   bool Int(int i) {
220
221     return true;
222   }
223   
224   bool Uint(unsigned i) {
225
226     if (in_serving_report_object) {
227       if (curr_key.compare("rsrp") == 0) {
228         serving_cell_rsrp = i;
229       } else if (curr_key.compare("rsrq") == 0) {
230         serving_cell_rsrq = i;
231       } else if (curr_key.compare("rssinr") == 0) {
232         serving_cell_sinr = i;
233       }
234     }          
235     
236     return true; }
237   bool Int64(int64_t i) {
238
239     return true; }
240   bool Uint64(uint64_t i) {
241
242     return true; }
243   bool Double(double d) { return true; }
244   bool String(const char* str, SizeType length, bool copy) {
245     
246     if (curr_key.compare("ServingCellID") == 0) {
247       serving_cell_id = str;
248     } 
249
250     return true;
251   }
252   bool StartObject() {
253     if (curr_key.compare("ServingCellRF") == 0) {
254       in_serving_report_object = true;
255     }
256
257     return true; }
258   bool Key(const char* str, SizeType length, bool copy) {
259     
260     curr_key = str;
261     return true;
262   }
263   bool EndObject(SizeType memberCount) {
264     if (curr_key.compare("ServingCellRF") == 0) {
265       in_serving_report_object = false;
266     }
267     return true; }
268   bool StartArray() {
269
270     if (curr_key.compare("ServingCellRF") == 0) {
271       in_serving_array = true;
272     }
273     
274     return true;
275   }
276   bool EndArray(SizeType elementCount) {
277
278     if (curr_key.compare("servingCellRF") == 0) {
279       in_serving_array = false;
280       rf_meas_index = 0;
281     }
282
283     return true; }
284 };
285
286
287 unordered_map<string, UEData> get_sdl_ue_data() {
288
289   fprintf(stderr, "In get_sdl_ue_data()\n");
290
291   unordered_map<string, string> ue_data;
292
293   unordered_map<string, UEData> return_ue_data_map;
294     
295   std::string prefix3="";
296   Keys K2 = sdl->findKeys(nsu, prefix3);
297   DataMap Dk2 = sdl->get(nsu, K2);
298   
299   string ue_json;
300   string ue_id;
301   
302   for(auto si=K2.begin();si!=K2.end();++si){
303     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
304     char val[val_v.size()+1];                               // from Data
305     int i;
306
307     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
308     val[i]='\0';
309       ue_id.assign((std::string)*si);
310       
311       ue_json.assign(val);
312       ue_data[ue_id] =  ue_json;
313   }
314   
315   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
316     UEDataHandler handler;
317     Reader reader;
318     StringStream ss(map_iter->second.c_str());
319     reader.Parse(ss,handler);
320
321     string ueID = map_iter->first;
322     string serving_cell_id = handler.serving_cell_id;
323     int serv_rsrp = handler.serving_cell_rsrp;
324     
325     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
326     
327   }  
328
329   return return_ue_data_map;
330 }
331
332 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
333
334   int response_to = 0;   // max timeout wating for a response
335   int rmtype;           // received message type
336
337   
338   fprintf(stderr, "Policy Callback got a message, type=%d, length=%d\n", mtype, len);
339
340   const char *arg = (const char*)payload.get();
341
342   fprintf(stderr, "payload is %s\n", payload.get());
343
344   PolicyHandler handler;
345   Reader reader;
346   StringStream ss(arg);
347   reader.Parse(ss,handler);
348
349   //Set the threshold value
350
351   if (handler.found_threshold) {
352     fprintf(stderr, "Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
353     rsrp_threshold = handler.threshold;
354   }
355
356   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
357   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
358   
359   
360 }
361
362 void send_prediction_request(vector<string> ues_to_predict) {
363
364   std::unique_ptr<Message> msg;
365   Msg_component payload;                                // special type of unique pointer to the payload
366   
367   int nthreads = 1;  
368   int response_to = 0;   // max timeout wating for a response
369   int mtype = 30000;
370   int sz;
371   int i;
372   Msg_component send_payload;
373   
374   msg = xfw->Alloc_msg( 2048 );
375   
376   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
377   if( sz < 2048 ) {
378     fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
379     exit( 1 );
380   }
381
382   string ues_list = "[";
383
384   for (int i = 0; i < ues_to_predict.size(); i++) {
385     if (i == ues_to_predict.size() - 1) {
386       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"]";
387     } else {
388       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
389     }
390   }
391
392   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
393
394   const char *body = message_body.c_str();
395
396   //  char *body = "{\"UEPredictionSet\": [\"12345\"]}";
397   
398   send_payload = msg->Get_payload(); // direct access to payload
399   //  snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
400   snprintf( (char *) send_payload.get(), 2048, body);
401   //snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
402
403   fprintf(stderr, "message body %s\n", send_payload.get());  
404   fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
405   
406   // payload updated in place, nothing to copy from, so payload parm is nil
407   if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
408     fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
409   }
410
411   /*
412   msg = xfw->Receive( response_to );
413   if( msg != NULL ) {
414     rmtype = msg->Get_mtype();
415     send_payload = msg->Get_payload();
416     fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
417   } 
418   */
419
420 }
421
422 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
423
424   long now;
425   long total_count;
426
427   int sz;
428   int i;
429
430   int response_to = 0;   // max timeout wating for a response
431
432   int send_mtype = 0;
433   int rmtype;                                                   // received message type
434   int delay = 1000000;                          // mu-sec delay; default 1s
435
436   cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
437   cout << "payload is " << payload.get() << "\n";
438
439   mtype = 0;
440
441   const char* arg = (const char*)payload.get();
442   PredictionHandler handler;
443
444   try {
445
446     Reader reader;
447     StringStream ss(arg);
448     reader.Parse(ss,handler);
449   } catch (...) {
450     cout << "got an exception on stringstream read parse\n";
451   }
452   
453   std::string pred_ue_id = handler.ue_id;
454   
455   cout << "Prediction for " << pred_ue_id << endl;
456   
457   unordered_map<string, int> throughput_map = handler.cell_pred_down;
458
459   cout << endl;
460  
461   unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
462
463   //Decision about CONTROL message
464   //(1) Identify UE Id in Prediction message
465   //(2) Get UEData struct for this UE Id
466   //(3) Identify the UE's service cell ID
467   //(4) Iterate through Prediction message.
468   //    If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
469
470   UEData pred_ue_data = sdl_data[pred_ue_id];
471   std::string serving_cell_id = pred_ue_data.serving_cell;
472
473   int serving_cell_throughput;
474   int highest_throughput;
475   std::string highest_throughput_cell_id;
476   std::string::size_type str_size;
477
478   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
479
480     std::string curr_cellid = map_iter->first;
481     int curr_throughput = map_iter->second;
482
483     if (curr_cellid.compare(serving_cell_id) == 0) {
484       serving_cell_throughput = curr_throughput;
485       highest_throughput = serving_cell_throughput;
486     }
487
488   }
489
490   //Iterating again to identify the highest throughput prediction
491
492   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
493
494     std::string curr_cellid = map_iter->first;
495     int curr_throughput = map_iter->second;
496
497     if (curr_throughput > serving_cell_throughput) {
498       highest_throughput = curr_throughput;
499       highest_throughput_cell_id = curr_cellid;
500     }
501   }
502
503   if (highest_throughput > serving_cell_throughput) {
504     cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
505     cout << "UE ID: " << pred_ue_id << endl;
506     cout << "Source cell " << serving_cell_id << endl;
507     cout << "Target cell " << highest_throughput_cell_id << endl;
508   }
509
510   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
511   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
512   
513   
514 }
515
516
517 //This function runs a loop that continuously checks SDL for any UE
518
519 void run_loop() {
520
521   cout << "in Traffic Steering run_loop()\n";
522
523   unordered_map<string, UEData> uemap;
524
525   while (1) {
526
527     uemap = get_sdl_ue_data();
528
529     vector<string> prediction_ues;
530
531     for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
532       string ueid = map_iter->first;
533       fprintf(stderr,"found a ueid %s\n", ueid.c_str());
534       UEData data = map_iter->second;
535
536       fprintf(stderr, "current rsrp is %d\n", data.serving_cell_rsrp);
537
538       if (data.serving_cell_rsrp < rsrp_threshold) {
539         fprintf(stderr,"it is less than the rsrp threshold\n");
540         prediction_ues.push_back(ueid);
541       } else {
542         fprintf(stderr,"it is not less than the rsrp threshold\n");
543       }
544     }
545
546     fprintf(stderr, "the size of pred ues is %d\n", prediction_ues.size());
547
548     if (prediction_ues.size() > 0) {
549       send_prediction_request(prediction_ues);
550     }
551
552     sleep(20);
553   }
554 }
555
556 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
557  * It just print the payload received from AD xApp and send an ACK with same UEID as payload to AD xApp.
558  */
559 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
560   cout << "payload is " << payload.get() << "\n";
561   mbuf.Send_response(30004, -1, strlen((char *) payload.get()), (unsigned char *) payload.get());
562 }
563
564 extern int main( int argc, char** argv ) {
565
566   int nthreads = 1;
567
568   char* port = (char *) "4560";
569
570   sdl = shareddatalayer::SyncStorage::create();
571
572   nsu = Namespace(sdl_namespace_u);
573   nsc = Namespace(sdl_namespace_c);
574
575   
576   fprintf( stderr, "<XAPP> listening on port: %s\n", port );
577   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); 
578   
579   xfw->Add_msg_cb( 20010, policy_callback, NULL );
580   xfw->Add_msg_cb( 30002, prediction_callback, NULL );
581   xfw->Add_msg_cb( 30003, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
582   
583   std::thread loop_thread;
584
585   loop_thread = std::thread(&run_loop);
586
587   xfw->Run( nthreads );
588   
589 }