TS pred logic
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 Nokia
5         Copyright (c) 2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       ts_xapp.cpp
23         Abstract:       Traffic Steering xApp;
24                                1. Receives A1 Policy
25                                2. Queries SDL to decide which UE to attempt Traffic Steering for
26                                3. Requests prediction for UE throughput on current and neighbor cells
27                                4. Receives prediction
28                                5. Optionally exercises Traffic Steering action over E2
29
30         Date:           22 April 2020
31         Author:         Ron Shacham
32                 
33 */
34
35 #include <stdio.h>
36 #include <string.h>
37 #include <unistd.h>
38
39 #include <thread>
40 #include <iostream>
41 #include <memory>
42
43 #include <sdl/syncstorage.hpp>
44 #include <set>
45 #include <map>
46 #include <vector>
47 #include <string>
48 #include <unordered_map>
49
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
55
56
57 #include "ricxfcpp/xapp.hpp"
58
59 using namespace rapidjson;
60 using namespace std;
61 using Namespace = std::string;
62 using Key = std::string;
63 using Data = std::vector<uint8_t>;
64 using DataMap = std::map<Key, Data>;
65 using Keys = std::set<Key>;
66
67
68 // ----------------------------------------------------------
69
70 std::unique_ptr<Xapp> xfw;
71
72 std::string sdl_namespace_u = "TS-UE-metrics";
73 std::string sdl_namespace_c = "TS-cell-metrics";
74
75 int rsrp_threshold = 0;
76
77 std::unique_ptr<shareddatalayer::SyncStorage> sdl;
78
79 Namespace nsu;
80 Namespace nsc;
81
82 struct UEData {
83   string serving_cell;
84   int serving_cell_rsrp;
85
86 };
87
88 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
89   unordered_map<string, string> cell_pred;
90   std::string ue_id;
91   bool ue_id_found = false;
92   string curr_key = "";
93   string curr_value = "";
94   bool Null() { cout << "Null()" << endl; return true; }
95   bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
96   bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
97   bool Uint(unsigned u) { cout << "Uint(" << u << ")" << endl; return true; }
98   bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
99   bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
100   bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
101   bool String(const char* str, SizeType length, bool copy) {
102     cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
103     if (curr_key.compare("") != 0) {
104       cout << "Found throughput\n";
105       curr_value = str;
106       cell_pred[curr_key] = curr_value;
107       curr_key = "";
108       curr_value = "";
109     }
110
111     return true;
112   }
113   bool StartObject() { cout << "StartObject()" << endl; return true; }
114   bool Key(const char* str, SizeType length, bool copy) {
115     cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
116     if (!ue_id_found) {
117       cout << "Found UE ID\n";
118       ue_id = str;
119       ue_id_found = true;
120     } else {
121       curr_key = str;
122     }
123     return true;
124   }
125   bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
126   bool StartArray() { cout << "StartArray()" << endl; return true; }
127   bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
128 };
129
130
131 struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
132   unordered_map<string, string> cell_pred;
133   std::string serving_cell_id;
134   int serving_cell_rsrp;
135   int serving_cell_rsrq;
136   int serving_cell_sinr;
137   bool in_serving_array = false;
138   int rf_meas_index = 0;
139
140   string curr_key = "";
141   string curr_value = "";
142   bool Null() { cout << "Null()" << endl; return true; }
143   bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
144   bool Int(int i) {
145     fprintf(stderr, "Int(%d)\n", i);
146     if (in_serving_array) {
147       fprintf(stderr, "we are in serving array\n");
148       switch(rf_meas_index) {
149       case 0:
150         serving_cell_rsrp = i;
151         break;
152       case 1:
153         serving_cell_rsrq = i;
154         break;
155       case 2:
156         serving_cell_sinr = i;
157         break;
158       }
159       rf_meas_index++;
160     }
161     return true;
162   }
163   bool Uint(unsigned u) {
164     fprintf(stderr, "Int(%d)\n", u); return true; }
165   bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
166   bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
167   bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
168   bool String(const char* str, SizeType length, bool copy) {
169     fprintf(stderr,"String(%s)\n", str);
170     if (curr_key.compare("ServingCellID") == 0) {
171       serving_cell_id = str;
172     } 
173
174     return true;
175   }
176   bool StartObject() { cout << "StartObject()" << endl; return true; }
177   bool Key(const char* str, SizeType length, bool copy) {
178     fprintf(stderr,"Key(%s)\n", str);
179     curr_key = str;
180     return true;
181   }
182   bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
183   bool StartArray() {
184     fprintf(stderr,"StartArray()");
185     if (curr_key.compare("ServingCellRF") == 0) {
186       in_serving_array = true;
187     }
188     
189     return true;
190   }
191   bool EndArray(SizeType elementCount) {
192     fprintf(stderr, "EndArray()\n");
193     if (curr_key.compare("servingCellRF") == 0) {
194       in_serving_array = false;
195       rf_meas_index = 0;
196     }
197
198     return true; }
199 };
200
201
202 unordered_map<string, UEData> get_sdl_ue_data() {
203
204   fprintf(stderr, "In get_sdl_ue_data()\n");
205
206   unordered_map<string, string> ue_data;
207
208   unordered_map<string, UEData> return_ue_data_map;
209     
210   std::string prefix3="12";
211   Keys K2 = sdl->findKeys(nsu, prefix3);
212   DataMap Dk2 = sdl->get(nsu, K2);
213   
214   string ue_json;
215   string ue_id;
216   
217   for(auto si=K2.begin();si!=K2.end();++si){
218     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
219     char val[val_v.size()+1];                               // from Data
220     int i;
221     fprintf(stderr, "val size %d\n", val_v.size());
222     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
223     val[i]='\0';
224       ue_id.assign((std::string)*si);
225       
226       ue_json.assign(val);
227       ue_data[ue_id] =  ue_json;
228   }
229   
230   fprintf(stderr, "after sdl get of ue data\n");
231   
232   fprintf(stderr, "From UE data map\n");
233   
234   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
235     UEDataHandler handler;
236     Reader reader;
237     StringStream ss(map_iter->second.c_str());
238     reader.Parse(ss,handler);
239
240     string ueID = map_iter->first;
241     string serving_cell_id = handler.serving_cell_id;
242     int serv_rsrp = handler.serving_cell_rsrp;
243     
244     fprintf(stderr,"UE data for %s\n", ueID.c_str());
245     fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str());
246     fprintf(stderr,"RSRP for UE %d\n", serv_rsrp);
247
248     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
249     
250   }
251   
252   fprintf(stderr, "\n");
253   return return_ue_data_map;
254 }
255
256 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
257
258   int response_to = 0;   // max timeout wating for a response
259   int rmtype;           // received message type
260
261   
262   fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
263   fprintf(stderr, "payload is %s\n", payload.get());
264   
265   //fprintf( stderr, "callback 1 got a message type = %d len = %d\n", mtype, len );
266   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
267   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
268
269   //Set the threshold value
270
271   
272 }
273
274 void send_prediction_request(vector<string> ues_to_predict) {
275
276   std::unique_ptr<Message> msg;
277   Msg_component payload;                                // special type of unique pointer to the payload
278   
279   int nthreads = 1;  
280   int response_to = 0;   // max timeout wating for a response
281   int mtype = 30000;
282   int sz;
283   int i;
284   Msg_component send_payload;
285   
286   fprintf(stderr, "cb 1\n");
287
288   msg = xfw->Alloc_msg( 2048 );
289   
290   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
291   if( sz < 2048 ) {
292     fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
293     exit( 1 );
294   }
295
296   fprintf(stderr, "cb 2");
297
298   string ues_list = "[";
299
300   for (int i = 0; i < ues_to_predict.size(); i++) {
301     if (i == ues_to_predict.size() - 1) {
302       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
303     } else {
304       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
305     }
306   }
307
308   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
309
310   const char *body = message_body.c_str();
311
312   //  char *body = "{\"UEPredictionSet\": [\"12345\"]}";
313   
314   send_payload = msg->Get_payload(); // direct access to payload
315   //  snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
316   //  snprintf( (char *) send_payload.get(), 2048, body);
317   snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
318
319   fprintf(stderr, "message body %s\n", send_payload.get());
320   
321   fprintf(stderr, "cb 3");
322   fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
323   
324   // payload updated in place, nothing to copy from, so payload parm is nil
325   if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
326     fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
327   }
328
329   fprintf(stderr, "cb 4");
330
331   /*
332   msg = xfw->Receive( response_to );
333   if( msg != NULL ) {
334     rmtype = msg->Get_mtype();
335     send_payload = msg->Get_payload();
336     fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
337   } 
338   */
339
340 }
341
342 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
343
344   long now;
345   long total_count;
346
347   int sz;
348   int i;
349
350   int response_to = 0;   // max timeout wating for a response
351
352   int send_mtype = 0;
353   int rmtype;                                                   // received message type
354   int delay = 1000000;                          // mu-sec delay; default 1s
355
356   fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len);
357   fprintf(stderr, "payload is %s\n", payload.get());
358   
359   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
360   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
361
362   mtype = 0;
363
364   fprintf(stderr, "cb 1\n");
365
366   char *incoming_msg = "{\"12345\": {\"222\": \"20000\", \"333\" : \"50000\"} }";
367
368   PredictionHandler handler;
369   Reader reader;
370   StringStream ss(incoming_msg);
371   reader.Parse(ss,handler);
372
373   std::string pred_ue_id = handler.ue_id;
374
375   cout << "Prediction for " << pred_ue_id << endl;
376
377   unordered_map<string, string> throughput_map = handler.cell_pred;
378
379
380   cout << endl;
381  
382   unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
383
384   //Decision about CONTROL message
385   //(1) Identify UE Id in Prediction message
386   //(2) Get UEData struct for this UE Id
387   //(3) Identify the UE's service cell ID
388   //(4) Iterate through Prediction message.
389   //    If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
390
391   UEData pred_ue_data = sdl_data[pred_ue_id];
392   std::string serving_cell_id = pred_ue_data.serving_cell;
393
394   int serving_cell_throughput;
395   int highest_throughput;
396   std::string highest_throughput_cell_id;
397   std::string::size_type str_size;
398
399   cout << "Going through throughtput map:" << endl;
400
401   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
402     cout << map_iter->first << " : " << map_iter->second << endl;    
403     std::string curr_cellid = map_iter->first;
404     cout << "Cell ID is " << curr_cellid;
405     int curr_throughput = stoi(map_iter->second, &str_size);
406     cout << "Throughput is " << curr_throughput << endl;
407
408     if (curr_cellid.compare(serving_cell_id) == 0) {
409       serving_cell_throughput = curr_throughput;
410       highest_throughput = serving_cell_throughput;
411     }
412
413   }
414
415   //Iterating again to identify the highest throughput prediction
416
417   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
418     cout << map_iter->first << " : " << map_iter->second << endl;    
419     std::string curr_cellid = map_iter->first;
420     cout << "Cell ID is " << curr_cellid;
421     int curr_throughput = stoi(map_iter->second, &str_size);
422     cout << "Throughput is " << curr_throughput << endl;
423
424     if (curr_throughput > serving_cell_throughput) {
425       highest_throughput = curr_throughput;
426       highest_throughput_cell_id = curr_cellid;
427     }
428   }
429
430   if (highest_throughput > serving_cell_throughput) {
431     cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
432     cout << "UE ID: " << pred_ue_id << endl;
433     cout << "Source cell " << serving_cell_id << endl;
434     cout << "Target cell " << highest_throughput_cell_id << endl;
435   }
436   
437   
438 }
439
440
441 //This function runs a loop that continuously checks SDL for any UE
442
443 void run_loop() {
444
445   fprintf(stderr, "in run_loop()\n");
446
447   unordered_map<string, UEData> uemap;
448
449   vector<string> prediction_ues;
450
451   while (1) {
452
453     fprintf(stderr, "in while loop\n");
454
455     uemap = get_sdl_ue_data();
456
457     for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
458       string ueid = map_iter->first;
459       UEData data = map_iter->second;
460       if (data.serving_cell_rsrp < rsrp_threshold) {
461         prediction_ues.push_back(ueid);
462       }
463     }
464
465     if (prediction_ues.size() > 0) {
466       send_prediction_request(prediction_ues);
467     }
468
469     sleep(20);
470   }
471 }
472
473
474
475 extern int main( int argc, char** argv ) {
476
477   int nthreads = 1;
478
479   char* port = (char *) "4560";
480
481   sdl = shareddatalayer::SyncStorage::create();
482
483   nsu = Namespace(sdl_namespace_u);
484   nsc = Namespace(sdl_namespace_c);
485   
486   
487   fprintf( stderr, "<XAPP> listening on port: %s\n", port );
488   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
489   fprintf(stderr, "code1\n");
490
491   
492   xfw->Add_msg_cb( 20010, policy_callback, NULL );
493   xfw->Add_msg_cb( 30002, prediction_callback, NULL );
494
495   fprintf(stderr, "code2\n");
496   
497   std::thread loop_thread;
498
499   loop_thread = std::thread(&run_loop);
500
501   xfw->Run( nthreads );
502   
503 }