Fixed up error from last commit
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       ts_xapp.cpp
23         Abstract:       Traffic Steering xApp;
24                                1. Receives A1 Policy
25                                2. Queries SDL to decide which UE to attempt Traffic Steering for
26                                3. Requests prediction for UE throughput on current and neighbor cells
27                                4. Receives prediction
28                                5. Optionally exercises Traffic Steering action over E2
29
30         Date:           22 April 2020
31         Author:         Ron Shacham
32                 
33 */
34
35 #include <stdio.h>
36 #include <string.h>
37 #include <unistd.h>
38
39 #include <thread>
40 #include <iostream>
41 #include <memory>
42
43 #include <sdl/syncstorage.hpp>
44 #include <set>
45 #include <map>
46 #include <vector>
47 #include <string>
48 #include <unordered_map>
49
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
55
56
57 #include "ricxfcpp/xapp.hpp"
58
59 using namespace rapidjson;
60 using namespace std;
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
66
67
68 // ----------------------------------------------------------
69
70 std::unique_ptr<Xapp> xfw;
71
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
74
75 int rsrp_threshold = 0;
76
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
78
79 Namespace nsu;
80 Namespace nsc;
81
82 struct UEData {
83   string serving_cell;
84   int serving_cell_rsrp;
85
86 };
87
88 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
89   unordered_map<string, string> cell_pred;
90   std::string ue_id;
91   bool ue_id_found = false;
92   string curr_key = "";
93   string curr_value = "";
94   int policy_type_id;
95   int policy_instance_id;
96   int threshold;
97   std::string operation;
98   bool found_threshold = false;
99
100   
101   bool Null() { cout << "Null()" << endl; return true; }
102   bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
103   bool Int(int i) {
104     cout << "Int(" << i << ")" << endl;
105     if (curr_key.compare("policy_type_id") == 0) {
106       policy_type_id = i;
107     } else if (curr_key.compare("policy_instance_id") == 0) {
108       policy_instance_id = i;
109     } else if (curr_key.compare("threshold") == 0) {
110       found_threshold = true;
111       threshold = i;
112     }
113
114     return true;
115   }
116   bool Uint(unsigned u) {
117     cout << "Int(" << u << ")" << endl;
118     if (curr_key.compare("policy_type_id") == 0) {
119       policy_type_id = u;
120     } else if (curr_key.compare("policy_instance_id") == 0) {
121       policy_instance_id = u;
122     } else if (curr_key.compare("threshold") == 0) {
123       found_threshold = true;
124       threshold = u;
125     }
126
127     return true;
128   }    
129   bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
130   bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
131   bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
132   bool String(const char* str, SizeType length, bool copy) {
133     cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
134     if (curr_key.compare("operation") != 0) {
135       operation = str;
136     }
137
138     return true;
139   }
140   bool StartObject() {
141     cout << "StartObject()" << endl;
142     return true;
143   }
144   bool Key(const char* str, SizeType length, bool copy) {
145     cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
146     curr_key = str;
147
148     return true;
149   }
150   bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
151   bool StartArray() { cout << "StartArray()" << endl; return true; }
152   bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
153
154 };
155
156 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
157   unordered_map<string, int> cell_pred_down;
158   unordered_map<string, int> cell_pred_up;
159   std::string ue_id;
160   bool ue_id_found = false;
161   string curr_key = "";
162   string curr_value = "";
163   bool down_val = true;
164   bool Null() { cout << "Null()" << endl; return true; }
165   bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
166   bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
167   bool Uint(unsigned u) {    
168     cout << "Uint(" << u << ")" << endl; 
169     if (down_val) {
170       cell_pred_down[curr_key] = u;
171       cout << "Setting xput down val for " << curr_key << " to " << u << endl;
172       down_val = false;
173     } else {
174       cell_pred_up[curr_key] = u;
175       cout << "Setting xput up val for " << curr_key << " to " << u << endl;
176       down_val = true;
177     }
178
179     return true;    
180
181   }
182   bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
183   bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
184   bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
185   bool String(const char* str, SizeType length, bool copy) {
186     cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
187
188     return true;
189   }
190   bool StartObject() { cout << "StartObject()" << endl; return true; }
191   bool Key(const char* str, SizeType length, bool copy) {
192     cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
193     if (!ue_id_found) {
194       cout << "Found UE ID\n";
195       ue_id = str;
196       ue_id_found = true;
197     } else {
198       curr_key = str;
199     }
200     return true;
201   }
202   bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
203   bool StartArray() { cout << "StartArray()" << endl; return true; }
204   bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
205 };
206
207
208 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
209   unordered_map<string, string> cell_pred;
210   std::string serving_cell_id;
211   int serving_cell_rsrp;
212   int serving_cell_rsrq;
213   int serving_cell_sinr;
214   bool in_serving_array = false;
215   int rf_meas_index = 0;
216
217   string curr_key = "";
218   string curr_value = "";
219   bool Null() { cout << "Null()" << endl; return true; }
220   bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
221   bool Int(int i) {
222     fprintf(stderr, "Int(%d)\n", i);
223     if (in_serving_array) {
224       fprintf(stderr, "we are in serving array\n");
225       switch(rf_meas_index) {
226       case 0:
227         serving_cell_rsrp = i;
228         break;
229       case 1:
230         serving_cell_rsrq = i;
231         break;
232       case 2:
233         serving_cell_sinr = i;
234         break;
235       }
236       rf_meas_index++;
237     }
238     return true;
239   }
240   bool Uint(unsigned u) {
241     fprintf(stderr, "Int(%d)\n", u); return true; }
242   bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
243   bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
244   bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
245   bool String(const char* str, SizeType length, bool copy) {
246     fprintf(stderr,"String(%s)\n", str);
247     if (curr_key.compare("ServingCellID") == 0) {
248       serving_cell_id = str;
249     } 
250
251     return true;
252   }
253   bool StartObject() { cout << "StartObject()" << endl; return true; }
254   bool Key(const char* str, SizeType length, bool copy) {
255     fprintf(stderr,"Key(%s)\n", str);
256     curr_key = str;
257     return true;
258   }
259   bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
260   bool StartArray() {
261     fprintf(stderr,"StartArray()");
262     if (curr_key.compare("ServingCellRF") == 0) {
263       in_serving_array = true;
264     }
265     
266     return true;
267   }
268   bool EndArray(SizeType elementCount) {
269     fprintf(stderr, "EndArray()\n");
270     if (curr_key.compare("servingCellRF") == 0) {
271       in_serving_array = false;
272       rf_meas_index = 0;
273     }
274
275     return true; }
276 };
277
278
279 unordered_map<string, UEData> get_sdl_ue_data() {
280
281   fprintf(stderr, "In get_sdl_ue_data()\n");
282
283   unordered_map<string, string> ue_data;
284
285   unordered_map<string, UEData> return_ue_data_map;
286     
287   std::string prefix3="12";
288   Keys K2 = sdl->findKeys(nsu, prefix3);
289   DataMap Dk2 = sdl->get(nsu, K2);
290   
291   string ue_json;
292   string ue_id;
293   
294   for(auto si=K2.begin();si!=K2.end();++si){
295     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
296     char val[val_v.size()+1];                               // from Data
297     int i;
298     fprintf(stderr, "val size %d\n", val_v.size());
299     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
300     val[i]='\0';
301       ue_id.assign((std::string)*si);
302       
303       ue_json.assign(val);
304       ue_data[ue_id] =  ue_json;
305   }
306   
307   fprintf(stderr, "after sdl get of ue data\n");
308   
309   fprintf(stderr, "From UE data map\n");
310   
311   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
312     UEDataHandler handler;
313     Reader reader;
314     StringStream ss(map_iter->second.c_str());
315     reader.Parse(ss,handler);
316
317     string ueID = map_iter->first;
318     string serving_cell_id = handler.serving_cell_id;
319     int serv_rsrp = handler.serving_cell_rsrp;
320     
321     fprintf(stderr,"UE data for %s\n", ueID.c_str());
322     fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str());
323     fprintf(stderr,"RSRP for UE %d\n", serv_rsrp);
324
325     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
326     
327   }
328   
329   fprintf(stderr, "\n");
330   return return_ue_data_map;
331 }
332
333 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
334
335   int response_to = 0;   // max timeout wating for a response
336   int rmtype;           // received message type
337
338   
339   cout <<  "Policy Callback got a message, type=" << mtype << " , length=" << len << endl;
340   cout <<  "payload is " << payload.get() << endl;
341   
342
343   const char *arg = (const char*)payload.get();
344
345   PolicyHandler handler;
346   Reader reader;
347   StringStream ss(arg);
348   reader.Parse(ss,handler);
349
350   //Set the threshold value
351
352   if (handler.found_threshold) {
353     cout << "Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
354     rsrp_threshold = handler.threshold;
355   }
356
357   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
358   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
359   
360   
361 }
362
363 void send_prediction_request(vector<string> ues_to_predict) {
364
365   std::unique_ptr<Message> msg;
366   Msg_component payload;                                // special type of unique pointer to the payload
367   
368   int nthreads = 1;  
369   int response_to = 0;   // max timeout wating for a response
370   int mtype = 30000;
371   int sz;
372   int i;
373   Msg_component send_payload;
374   
375   fprintf(stderr, "cb 1\n");
376
377   msg = xfw->Alloc_msg( 2048 );
378   
379   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
380   if( sz < 2048 ) {
381     fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
382     exit( 1 );
383   }
384
385   fprintf(stderr, "cb 2");
386
387   string ues_list = "[";
388
389   for (int i = 0; i < ues_to_predict.size(); i++) {
390     if (i == ues_to_predict.size() - 1) {
391       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
392     } else {
393       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
394     }
395   }
396
397   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
398
399   const char *body = message_body.c_str();
400
401   //  char *body = "{\"UEPredictionSet\": [\"12345\"]}";
402   
403   send_payload = msg->Get_payload(); // direct access to payload
404   //  snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
405   //  snprintf( (char *) send_payload.get(), 2048, body);
406   snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
407
408   fprintf(stderr, "message body %s\n", send_payload.get());
409   
410   fprintf(stderr, "cb 3");
411   fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
412   
413   // payload updated in place, nothing to copy from, so payload parm is nil
414   if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
415     fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
416   }
417
418   fprintf(stderr, "cb 4");
419
420   /*
421   msg = xfw->Receive( response_to );
422   if( msg != NULL ) {
423     rmtype = msg->Get_mtype();
424     send_payload = msg->Get_payload();
425     fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
426   } 
427   */
428
429 }
430
431 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
432
433   long now;
434   long total_count;
435
436   int sz;
437   int i;
438
439   int response_to = 0;   // max timeout wating for a response
440
441   int send_mtype = 0;
442   int rmtype;                                                   // received message type
443   int delay = 1000000;                          // mu-sec delay; default 1s
444
445   cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
446   cout << "payload is " << payload.get() << "\n";
447
448   mtype = 0;
449
450   cout << "prediction callback 1" << endl;
451
452   const char* arg = (const char*)payload.get();
453
454   cout << "ready to parse " << arg << endl;
455
456   PredictionHandler handler;
457
458   try {
459
460     Reader reader;
461     StringStream ss(arg);
462     reader.Parse(ss,handler);
463   } catch (...) {
464     cout << "got an exception on stringstream read parse\n";
465   }
466   
467   std::string pred_ue_id = handler.ue_id;
468   
469   cout << "Prediction for " << pred_ue_id << endl;
470   
471   unordered_map<string, int> throughput_map = handler.cell_pred_down;
472
473
474   cout << endl;
475  
476   unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
477
478   //Decision about CONTROL message
479   //(1) Identify UE Id in Prediction message
480   //(2) Get UEData struct for this UE Id
481   //(3) Identify the UE's service cell ID
482   //(4) Iterate through Prediction message.
483   //    If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
484
485   UEData pred_ue_data = sdl_data[pred_ue_id];
486   std::string serving_cell_id = pred_ue_data.serving_cell;
487
488   int serving_cell_throughput;
489   int highest_throughput;
490   std::string highest_throughput_cell_id;
491   std::string::size_type str_size;
492
493   cout << "Going through throughtput map:" << endl;
494
495   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
496     cout << map_iter->first << " : " << map_iter->second << endl;    
497     std::string curr_cellid = map_iter->first;
498     cout << "Cell ID is " << curr_cellid;
499     int curr_throughput = map_iter->second;
500     cout << "Throughput is " << curr_throughput << endl;
501
502     if (curr_cellid.compare(serving_cell_id) == 0) {
503       serving_cell_throughput = curr_throughput;
504       highest_throughput = serving_cell_throughput;
505     }
506
507   }
508
509   //Iterating again to identify the highest throughput prediction
510
511   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
512     cout << map_iter->first << " : " << map_iter->second << endl;    
513     std::string curr_cellid = map_iter->first;
514     cout << "Cell ID is " << curr_cellid;
515     int curr_throughput = map_iter->second;
516     cout << "Throughput is " << curr_throughput << endl;
517
518     if (curr_throughput > serving_cell_throughput) {
519       highest_throughput = curr_throughput;
520       highest_throughput_cell_id = curr_cellid;
521     }
522   }
523
524   if (highest_throughput > serving_cell_throughput) {
525     cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
526     cout << "UE ID: " << pred_ue_id << endl;
527     cout << "Source cell " << serving_cell_id << endl;
528     cout << "Target cell " << highest_throughput_cell_id << endl;
529   }
530
531   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
532   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
533   
534   
535 }
536
537
538 //This function runs a loop that continuously checks SDL for any UE
539
540 void run_loop() {
541
542   cout << "in run_loop()\n";
543
544   unordered_map<string, UEData> uemap;
545
546   vector<string> prediction_ues;
547
548   while (1) {
549
550     cout <<  "in while loop\n";
551
552     uemap = get_sdl_ue_data();
553
554     for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
555       string ueid = map_iter->first;
556       UEData data = map_iter->second;
557       if (data.serving_cell_rsrp < rsrp_threshold) {
558         prediction_ues.push_back(ueid);
559       }
560     }
561
562     if (prediction_ues.size() > 0) {
563       send_prediction_request(prediction_ues);
564     }
565
566     sleep(20);
567   }
568 }
569
570
571
572 extern int main( int argc, char** argv ) {
573
574   int nthreads = 1;
575
576   char* port = (char *) "4560";
577
578   sdl = shareddatalayer::SyncStorage::create();
579
580   nsu = Namespace(sdl_namespace_u);
581   nsc = Namespace(sdl_namespace_c);
582
583   
584   fprintf( stderr, "<XAPP> listening on port: %s\n", port );
585   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
586   fprintf(stderr, "code1\n");
587
588   
589   xfw->Add_msg_cb( 20010, policy_callback, NULL );
590   xfw->Add_msg_cb( 30002, prediction_callback, NULL );
591
592   fprintf(stderr, "code2\n");
593   
594   std::thread loop_thread;
595
596   loop_thread = std::thread(&run_loop);
597
598   xfw->Run( nthreads );
599   
600 }