Added support for A1 Policy Create message parsing
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       ts_xapp.cpp
23         Abstract:       Traffic Steering xApp;
24                                1. Receives A1 Policy
25                                2. Queries SDL to decide which UE to attempt Traffic Steering for
26                                3. Requests prediction for UE throughput on current and neighbor cells
27                                4. Receives prediction
28                                5. Optionally exercises Traffic Steering action over E2
29
30         Date:           22 April 2020
31         Author:         Ron Shacham
32                 
33 */
34
35 #include <stdio.h>
36 #include <string.h>
37 #include <unistd.h>
38
39 #include <thread>
40 #include <iostream>
41 #include <memory>
42
43 #include <sdl/syncstorage.hpp>
44 #include <set>
45 #include <map>
46 #include <vector>
47 #include <string>
48 #include <unordered_map>
49
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
55
56
57 #include "ricxfcpp/xapp.hpp"
58
59 using namespace rapidjson;
60 using namespace std;
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
66
67
68 // ----------------------------------------------------------
69
70 std::unique_ptr<Xapp> xfw;
71
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
74
75 int rsrp_threshold = 0;
76
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
78
79 Namespace nsu;
80 Namespace nsc;
81
82 struct UEData {
83   string serving_cell;
84   int serving_cell_rsrp;
85
86 };
87
88 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
89   unordered_map<string, string> cell_pred;
90   std::string ue_id;
91   bool ue_id_found = false;
92   string curr_key = "";
93   string curr_value = "";
94   int policy_type_id;
95   int policy_instance_id;
96   int threshold;
97   std::string operation;
98   bool found_threshold = false;
99
100   
101   bool Null() { cout << "Null()" << endl; return true; }
102   bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
103   bool Int(int i) {
104     cout << "Int(" << i << ")" << endl;
105     if (curr_key.compare("policy_type_id") == 0) {
106       policy_type_id = i;
107     } else if (curr_key.compare("policy_instance_id") == 0) {
108       policy_instance_id = i;
109     } else if (curr_key.compare("threshold") == 0) {
110       found_threshold = true;
111       threshold = i;
112     }
113
114     return true;
115   }
116   bool Uint(unsigned u) {
117     cout << "Int(" << u << ")" << endl;
118     if (curr_key.compare("policy_type_id") == 0) {
119       policy_type_id = u;
120     } else if (curr_key.compare("policy_instance_id") == 0) {
121       policy_instance_id = u;
122     } else if (curr_key.compare("threshold") == 0) {
123       found_threshold = true;
124       threshold = u;
125     }
126
127     return true;
128   }    
129   bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
130   bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
131   bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
132   bool String(const char* str, SizeType length, bool copy) {
133     cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
134     if (curr_key.compare("operation") != 0) {
135       operation = str;
136     }
137
138     return true;
139   }
140   bool StartObject() {
141     cout << "StartObject()" << endl;
142     return true;
143   }
144   bool Key(const char* str, SizeType length, bool copy) {
145     cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
146     curr_key = str;
147
148     return true;
149   }
150   bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
151   bool StartArray() { cout << "StartArray()" << endl; return true; }
152   bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
153
154 };
155
156 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
157   unordered_map<string, string> cell_pred;
158   std::string ue_id;
159   bool ue_id_found = false;
160   string curr_key = "";
161   string curr_value = "";
162   bool Null() { cout << "Null()" << endl; return true; }
163   bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
164   bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
165   bool Uint(unsigned u) { cout << "Uint(" << u << ")" << endl; return true; }
166   bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
167   bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
168   bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
169   bool String(const char* str, SizeType length, bool copy) {
170     cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
171     if (curr_key.compare("") != 0) {
172       cout << "Found throughput\n";
173       curr_value = str;
174       cell_pred[curr_key] = curr_value;
175       curr_key = "";
176       curr_value = "";
177     }
178
179     return true;
180   }
181   bool StartObject() { cout << "StartObject()" << endl; return true; }
182   bool Key(const char* str, SizeType length, bool copy) {
183     cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
184     if (!ue_id_found) {
185       cout << "Found UE ID\n";
186       ue_id = str;
187       ue_id_found = true;
188     } else {
189       curr_key = str;
190     }
191     return true;
192   }
193   bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
194   bool StartArray() { cout << "StartArray()" << endl; return true; }
195   bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
196 };
197
198
199 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
200   unordered_map<string, string> cell_pred;
201   std::string serving_cell_id;
202   int serving_cell_rsrp;
203   int serving_cell_rsrq;
204   int serving_cell_sinr;
205   bool in_serving_array = false;
206   int rf_meas_index = 0;
207
208   string curr_key = "";
209   string curr_value = "";
210   bool Null() { cout << "Null()" << endl; return true; }
211   bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
212   bool Int(int i) {
213     fprintf(stderr, "Int(%d)\n", i);
214     if (in_serving_array) {
215       fprintf(stderr, "we are in serving array\n");
216       switch(rf_meas_index) {
217       case 0:
218         serving_cell_rsrp = i;
219         break;
220       case 1:
221         serving_cell_rsrq = i;
222         break;
223       case 2:
224         serving_cell_sinr = i;
225         break;
226       }
227       rf_meas_index++;
228     }
229     return true;
230   }
231   bool Uint(unsigned u) {
232     fprintf(stderr, "Int(%d)\n", u); return true; }
233   bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
234   bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
235   bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
236   bool String(const char* str, SizeType length, bool copy) {
237     fprintf(stderr,"String(%s)\n", str);
238     if (curr_key.compare("ServingCellID") == 0) {
239       serving_cell_id = str;
240     } 
241
242     return true;
243   }
244   bool StartObject() { cout << "StartObject()" << endl; return true; }
245   bool Key(const char* str, SizeType length, bool copy) {
246     fprintf(stderr,"Key(%s)\n", str);
247     curr_key = str;
248     return true;
249   }
250   bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
251   bool StartArray() {
252     fprintf(stderr,"StartArray()");
253     if (curr_key.compare("ServingCellRF") == 0) {
254       in_serving_array = true;
255     }
256     
257     return true;
258   }
259   bool EndArray(SizeType elementCount) {
260     fprintf(stderr, "EndArray()\n");
261     if (curr_key.compare("servingCellRF") == 0) {
262       in_serving_array = false;
263       rf_meas_index = 0;
264     }
265
266     return true; }
267 };
268
269
270 unordered_map<string, UEData> get_sdl_ue_data() {
271
272   fprintf(stderr, "In get_sdl_ue_data()\n");
273
274   unordered_map<string, string> ue_data;
275
276   unordered_map<string, UEData> return_ue_data_map;
277     
278   std::string prefix3="12";
279   Keys K2 = sdl->findKeys(nsu, prefix3);
280   DataMap Dk2 = sdl->get(nsu, K2);
281   
282   string ue_json;
283   string ue_id;
284   
285   for(auto si=K2.begin();si!=K2.end();++si){
286     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
287     char val[val_v.size()+1];                               // from Data
288     int i;
289     fprintf(stderr, "val size %d\n", val_v.size());
290     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
291     val[i]='\0';
292       ue_id.assign((std::string)*si);
293       
294       ue_json.assign(val);
295       ue_data[ue_id] =  ue_json;
296   }
297   
298   fprintf(stderr, "after sdl get of ue data\n");
299   
300   fprintf(stderr, "From UE data map\n");
301   
302   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
303     UEDataHandler handler;
304     Reader reader;
305     StringStream ss(map_iter->second.c_str());
306     reader.Parse(ss,handler);
307
308     string ueID = map_iter->first;
309     string serving_cell_id = handler.serving_cell_id;
310     int serv_rsrp = handler.serving_cell_rsrp;
311     
312     fprintf(stderr,"UE data for %s\n", ueID.c_str());
313     fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str());
314     fprintf(stderr,"RSRP for UE %d\n", serv_rsrp);
315
316     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
317     
318   }
319   
320   fprintf(stderr, "\n");
321   return return_ue_data_map;
322 }
323
324 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
325
326   int response_to = 0;   // max timeout wating for a response
327   int rmtype;           // received message type
328
329   
330   fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
331   fprintf(stderr, "payload is %s\n", payload.get());
332   
333   //fprintf( stderr, "callback 1 got a message type = %d len = %d\n", mtype, len );
334   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
335   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
336
337   const char *arg = (const char*)payload.get();
338
339   PolicyHandler handler;
340   Reader reader;
341   StringStream ss(arg);
342   reader.Parse(ss,handler);
343
344   //Set the threshold value
345
346   if (handler.found_threshold) {
347     rsrp_threshold = handler.threshold;
348   }
349   
350 }
351
352 void send_prediction_request(vector<string> ues_to_predict) {
353
354   std::unique_ptr<Message> msg;
355   Msg_component payload;                                // special type of unique pointer to the payload
356   
357   int nthreads = 1;  
358   int response_to = 0;   // max timeout wating for a response
359   int mtype = 30000;
360   int sz;
361   int i;
362   Msg_component send_payload;
363   
364   fprintf(stderr, "cb 1\n");
365
366   msg = xfw->Alloc_msg( 2048 );
367   
368   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
369   if( sz < 2048 ) {
370     fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
371     exit( 1 );
372   }
373
374   fprintf(stderr, "cb 2");
375
376   string ues_list = "[";
377
378   for (int i = 0; i < ues_to_predict.size(); i++) {
379     if (i == ues_to_predict.size() - 1) {
380       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
381     } else {
382       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
383     }
384   }
385
386   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
387
388   const char *body = message_body.c_str();
389
390   //  char *body = "{\"UEPredictionSet\": [\"12345\"]}";
391   
392   send_payload = msg->Get_payload(); // direct access to payload
393   //  snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
394   //  snprintf( (char *) send_payload.get(), 2048, body);
395   snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
396
397   fprintf(stderr, "message body %s\n", send_payload.get());
398   
399   fprintf(stderr, "cb 3");
400   fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
401   
402   // payload updated in place, nothing to copy from, so payload parm is nil
403   if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
404     fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
405   }
406
407   fprintf(stderr, "cb 4");
408
409   /*
410   msg = xfw->Receive( response_to );
411   if( msg != NULL ) {
412     rmtype = msg->Get_mtype();
413     send_payload = msg->Get_payload();
414     fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
415   } 
416   */
417
418 }
419
420 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
421
422   long now;
423   long total_count;
424
425   int sz;
426   int i;
427
428   int response_to = 0;   // max timeout wating for a response
429
430   int send_mtype = 0;
431   int rmtype;                                                   // received message type
432   int delay = 1000000;                          // mu-sec delay; default 1s
433
434   fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len);
435   fprintf(stderr, "payload is %s\n", payload.get());
436   
437   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
438   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
439
440   mtype = 0;
441
442   fprintf(stderr, "cb 1\n");
443
444   const char* arg = (const char*)payload.get();
445
446   PredictionHandler handler;
447   Reader reader;
448   StringStream ss(arg);
449   reader.Parse(ss,handler);
450
451   std::string pred_ue_id = handler.ue_id;
452
453   cout << "Prediction for " << pred_ue_id << endl;
454
455   unordered_map<string, string> throughput_map = handler.cell_pred;
456
457
458   cout << endl;
459  
460   unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
461
462   //Decision about CONTROL message
463   //(1) Identify UE Id in Prediction message
464   //(2) Get UEData struct for this UE Id
465   //(3) Identify the UE's service cell ID
466   //(4) Iterate through Prediction message.
467   //    If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
468
469   UEData pred_ue_data = sdl_data[pred_ue_id];
470   std::string serving_cell_id = pred_ue_data.serving_cell;
471
472   int serving_cell_throughput;
473   int highest_throughput;
474   std::string highest_throughput_cell_id;
475   std::string::size_type str_size;
476
477   cout << "Going through throughtput map:" << endl;
478
479   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
480     cout << map_iter->first << " : " << map_iter->second << endl;    
481     std::string curr_cellid = map_iter->first;
482     cout << "Cell ID is " << curr_cellid;
483     int curr_throughput = stoi(map_iter->second, &str_size);
484     cout << "Throughput is " << curr_throughput << endl;
485
486     if (curr_cellid.compare(serving_cell_id) == 0) {
487       serving_cell_throughput = curr_throughput;
488       highest_throughput = serving_cell_throughput;
489     }
490
491   }
492
493   //Iterating again to identify the highest throughput prediction
494
495   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
496     cout << map_iter->first << " : " << map_iter->second << endl;    
497     std::string curr_cellid = map_iter->first;
498     cout << "Cell ID is " << curr_cellid;
499     int curr_throughput = stoi(map_iter->second, &str_size);
500     cout << "Throughput is " << curr_throughput << endl;
501
502     if (curr_throughput > serving_cell_throughput) {
503       highest_throughput = curr_throughput;
504       highest_throughput_cell_id = curr_cellid;
505     }
506   }
507
508   if (highest_throughput > serving_cell_throughput) {
509     cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
510     cout << "UE ID: " << pred_ue_id << endl;
511     cout << "Source cell " << serving_cell_id << endl;
512     cout << "Target cell " << highest_throughput_cell_id << endl;
513   }
514   
515   
516 }
517
518
519 //This function runs a loop that continuously checks SDL for any UE
520
521 void run_loop() {
522
523   fprintf(stderr, "in run_loop()\n");
524
525   unordered_map<string, UEData> uemap;
526
527   vector<string> prediction_ues;
528
529   while (1) {
530
531     fprintf(stderr, "in while loop\n");
532
533     uemap = get_sdl_ue_data();
534
535     for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
536       string ueid = map_iter->first;
537       UEData data = map_iter->second;
538       if (data.serving_cell_rsrp < rsrp_threshold) {
539         prediction_ues.push_back(ueid);
540       }
541     }
542
543     if (prediction_ues.size() > 0) {
544       send_prediction_request(prediction_ues);
545     }
546
547     sleep(20);
548   }
549 }
550
551
552
553 extern int main( int argc, char** argv ) {
554
555   int nthreads = 1;
556
557   char* port = (char *) "4560";
558
559   sdl = shareddatalayer::SyncStorage::create();
560
561   nsu = Namespace(sdl_namespace_u);
562   nsc = Namespace(sdl_namespace_c);
563
564   
565   fprintf( stderr, "<XAPP> listening on port: %s\n", port );
566   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
567   fprintf(stderr, "code1\n");
568
569   
570   xfw->Add_msg_cb( 20010, policy_callback, NULL );
571   xfw->Add_msg_cb( 30002, prediction_callback, NULL );
572
573   fprintf(stderr, "code2\n");
574   
575   std::thread loop_thread;
576
577   loop_thread = std::thread(&run_loop);
578
579   xfw->Run( nthreads );
580   
581 }