Fix extra nil termination char sent in RMR payload
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 /*
21         Mnemonic:       ts_xapp.cpp
22         Abstract:       Traffic Steering xApp
23                    1. Receives A1 Policy
24                                2. Receives anomaly detection
25                                3. Requests prediction for UE throughput on current and neighbor cells
26                                4. Receives prediction
27                                5. Optionally exercises Traffic Steering action over E2
28
29         Date:     22 April 2020
30         Author:         Ron Shacham
31
32   Modified: 21 May 2021 (Alexandre Huff)
33             Update for traffic steering use case in release D.
34 */
35
36 #include <stdio.h>
37 #include <string.h>
38 #include <unistd.h>
39
40 #include <thread>
41 #include <iostream>
42 #include <memory>
43
44 #include <set>
45 #include <map>
46 #include <vector>
47 #include <string>
48 #include <unordered_map>
49
50 #include <rapidjson/document.h>
51 #include <rapidjson/writer.h>
52 #include <rapidjson/stringbuffer.h>
53 #include <rapidjson/schema.h>
54 #include <rapidjson/reader.h>
55 #include <rapidjson/prettywriter.h>
56
57 #include <curl/curl.h>
58 #include <rmr/RIC_message_types.h>
59 #include "ricxfcpp/xapp.hpp"
60
61
62 // Defines env name for the endpoint to POST handoff control messages
63 #define ENV_CONTROL_URL "TS_CONTROL_URL"
64
65
66 using namespace rapidjson;
67 using namespace std;
68 using namespace xapp;
69
70 using Namespace = std::string;
71 using Key = std::string;
72 using Data = std::vector<uint8_t>;
73 using DataMap = std::map<Key, Data>;
74 using Keys = std::set<Key>;
75
76
77 // ----------------------------------------------------------
78
79 // Stores the the URL to POST handoff control messages
80 const char *ts_control_url;
81
82 std::unique_ptr<Xapp> xfw;
83
84 int rsrp_threshold = 0;
85
86 /* struct UEData {
87   string serving_cell;
88   int serving_cell_rsrp;
89 }; */
90
91 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
92   unordered_map<string, string> cell_pred;
93   std::string ue_id;
94   bool ue_id_found = false;
95   string curr_key = "";
96   string curr_value = "";
97   int policy_type_id;
98   int policy_instance_id;
99   int threshold;
100   std::string operation;
101   bool found_threshold = false;
102
103   bool Null() { return true; }
104   bool Bool(bool b) { return true; }
105   bool Int(int i) {
106
107     if (curr_key.compare("policy_type_id") == 0) {
108       policy_type_id = i;
109     } else if (curr_key.compare("policy_instance_id") == 0) {
110       policy_instance_id = i;
111     } else if (curr_key.compare("threshold") == 0) {
112       found_threshold = true;
113       threshold = i;
114     }
115
116     return true;
117   }
118   bool Uint(unsigned u) {
119
120     if (curr_key.compare("policy_type_id") == 0) {
121       policy_type_id = u;
122     } else if (curr_key.compare("policy_instance_id") == 0) {
123       policy_instance_id = u;
124     } else if (curr_key.compare("threshold") == 0) {
125       found_threshold = true;
126       threshold = u;
127     }
128
129     return true;
130   }
131   bool Int64(int64_t i) {  return true; }
132   bool Uint64(uint64_t u) {  return true; }
133   bool Double(double d) {  return true; }
134   bool String(const char* str, SizeType length, bool copy) {
135
136     if (curr_key.compare("operation") != 0) {
137       operation = str;
138     }
139
140     return true;
141   }
142   bool StartObject() {
143
144     return true;
145   }
146   bool Key(const char* str, SizeType length, bool copy) {
147
148     curr_key = str;
149
150     return true;
151   }
152   bool EndObject(SizeType memberCount) {  return true; }
153   bool StartArray() {  return true; }
154   bool EndArray(SizeType elementCount) {  return true; }
155
156 };
157
158 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
159   unordered_map<string, int> cell_pred_down;
160   unordered_map<string, int> cell_pred_up;
161   std::string ue_id;
162   bool ue_id_found = false;
163   string curr_key = "";
164   string curr_value = "";
165   string serving_cell_id;
166   bool down_val = true;
167   bool Null() {  return true; }
168   bool Bool(bool b) {  return true; }
169   bool Int(int i) {  return true; }
170   bool Uint(unsigned u) {
171     // Currently, we assume the first cell in the prediction message is the serving cell
172     if ( serving_cell_id.empty() ) {
173       serving_cell_id = curr_key;
174     }
175
176     if (down_val) {
177       cell_pred_down[curr_key] = u;
178       down_val = false;
179     } else {
180       cell_pred_up[curr_key] = u;
181       down_val = true;
182     }
183
184     return true;
185
186   }
187   bool Int64(int64_t i) {  return true; }
188   bool Uint64(uint64_t u) {  return true; }
189   bool Double(double d) {  return true; }
190   bool String(const char* str, SizeType length, bool copy) {
191
192     return true;
193   }
194   bool StartObject() {  return true; }
195   bool Key(const char* str, SizeType length, bool copy) {
196     if (!ue_id_found) {
197
198       ue_id = str;
199       ue_id_found = true;
200     } else {
201       curr_key = str;
202     }
203     return true;
204   }
205   bool EndObject(SizeType memberCount) {  return true; }
206   bool StartArray() {  return true; }
207   bool EndArray(SizeType elementCount) {  return true; }
208 };
209
210 struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
211   /*
212     Assuming we receive the following payload from AD
213     [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
214   */
215   vector<string> prediction_ues;
216   string curr_key = "";
217
218   bool Key(const Ch* str, SizeType len, bool copy) {
219     curr_key = str;
220     return true;
221   }
222
223   bool String(const Ch* str, SizeType len, bool copy) {
224     // We are only interested in the "ue-id"
225     if ( curr_key.compare( "ue-id") == 0 ) {
226       prediction_ues.push_back( str );
227     }
228     return true;
229   }
230 };
231
232
233 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
234   unordered_map<string, string> cell_pred;
235   std::string serving_cell_id;
236   int serving_cell_rsrp;
237   int serving_cell_rsrq;
238   int serving_cell_sinr;
239   bool in_serving_array = false;
240   int rf_meas_index = 0;
241
242   bool in_serving_report_object = false;
243
244   string curr_key = "";
245   string curr_value = "";
246   bool Null() { return true; }
247   bool Bool(bool b) { return true; }
248   bool Int(int i) {
249
250     return true;
251   }
252
253   bool Uint(unsigned i) {
254
255     if (in_serving_report_object) {
256       if (curr_key.compare("rsrp") == 0) {
257         serving_cell_rsrp = i;
258       } else if (curr_key.compare("rsrq") == 0) {
259         serving_cell_rsrq = i;
260       } else if (curr_key.compare("rssinr") == 0) {
261         serving_cell_sinr = i;
262       }
263     }
264
265     return true; }
266   bool Int64(int64_t i) {
267
268     return true; }
269   bool Uint64(uint64_t i) {
270
271     return true; }
272   bool Double(double d) { return true; }
273   bool String(const char* str, SizeType length, bool copy) {
274
275     if (curr_key.compare("ServingCellID") == 0) {
276       serving_cell_id = str;
277     }
278
279     return true;
280   }
281   bool StartObject() {
282     if (curr_key.compare("ServingCellRF") == 0) {
283       in_serving_report_object = true;
284     }
285
286     return true; }
287   bool Key(const char* str, SizeType length, bool copy) {
288
289     curr_key = str;
290     return true;
291   }
292   bool EndObject(SizeType memberCount) {
293     if (curr_key.compare("ServingCellRF") == 0) {
294       in_serving_report_object = false;
295     }
296     return true; }
297   bool StartArray() {
298
299     if (curr_key.compare("ServingCellRF") == 0) {
300       in_serving_array = true;
301     }
302
303     return true;
304   }
305   bool EndArray(SizeType elementCount) {
306
307     if (curr_key.compare("servingCellRF") == 0) {
308       in_serving_array = false;
309       rf_meas_index = 0;
310     }
311
312     return true; }
313 }; */
314
315
316 /* unordered_map<string, UEData> get_sdl_ue_data() {
317
318   fprintf(stderr, "In get_sdl_ue_data()\n");
319
320   unordered_map<string, string> ue_data;
321
322   unordered_map<string, UEData> return_ue_data_map;
323
324   std::string prefix3="";
325   Keys K2 = sdl->findKeys(nsu, prefix3);
326   DataMap Dk2 = sdl->get(nsu, K2);
327
328   string ue_json;
329   string ue_id;
330
331   for(auto si=K2.begin();si!=K2.end();++si){
332     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
333     char val[val_v.size()+1];                               // from Data
334     int i;
335
336     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
337     val[i]='\0';
338       ue_id.assign((std::string)*si);
339
340       ue_json.assign(val);
341       ue_data[ue_id] =  ue_json;
342   }
343
344   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
345     UEDataHandler handler;
346     Reader reader;
347     StringStream ss(map_iter->second.c_str());
348     reader.Parse(ss,handler);
349
350     string ueID = map_iter->first;
351     string serving_cell_id = handler.serving_cell_id;
352     int serv_rsrp = handler.serving_cell_rsrp;
353
354     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
355
356   }
357
358   return return_ue_data_map;
359 } */
360
361 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
362
363   int response_to = 0;   // max timeout wating for a response
364   int rmtype;           // received message type
365
366   string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
367
368   cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length="<< len << "\n";
369   cout << "[INFO] Payload is " << arg << endl;
370
371   PolicyHandler handler;
372   Reader reader;
373   StringStream ss(arg.c_str());
374   reader.Parse(ss,handler);
375
376   //Set the threshold value
377   if (handler.found_threshold) {
378     cout << "[INFO] Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
379     rsrp_threshold = handler.threshold;
380   }
381
382   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
383   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
384 }
385
386 // callback to handle handover reply (json http response)
387 size_t handoff_reply_callback( const char *in, size_t size, size_t num, string *out ) {
388   const size_t totalBytes( size * num );
389   out->append( in, totalBytes );
390   return totalBytes;
391 }
392
393 // sends a handover message through REST
394 void send_handoff_request( string msg ) {
395   CURL *curl = curl_easy_init();
396   curl_easy_setopt( curl, CURLOPT_URL, ts_control_url );
397   curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
398   curl_easy_setopt( curl, CURLOPT_POST, 1L );
399   // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
400
401   // response information
402   long httpCode( 0 );
403   unique_ptr<string> httpData( new string() );
404
405   curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, handoff_reply_callback );
406   curl_easy_setopt( curl, CURLOPT_WRITEDATA, httpData.get());
407   curl_easy_setopt( curl, CURLOPT_POSTFIELDS, msg.c_str() );
408
409   struct curl_slist *headers = NULL;  // needs to free this after easy perform
410   headers = curl_slist_append( headers, "Accept: application/json" );
411   headers = curl_slist_append( headers, "Content-Type: application/json" );
412   curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
413
414   cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_url << "\"\n";
415   cout << "[INFO] HandOff request is " << msg << endl;
416
417   // sending request
418   CURLcode res = curl_easy_perform( curl );
419   if( res != CURLE_OK ) {
420     cout << "[ERROR] curl_easy_perform() failed: " << curl_easy_strerror( res ) << endl;
421
422   } else {
423
424     curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
425     if( httpCode == 200 ) {
426       // ============== DO SOMETHING USEFUL HERE ===============
427       // Currently, we only print out the HandOff reply
428       rapidjson::Document document;
429       document.Parse( httpData.get()->c_str() );
430       rapidjson::StringBuffer s;
431             rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
432       document.Accept( writer );
433       cout << "[INFO] HandOff reply is " << s.GetString() << endl;
434
435
436     } else if ( httpCode == 404 ) {
437       cout << "[ERROR] HTTP 404 Not Found: " << ts_control_url << endl;
438     } else {
439       cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_url << \
440               "\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
441     }
442
443   }
444
445   curl_slist_free_all( headers );
446   curl_easy_cleanup( curl );
447 }
448
449 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
450
451   time_t now;
452   string str_now;
453   static unsigned int seq_number = 0; // static counter, not thread-safe
454
455   int response_to = 0;   // max timeout wating for a response
456
457   int send_mtype = 0;
458   int rmtype;                                                   // received message type
459   int delay = 1000000;                          // mu-sec delay; default 1s
460
461   string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
462
463   cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
464   cout << "[INFO] Payload is " << json << endl;
465
466   PredictionHandler handler;
467   try {
468     Reader reader;
469     StringStream ss(json.c_str());
470     reader.Parse(ss,handler);
471   } catch (...) {
472     cout << "[ERROR] Got an exception on stringstream read parse\n";
473   }
474
475   // We are only considering download throughput
476   unordered_map<string, int> throughput_map = handler.cell_pred_down;
477
478   // Decision about CONTROL message
479   // (1) Identify UE Id in Prediction message
480   // (2) Iterate through Prediction message.
481   //     If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
482   //     We assume the first cell in the prediction message is the serving cell
483
484   int serving_cell_throughput = 0;
485   int highest_throughput = 0;
486   string highest_throughput_cell_id;
487
488   // Getting the current serving cell throughput prediction
489   auto cell = throughput_map.find( handler.serving_cell_id );
490   serving_cell_throughput = cell->second;
491
492    // Iterating to identify the highest throughput prediction
493   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
494
495     string curr_cellid = map_iter->first;
496     int curr_throughput = map_iter->second;
497
498     if ( highest_throughput < curr_throughput ) {
499       highest_throughput = curr_throughput;
500       highest_throughput_cell_id = curr_cellid;
501     }
502
503   }
504
505   if ( highest_throughput > serving_cell_throughput ) {
506     // building a handoff control message
507     now = time( nullptr );
508     str_now = ctime( &now );
509     str_now.pop_back(); // removing the \n character
510
511     seq_number++;       // static counter, not thread-safe
512
513     rapidjson::StringBuffer s;
514           rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
515     writer.StartObject();
516     writer.Key( "command" );
517     writer.String( "HandOff" );
518     writer.Key( "seqNo" );
519     writer.Int( seq_number );
520     writer.Key( "ue" );
521     writer.String( handler.ue_id.c_str() );
522     writer.Key( "fromCell" );
523     writer.String( handler.serving_cell_id.c_str() );
524     writer.Key( "toCell" );
525     writer.String( highest_throughput_cell_id.c_str() );
526     writer.Key( "timestamp" );
527     writer.String( str_now.c_str() );
528     writer.Key( "reason" );
529     writer.String( "HandOff Control Request from TS xApp" );
530     writer.Key( "ttl" );
531     writer.Int( 10 );
532     writer.EndObject();
533     // creates a message like
534     /* {
535       "command": "HandOff",
536       "seqNo": 1,
537       "ue": "ueid-here",
538       "fromCell": "CID1",
539       "toCell": "CID3",
540       "timestamp": "Sat May 22 10:35:33 2021",
541       "reason": "HandOff Control Request from TS xApp",
542       "ttl": 10
543     } */
544
545     // sending a control request message
546     send_handoff_request( s.GetString() );
547
548   } else {
549     cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
550   }
551
552   // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );       // validate that we can use the same buffer for 2 rts calls
553   // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
554 }
555
556 void send_prediction_request( vector<string> ues_to_predict ) {
557
558   std::unique_ptr<Message> msg;
559   Msg_component payload;                                // special type of unique pointer to the payload
560
561   int sz;
562   int i;
563   size_t plen;
564   Msg_component send_payload;
565
566   msg = xfw->Alloc_msg( 2048 );
567
568   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
569   if( sz < 2048 ) {
570     fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
571     exit( 1 );
572   }
573
574   string ues_list = "[";
575
576   for (int i = 0; i < ues_to_predict.size(); i++) {
577     if (i == ues_to_predict.size() - 1) {
578       ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
579     } else {
580       ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
581     }
582   }
583
584   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
585
586   send_payload = msg->Get_payload(); // direct access to payload
587   snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
588
589   plen = strlen( (char *)send_payload.get() );
590
591   cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
592
593   // payload updated in place, nothing to copy from, so payload parm is nil
594   if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
595     fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
596   }
597
598 }
599
600 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
601  * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
602  * sends a prediction request to the QP Driver xApp.
603  */
604 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
605   string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
606
607   cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
608   cout << "[INFO] Payload is " << json << "\n";
609
610   AnomalyHandler handler;
611   Reader reader;
612   StringStream ss(json.c_str());
613   reader.Parse(ss,handler);
614
615   // just sending ACK to the AD xApp
616   mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr );  // msg type 30004
617
618   // TODO should we use the threshold received in the A1_POLICY_REQ message and compare with Degradation in TS_ANOMALY_UPDATE?
619   // if( handler.degradation < rsrp_threshold )
620   send_prediction_request(handler.prediction_ues);
621 }
622
623 extern int main( int argc, char** argv ) {
624
625   int nthreads = 1;
626   char* port = (char *) "4560";
627
628   // ts_control_url = "http://127.0.0.1:5000/api/echo"; // echo-server in test/app/ directory
629   if ( ( ts_control_url = getenv( ENV_CONTROL_URL ) ) == nullptr ) {
630     cout << "[ERROR] TS_CONTROL_URL is not defined to POST handoff control messages" << endl;
631     return 1;
632   }
633
634   fprintf( stderr, "[TS xApp] listening on port %s\n", port );
635   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
636
637   xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL );          // msg type 20010
638   xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL );  // msg type 30002
639   xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
640
641   xfw->Run( nthreads );
642
643 }