Update TS xApp for Release D use case
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 /*
21         Mnemonic:       ts_xapp.cpp
22         Abstract:       Traffic Steering xApp
23                    1. Receives A1 Policy
24                                2. Receives anomaly detection
25                                3. Requests prediction for UE throughput on current and neighbor cells
26                                4. Receives prediction
27                                5. Optionally exercises Traffic Steering action over E2
28
29         Date:     22 April 2020
30         Author:         Ron Shacham
31
32   Modified: 21 May 2021 (Alexandre Huff)
33             Update for traffic steering use case in release D.
34 */
35
36 #include <stdio.h>
37 #include <string.h>
38 #include <unistd.h>
39
40 #include <thread>
41 #include <iostream>
42 #include <memory>
43
44 #include <set>
45 #include <map>
46 #include <vector>
47 #include <string>
48 #include <unordered_map>
49
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
55 #include <rapidjson/prettywriter.h>
56
57 #include <curl/curl.h>
58 #include <rmr/RIC_message_types.h>
59 #include "ricxfcpp/xapp.hpp"
60
61
62 // Defines env name for the endpoint to POST handoff control messages
63 #define ENV_CONTROL_URL "TS_CONTROL_URL"
64
65
66 using namespace rapidjson;
67 using namespace std;
68 using namespace xapp;
69
70 using Namespace = std::string;
71 using Key = std::string;
72 using Data = std::vector<uint8_t>;
73 using DataMap = std::map<Key, Data>;
74 using Keys = std::set<Key>;
75
76
77 // ----------------------------------------------------------
78
79 // Stores the the URL to POST handoff control messages
80 const char *ts_control_url;
81
82 std::unique_ptr<Xapp> xfw;
83
84 int rsrp_threshold = 0;
85
86 /* struct UEData {
87   string serving_cell;
88   int serving_cell_rsrp;
89 }; */
90
91 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
92   unordered_map<string, string> cell_pred;
93   std::string ue_id;
94   bool ue_id_found = false;
95   string curr_key = "";
96   string curr_value = "";
97   int policy_type_id;
98   int policy_instance_id;
99   int threshold;
100   std::string operation;
101   bool found_threshold = false;
102
103   bool Null() { return true; }
104   bool Bool(bool b) { return true; }
105   bool Int(int i) {
106
107     if (curr_key.compare("policy_type_id") == 0) {
108       policy_type_id = i;
109     } else if (curr_key.compare("policy_instance_id") == 0) {
110       policy_instance_id = i;
111     } else if (curr_key.compare("threshold") == 0) {
112       found_threshold = true;
113       threshold = i;
114     }
115
116     return true;
117   }
118   bool Uint(unsigned u) {
119
120     if (curr_key.compare("policy_type_id") == 0) {
121       policy_type_id = u;
122     } else if (curr_key.compare("policy_instance_id") == 0) {
123       policy_instance_id = u;
124     } else if (curr_key.compare("threshold") == 0) {
125       found_threshold = true;
126       threshold = u;
127     }
128
129     return true;
130   }
131   bool Int64(int64_t i) {  return true; }
132   bool Uint64(uint64_t u) {  return true; }
133   bool Double(double d) {  return true; }
134   bool String(const char* str, SizeType length, bool copy) {
135
136     if (curr_key.compare("operation") != 0) {
137       operation = str;
138     }
139
140     return true;
141   }
142   bool StartObject() {
143
144     return true;
145   }
146   bool Key(const char* str, SizeType length, bool copy) {
147
148     curr_key = str;
149
150     return true;
151   }
152   bool EndObject(SizeType memberCount) {  return true; }
153   bool StartArray() {  return true; }
154   bool EndArray(SizeType elementCount) {  return true; }
155
156 };
157
158 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
159   unordered_map<string, int> cell_pred_down;
160   unordered_map<string, int> cell_pred_up;
161   std::string ue_id;
162   bool ue_id_found = false;
163   string curr_key = "";
164   string curr_value = "";
165   string serving_cell_id;
166   bool down_val = true;
167   bool Null() {  return true; }
168   bool Bool(bool b) {  return true; }
169   bool Int(int i) {  return true; }
170   bool Uint(unsigned u) {
171     // Currently, we assume the first cell in the prediction message is the serving cell
172     if ( serving_cell_id.empty() ) {
173       serving_cell_id = curr_key;
174     }
175
176     if (down_val) {
177       cell_pred_down[curr_key] = u;
178       down_val = false;
179     } else {
180       cell_pred_up[curr_key] = u;
181       down_val = true;
182     }
183
184     return true;
185
186   }
187   bool Int64(int64_t i) {  return true; }
188   bool Uint64(uint64_t u) {  return true; }
189   bool Double(double d) {  return true; }
190   bool String(const char* str, SizeType length, bool copy) {
191
192     return true;
193   }
194   bool StartObject() {  return true; }
195   bool Key(const char* str, SizeType length, bool copy) {
196     if (!ue_id_found) {
197
198       ue_id = str;
199       ue_id_found = true;
200     } else {
201       curr_key = str;
202     }
203     return true;
204   }
205   bool EndObject(SizeType memberCount) {  return true; }
206   bool StartArray() {  return true; }
207   bool EndArray(SizeType elementCount) {  return true; }
208 };
209
210 struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
211   /*
212     Assuming we receive the following payload from AD
213     [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
214   */
215   vector<string> prediction_ues;
216   string curr_key = "";
217
218   bool Key(const Ch* str, SizeType len, bool copy) {
219     curr_key = str;
220     return true;
221   }
222
223   bool String(const Ch* str, SizeType len, bool copy) {
224     // We are only interested in the "ue-id"
225     if ( curr_key.compare( "ue-id") == 0 ) {
226       prediction_ues.push_back( str );
227     }
228     return true;
229   }
230 };
231
232
233 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
234   unordered_map<string, string> cell_pred;
235   std::string serving_cell_id;
236   int serving_cell_rsrp;
237   int serving_cell_rsrq;
238   int serving_cell_sinr;
239   bool in_serving_array = false;
240   int rf_meas_index = 0;
241
242   bool in_serving_report_object = false;
243
244   string curr_key = "";
245   string curr_value = "";
246   bool Null() { return true; }
247   bool Bool(bool b) { return true; }
248   bool Int(int i) {
249
250     return true;
251   }
252
253   bool Uint(unsigned i) {
254
255     if (in_serving_report_object) {
256       if (curr_key.compare("rsrp") == 0) {
257         serving_cell_rsrp = i;
258       } else if (curr_key.compare("rsrq") == 0) {
259         serving_cell_rsrq = i;
260       } else if (curr_key.compare("rssinr") == 0) {
261         serving_cell_sinr = i;
262       }
263     }
264
265     return true; }
266   bool Int64(int64_t i) {
267
268     return true; }
269   bool Uint64(uint64_t i) {
270
271     return true; }
272   bool Double(double d) { return true; }
273   bool String(const char* str, SizeType length, bool copy) {
274
275     if (curr_key.compare("ServingCellID") == 0) {
276       serving_cell_id = str;
277     }
278
279     return true;
280   }
281   bool StartObject() {
282     if (curr_key.compare("ServingCellRF") == 0) {
283       in_serving_report_object = true;
284     }
285
286     return true; }
287   bool Key(const char* str, SizeType length, bool copy) {
288
289     curr_key = str;
290     return true;
291   }
292   bool EndObject(SizeType memberCount) {
293     if (curr_key.compare("ServingCellRF") == 0) {
294       in_serving_report_object = false;
295     }
296     return true; }
297   bool StartArray() {
298
299     if (curr_key.compare("ServingCellRF") == 0) {
300       in_serving_array = true;
301     }
302
303     return true;
304   }
305   bool EndArray(SizeType elementCount) {
306
307     if (curr_key.compare("servingCellRF") == 0) {
308       in_serving_array = false;
309       rf_meas_index = 0;
310     }
311
312     return true; }
313 }; */
314
315
316 /* unordered_map<string, UEData> get_sdl_ue_data() {
317
318   fprintf(stderr, "In get_sdl_ue_data()\n");
319
320   unordered_map<string, string> ue_data;
321
322   unordered_map<string, UEData> return_ue_data_map;
323
324   std::string prefix3="";
325   Keys K2 = sdl->findKeys(nsu, prefix3);
326   DataMap Dk2 = sdl->get(nsu, K2);
327
328   string ue_json;
329   string ue_id;
330
331   for(auto si=K2.begin();si!=K2.end();++si){
332     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
333     char val[val_v.size()+1];                               // from Data
334     int i;
335
336     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
337     val[i]='\0';
338       ue_id.assign((std::string)*si);
339
340       ue_json.assign(val);
341       ue_data[ue_id] =  ue_json;
342   }
343
344   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
345     UEDataHandler handler;
346     Reader reader;
347     StringStream ss(map_iter->second.c_str());
348     reader.Parse(ss,handler);
349
350     string ueID = map_iter->first;
351     string serving_cell_id = handler.serving_cell_id;
352     int serv_rsrp = handler.serving_cell_rsrp;
353
354     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
355
356   }
357
358   return return_ue_data_map;
359 } */
360
361 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
362
363   int response_to = 0;   // max timeout wating for a response
364   int rmtype;           // received message type
365
366   fprintf(stderr, "[INFO] Policy Callback got a message, type=%d, length=%d\n", mtype, len);
367
368   const char *arg = (const char*)payload.get();
369
370   fprintf(stderr, "[INFO] Payload is %s\n", arg);
371
372   PolicyHandler handler;
373   Reader reader;
374   StringStream ss(arg);
375   reader.Parse(ss,handler);
376
377   //Set the threshold value
378   if (handler.found_threshold) {
379     fprintf(stderr, "[INFO] Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
380     rsrp_threshold = handler.threshold;
381   }
382
383   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
384   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
385 }
386
387 // callback to handle handover reply (json http response)
388 size_t handoff_reply_callback( const char *in, size_t size, size_t num, string *out ) {
389   const size_t totalBytes( size * num );
390   out->append( in, totalBytes );
391   return totalBytes;
392 }
393
394 // sends a handover message through REST
395 void send_handoff_request( string msg ) {
396   CURL *curl = curl_easy_init();
397   curl_easy_setopt( curl, CURLOPT_URL, ts_control_url );
398   curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
399   curl_easy_setopt( curl, CURLOPT_POST, 1L );
400   // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
401
402   // response information
403   long httpCode( 0 );
404   unique_ptr<string> httpData( new string() );
405
406   curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, handoff_reply_callback );
407   curl_easy_setopt( curl, CURLOPT_WRITEDATA, httpData.get());
408   curl_easy_setopt( curl, CURLOPT_POSTFIELDS, msg.c_str() );
409
410   struct curl_slist *headers = NULL;  // needs to free this after easy perform
411   headers = curl_slist_append( headers, "Accept: application/json" );
412   headers = curl_slist_append( headers, "Content-Type: application/json" );
413   curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
414
415   cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_url << "\"\n";
416   cout << "[INFO] HandOff request is " << msg << endl;
417
418   // sending request
419   CURLcode res = curl_easy_perform( curl );
420   if( res != CURLE_OK ) {
421     cout << "[ERROR] curl_easy_perform() failed: " << curl_easy_strerror( res ) << endl;
422
423   } else {
424
425     curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
426     if( httpCode == 200 ) {
427       // ============== DO SOMETHING USEFUL HERE ===============
428       // Currently, we only print out the HandOff reply
429       rapidjson::Document document;
430       document.Parse( httpData.get()->c_str() );
431       rapidjson::StringBuffer s;
432             rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
433       document.Accept( writer );
434       cout << "[INFO] HandOff reply is " << s.GetString() << endl;
435
436
437     } else if ( httpCode == 404 ) {
438       cout << "[ERROR] HTTP 404 Not Found: " << ts_control_url << endl;
439     } else {
440       cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_url << \
441               "\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
442     }
443
444   }
445
446   curl_slist_free_all( headers );
447   curl_easy_cleanup( curl );
448 }
449
450 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
451
452   time_t now;
453   string str_now;
454   static unsigned int seq_number = 0; // static counter, not thread-safe
455
456   int response_to = 0;   // max timeout wating for a response
457
458   int send_mtype = 0;
459   int rmtype;                                                   // received message type
460   int delay = 1000000;                          // mu-sec delay; default 1s
461
462   cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
463   cout << "[INFO] Payload is " << payload.get() << endl;
464
465   const char* arg = (const char*)payload.get();
466   PredictionHandler handler;
467
468   try {
469     Reader reader;
470     StringStream ss(arg);
471     reader.Parse(ss,handler);
472   } catch (...) {
473     cout << "[ERROR] Got an exception on stringstream read parse\n";
474   }
475
476   // We are only considering download throughput
477   unordered_map<string, int> throughput_map = handler.cell_pred_down;
478
479   // Decision about CONTROL message
480   // (1) Identify UE Id in Prediction message
481   // (2) Iterate through Prediction message.
482   //     If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
483   //     We assume the first cell in the prediction message is the serving cell
484
485   int serving_cell_throughput = 0;
486   int highest_throughput = 0;
487   string highest_throughput_cell_id;
488
489   // Getting the current serving cell throughput prediction
490   auto cell = throughput_map.find( handler.serving_cell_id );
491   serving_cell_throughput = cell->second;
492
493    // Iterating to identify the highest throughput prediction
494   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
495
496     string curr_cellid = map_iter->first;
497     int curr_throughput = map_iter->second;
498
499     if ( highest_throughput < curr_throughput ) {
500       highest_throughput = curr_throughput;
501       highest_throughput_cell_id = curr_cellid;
502     }
503
504   }
505
506   if ( highest_throughput > serving_cell_throughput ) {
507     // building a handoff control message
508     now = time( nullptr );
509     str_now = ctime( &now );
510     str_now.pop_back(); // removing the \n character
511
512     seq_number++;       // static counter, not thread-safe
513
514     rapidjson::StringBuffer s;
515           rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
516     writer.StartObject();
517     writer.Key( "command" );
518     writer.String( "HandOff" );
519     writer.Key( "seqNo" );
520     writer.Int( seq_number );
521     writer.Key( "ue" );
522     writer.String( handler.ue_id.c_str() );
523     writer.Key( "fromCell" );
524     writer.String( handler.serving_cell_id.c_str() );
525     writer.Key( "toCell" );
526     writer.String( highest_throughput_cell_id.c_str() );
527     writer.Key( "timestamp" );
528     writer.String( str_now.c_str() );
529     writer.Key( "reason" );
530     writer.String( "HandOff Control Request from TS xApp" );
531     writer.Key( "ttl" );
532     writer.Int( 10 );
533     writer.EndObject();
534     // creates a message like
535     /* {
536       "command": "HandOff",
537       "seqNo": 1,
538       "ue": "ueid-here",
539       "fromCell": "CID1",
540       "toCell": "CID3",
541       "timestamp": "Sat May 22 10:35:33 2021",
542       "reason": "HandOff Control Request from TS xApp",
543       "ttl": 10
544     } */
545
546     // sending a control request message
547     send_handoff_request( s.GetString() );
548
549   } else {
550     cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
551   }
552
553   // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );       // validate that we can use the same buffer for 2 rts calls
554   // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
555 }
556
557 void send_prediction_request( vector<string> ues_to_predict ) {
558
559   std::unique_ptr<Message> msg;
560   Msg_component payload;                                // special type of unique pointer to the payload
561
562   int sz;
563   int i;
564   size_t plen;
565   Msg_component send_payload;
566
567   msg = xfw->Alloc_msg( 2048 );
568
569   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
570   if( sz < 2048 ) {
571     fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
572     exit( 1 );
573   }
574
575   string ues_list = "[";
576
577   for (int i = 0; i < ues_to_predict.size(); i++) {
578     if (i == ues_to_predict.size() - 1) {
579       ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
580     } else {
581       ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
582     }
583   }
584
585   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
586
587   const char *body = message_body.c_str();
588
589   send_payload = msg->Get_payload(); // direct access to payload
590   snprintf( (char *) send_payload.get(), 2048, "%s", body );
591
592   /*
593     we are sending a string, so we have to include the nil byte in the RMR message
594     to keep things simple in the receiver side
595    */
596   plen = strlen( (char *) send_payload.get() ) + 1;
597
598   cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
599
600   // payload updated in place, nothing to copy from, so payload parm is nil
601   if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
602     fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
603   }
604
605 }
606
607 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
608  * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
609  * sends a prediction request to the QP Driver xApp.
610  */
611 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
612   const char *json = (const char *) payload.get();
613
614   cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
615   cout << "[INFO] Payload is " << json << "\n";
616
617   AnomalyHandler handler;
618   Reader reader;
619   StringStream ss(json);
620   reader.Parse(ss,handler);
621
622   // just sending ACK to the AD xApp
623   mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr );  // msg type 30004
624
625   // TODO should we use the threshold received in the A1_POLICY_REQ message and compare with Degradation in TS_ANOMALY_UPDATE?
626   // if( handler.degradation < rsrp_threshold )
627   send_prediction_request(handler.prediction_ues);
628 }
629
630 extern int main( int argc, char** argv ) {
631
632   int nthreads = 1;
633   char* port = (char *) "4560";
634
635   // ts_control_url = "http://127.0.0.1:5000/api/echo"; // echo-server in test/app/ directory
636   if ( ( ts_control_url = getenv( ENV_CONTROL_URL ) ) == nullptr ) {
637     cout << "[ERROR] TS_CONTROL_URL is not defined to POST handoff control messages" << endl;
638     return 1;
639   }
640
641   fprintf( stderr, "[TS xApp] listening on port %s\n", port );
642   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
643
644   xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL );          // msg type 20010
645   xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL );  // msg type 30002
646   xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
647
648   xfw->Run( nthreads );
649
650 }