Fixed parsing issue for incoming prediction message
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       ts_xapp.cpp
23         Abstract:       Traffic Steering xApp;
24                                1. Receives A1 Policy
25                                2. Queries SDL to decide which UE to attempt Traffic Steering for
26                                3. Requests prediction for UE throughput on current and neighbor cells
27                                4. Receives prediction
28                                5. Optionally exercises Traffic Steering action over E2
29
30         Date:           22 April 2020
31         Author:         Ron Shacham
32                 
33 */
34
35 #include <stdio.h>
36 #include <string.h>
37 #include <unistd.h>
38
39 #include <thread>
40 #include <iostream>
41 #include <memory>
42
43 #include <sdl/syncstorage.hpp>
44 #include <set>
45 #include <map>
46 #include <vector>
47 #include <string>
48 #include <unordered_map>
49
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
55
56
57 #include "ricxfcpp/xapp.hpp"
58
59 using namespace rapidjson;
60 using namespace std;
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
66
67
68 // ----------------------------------------------------------
69
70 std::unique_ptr<Xapp> xfw;
71
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
74
75 int rsrp_threshold = 0;
76
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
78
79 Namespace nsu;
80 Namespace nsc;
81
82 struct UEData {
83   string serving_cell;
84   int serving_cell_rsrp;
85
86 };
87
88 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
89   unordered_map<string, string> cell_pred;
90   std::string ue_id;
91   bool ue_id_found = false;
92   string curr_key = "";
93   string curr_value = "";
94   int policy_type_id;
95   int policy_instance_id;
96   int threshold;
97   std::string operation;
98   bool found_threshold = false;
99
100   
101   bool Null() { cout << "Null()" << endl; return true; }
102   bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
103   bool Int(int i) {
104     cout << "Int(" << i << ")" << endl;
105     if (curr_key.compare("policy_type_id") == 0) {
106       policy_type_id = i;
107     } else if (curr_key.compare("policy_instance_id") == 0) {
108       policy_instance_id = i;
109     } else if (curr_key.compare("threshold") == 0) {
110       found_threshold = true;
111       threshold = i;
112     }
113
114     return true;
115   }
116   bool Uint(unsigned u) {
117     cout << "Int(" << u << ")" << endl;
118     if (curr_key.compare("policy_type_id") == 0) {
119       policy_type_id = u;
120     } else if (curr_key.compare("policy_instance_id") == 0) {
121       policy_instance_id = u;
122     } else if (curr_key.compare("threshold") == 0) {
123       found_threshold = true;
124       threshold = u;
125     }
126
127     return true;
128   }    
129   bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
130   bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
131   bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
132   bool String(const char* str, SizeType length, bool copy) {
133     cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
134     if (curr_key.compare("operation") != 0) {
135       operation = str;
136     }
137
138     return true;
139   }
140   bool StartObject() {
141     cout << "StartObject()" << endl;
142     return true;
143   }
144   bool Key(const char* str, SizeType length, bool copy) {
145     cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
146     curr_key = str;
147
148     return true;
149   }
150   bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
151   bool StartArray() { cout << "StartArray()" << endl; return true; }
152   bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
153
154 };
155
156 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
157   unordered_map<string, int> cell_pred_down;
158   unordered_map<string, int> cell_pred_up;
159   std::string ue_id;
160   bool ue_id_found = false;
161   string curr_key = "";
162   string curr_value = "";
163   bool down_val = true;
164   bool Null() { cout << "Null()" << endl; return true; }
165   bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
166   bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
167   bool Uint(unsigned u) {    
168     cout << "Uint(" << u << ")" << endl; 
169     if (down_val) {
170       cell_pred_down[curr_key] = u;
171       cout << "Setting xput down val for " << curr_key << " to " << u << endl;
172       down_val = false;
173     } else {
174       cell_pred_up[curr_key] = u;
175       cout << "Setting xput up val for " << curr_key << " to " << u << endl;
176       down_val = true;
177     }
178
179     return true;    
180
181   }
182   bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
183   bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
184   bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
185   bool String(const char* str, SizeType length, bool copy) {
186     cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
187
188     return true;
189   }
190   bool StartObject() { cout << "StartObject()" << endl; return true; }
191   bool Key(const char* str, SizeType length, bool copy) {
192     cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
193     if (!ue_id_found) {
194       cout << "Found UE ID\n";
195       ue_id = str;
196       ue_id_found = true;
197     } else {
198       curr_key = str;
199     }
200     return true;
201   }
202   bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
203   bool StartArray() { cout << "StartArray()" << endl; return true; }
204   bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
205 };
206
207
208 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
209   unordered_map<string, string> cell_pred;
210   std::string serving_cell_id;
211   int serving_cell_rsrp;
212   int serving_cell_rsrq;
213   int serving_cell_sinr;
214   bool in_serving_array = false;
215   int rf_meas_index = 0;
216
217   string curr_key = "";
218   string curr_value = "";
219   bool Null() { cout << "Null()" << endl; return true; }
220   bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
221   bool Int(int i) {
222     fprintf(stderr, "Int(%d)\n", i);
223     if (in_serving_array) {
224       fprintf(stderr, "we are in serving array\n");
225       switch(rf_meas_index) {
226       case 0:
227         serving_cell_rsrp = i;
228         break;
229       case 1:
230         serving_cell_rsrq = i;
231         break;
232       case 2:
233         serving_cell_sinr = i;
234         break;
235       }
236       rf_meas_index++;
237     }
238     return true;
239   }
240   bool Uint(unsigned u) {
241     fprintf(stderr, "Int(%d)\n", u); return true; }
242   bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
243   bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
244   bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
245   bool String(const char* str, SizeType length, bool copy) {
246     fprintf(stderr,"String(%s)\n", str);
247     if (curr_key.compare("ServingCellID") == 0) {
248       serving_cell_id = str;
249     } 
250
251     return true;
252   }
253   bool StartObject() { cout << "StartObject()" << endl; return true; }
254   bool Key(const char* str, SizeType length, bool copy) {
255     fprintf(stderr,"Key(%s)\n", str);
256     curr_key = str;
257     return true;
258   }
259   bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
260   bool StartArray() {
261     fprintf(stderr,"StartArray()");
262     if (curr_key.compare("ServingCellRF") == 0) {
263       in_serving_array = true;
264     }
265     
266     return true;
267   }
268   bool EndArray(SizeType elementCount) {
269     fprintf(stderr, "EndArray()\n");
270     if (curr_key.compare("servingCellRF") == 0) {
271       in_serving_array = false;
272       rf_meas_index = 0;
273     }
274
275     return true; }
276 };
277
278
279 unordered_map<string, UEData> get_sdl_ue_data() {
280
281   fprintf(stderr, "In get_sdl_ue_data()\n");
282
283   unordered_map<string, string> ue_data;
284
285   unordered_map<string, UEData> return_ue_data_map;
286     
287   std::string prefix3="12";
288   Keys K2 = sdl->findKeys(nsu, prefix3);
289   DataMap Dk2 = sdl->get(nsu, K2);
290   
291   string ue_json;
292   string ue_id;
293   
294   for(auto si=K2.begin();si!=K2.end();++si){
295     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
296     char val[val_v.size()+1];                               // from Data
297     int i;
298     fprintf(stderr, "val size %d\n", val_v.size());
299     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
300     val[i]='\0';
301       ue_id.assign((std::string)*si);
302       
303       ue_json.assign(val);
304       ue_data[ue_id] =  ue_json;
305   }
306   
307   fprintf(stderr, "after sdl get of ue data\n");
308   
309   fprintf(stderr, "From UE data map\n");
310   
311   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
312     UEDataHandler handler;
313     Reader reader;
314     StringStream ss(map_iter->second.c_str());
315     reader.Parse(ss,handler);
316
317     string ueID = map_iter->first;
318     string serving_cell_id = handler.serving_cell_id;
319     int serv_rsrp = handler.serving_cell_rsrp;
320     
321     fprintf(stderr,"UE data for %s\n", ueID.c_str());
322     fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str());
323     fprintf(stderr,"RSRP for UE %d\n", serv_rsrp);
324
325     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
326     
327   }
328   
329   fprintf(stderr, "\n");
330   return return_ue_data_map;
331 }
332
333 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
334
335   int response_to = 0;   // max timeout wating for a response
336   int rmtype;           // received message type
337
338   
339   fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
340   fprintf(stderr, "payload is %s\n", payload.get());
341   
342
343   const char *arg = (const char*)payload.get();
344
345   PolicyHandler handler;
346   Reader reader;
347   StringStream ss(arg);
348   reader.Parse(ss,handler);
349
350   //Set the threshold value
351
352   if (handler.found_threshold) {
353     rsrp_threshold = handler.threshold;
354   }
355
356   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
357   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
358   
359   
360 }
361
362 void send_prediction_request(vector<string> ues_to_predict) {
363
364   std::unique_ptr<Message> msg;
365   Msg_component payload;                                // special type of unique pointer to the payload
366   
367   int nthreads = 1;  
368   int response_to = 0;   // max timeout wating for a response
369   int mtype = 30000;
370   int sz;
371   int i;
372   Msg_component send_payload;
373   
374   fprintf(stderr, "cb 1\n");
375
376   msg = xfw->Alloc_msg( 2048 );
377   
378   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
379   if( sz < 2048 ) {
380     fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
381     exit( 1 );
382   }
383
384   fprintf(stderr, "cb 2");
385
386   string ues_list = "[";
387
388   for (int i = 0; i < ues_to_predict.size(); i++) {
389     if (i == ues_to_predict.size() - 1) {
390       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
391     } else {
392       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
393     }
394   }
395
396   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
397
398   const char *body = message_body.c_str();
399
400   //  char *body = "{\"UEPredictionSet\": [\"12345\"]}";
401   
402   send_payload = msg->Get_payload(); // direct access to payload
403   //  snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
404   //  snprintf( (char *) send_payload.get(), 2048, body);
405   snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
406
407   fprintf(stderr, "message body %s\n", send_payload.get());
408   
409   fprintf(stderr, "cb 3");
410   fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
411   
412   // payload updated in place, nothing to copy from, so payload parm is nil
413   if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
414     fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
415   }
416
417   fprintf(stderr, "cb 4");
418
419   /*
420   msg = xfw->Receive( response_to );
421   if( msg != NULL ) {
422     rmtype = msg->Get_mtype();
423     send_payload = msg->Get_payload();
424     fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
425   } 
426   */
427
428 }
429
430 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
431
432   long now;
433   long total_count;
434
435   int sz;
436   int i;
437
438   int response_to = 0;   // max timeout wating for a response
439
440   int send_mtype = 0;
441   int rmtype;                                                   // received message type
442   int delay = 1000000;                          // mu-sec delay; default 1s
443
444   cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
445   cout << "payload is " << payload.get() << "\n";
446
447   mtype = 0;
448
449   cout << "prediction callback 1" << endl;
450
451   const char* arg = (const char*)payload.get();
452
453   cout << "ready to parse " << arg << endl;
454
455   PredictionHandler handler;
456
457   try {
458
459     Reader reader;
460     StringStream ss(arg);
461     reader.Parse(ss,handler);
462   } catch (...) {
463     cout << "got an exception on stringstream read parse\n";
464   }
465   
466   std::string pred_ue_id = handler.ue_id;
467   
468   cout << "Prediction for " << pred_ue_id << endl;
469   
470   unordered_map<string, int> throughput_map = handler.cell_pred_down;
471
472
473   cout << endl;
474  
475   unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
476
477   //Decision about CONTROL message
478   //(1) Identify UE Id in Prediction message
479   //(2) Get UEData struct for this UE Id
480   //(3) Identify the UE's service cell ID
481   //(4) Iterate through Prediction message.
482   //    If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
483
484   UEData pred_ue_data = sdl_data[pred_ue_id];
485   std::string serving_cell_id = pred_ue_data.serving_cell;
486
487   int serving_cell_throughput;
488   int highest_throughput;
489   std::string highest_throughput_cell_id;
490   std::string::size_type str_size;
491
492   cout << "Going through throughtput map:" << endl;
493
494   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
495     cout << map_iter->first << " : " << map_iter->second << endl;    
496     std::string curr_cellid = map_iter->first;
497     cout << "Cell ID is " << curr_cellid;
498     int curr_throughput = map_iter->second;
499     cout << "Throughput is " << curr_throughput << endl;
500
501     if (curr_cellid.compare(serving_cell_id) == 0) {
502       serving_cell_throughput = curr_throughput;
503       highest_throughput = serving_cell_throughput;
504     }
505
506   }
507
508   //Iterating again to identify the highest throughput prediction
509
510   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
511     cout << map_iter->first << " : " << map_iter->second << endl;    
512     std::string curr_cellid = map_iter->first;
513     cout << "Cell ID is " << curr_cellid;
514     int curr_throughput = map_iter->second;
515     cout << "Throughput is " << curr_throughput << endl;
516
517     if (curr_throughput > serving_cell_throughput) {
518       highest_throughput = curr_throughput;
519       highest_throughput_cell_id = curr_cellid;
520     }
521   }
522
523   if (highest_throughput > serving_cell_throughput) {
524     cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
525     cout << "UE ID: " << pred_ue_id << endl;
526     cout << "Source cell " << serving_cell_id << endl;
527     cout << "Target cell " << highest_throughput_cell_id << endl;
528   }
529
530   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
531   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
532   
533   
534 }
535
536
537 //This function runs a loop that continuously checks SDL for any UE
538
539 void run_loop() {
540
541   cout << "in run_loop()\n";
542
543   unordered_map<string, UEData> uemap;
544
545   vector<string> prediction_ues;
546
547   while (1) {
548
549     cout <<  "in while loop\n";
550
551     uemap = get_sdl_ue_data();
552
553     for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
554       string ueid = map_iter->first;
555       UEData data = map_iter->second;
556       if (data.serving_cell_rsrp < rsrp_threshold) {
557         prediction_ues.push_back(ueid);
558       }
559     }
560
561     if (prediction_ues.size() > 0) {
562       send_prediction_request(prediction_ues);
563     }
564
565     sleep(20);
566   }
567 }
568
569
570
571 extern int main( int argc, char** argv ) {
572
573   int nthreads = 1;
574
575   char* port = (char *) "4560";
576
577   sdl = shareddatalayer::SyncStorage::create();
578
579   nsu = Namespace(sdl_namespace_u);
580   nsc = Namespace(sdl_namespace_c);
581
582   
583   fprintf( stderr, "<XAPP> listening on port: %s\n", port );
584   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
585   fprintf(stderr, "code1\n");
586
587   
588   xfw->Add_msg_cb( 20010, policy_callback, NULL );
589   xfw->Add_msg_cb( 30002, prediction_callback, NULL );
590
591   fprintf(stderr, "code2\n");
592   
593   std::thread loop_thread;
594
595   loop_thread = std::thread(&run_loop);
596
597   xfw->Run( nthreads );
598   
599 }