[ISSUE-ID] : RICAPP-202 upgrading protofile and modified NodebHandler to build CELL...
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 /*
21         Mnemonic:       ts_xapp.cpp
22         Abstract:       Traffic Steering xApp
23                    1. Receives A1 Policy
24                                2. Receives anomaly detection
25                                3. Requests prediction for UE throughput on current and neighbor cells
26                                4. Receives prediction
27                                5. Optionally exercises Traffic Steering action over E2
28
29         Date:     22 April 2020
30         Author:         Ron Shacham
31
32   Modified: 21 May 2021 (Alexandre Huff)
33             Update for traffic steering use case in release D.
34             07 Dec 2021 (Alexandre Huff)
35             Update for traffic steering use case in release E.
36 */
37
38 #include <stdio.h>
39 #include <string.h>
40 #include <unistd.h>
41
42 #include <thread>
43 #include <iostream>
44 #include <memory>
45 #include <algorithm>
46 #include <set>
47 #include <map>
48 #include <vector>
49 #include <string>
50 #include <unordered_map>
51 #include<deque>
52 #include <rapidjson/document.h>
53 #include <rapidjson/writer.h>
54 #include <rapidjson/stringbuffer.h>
55 #include <rapidjson/schema.h>
56 #include <rapidjson/reader.h>
57 #include <rapidjson/prettywriter.h>
58
59 #include <rmr/RIC_message_types.h>
60 #include <ricxfcpp/xapp.hpp>
61 #include <ricxfcpp/config.hpp>
62 #include<sstream>
63
64 /*
65   FIXME unfortunately this RMR flag has to be disabled
66   due to name resolution conflicts.
67   RC xApp defines the same name for gRPC control messages.
68 */
69 #undef RIC_CONTROL_ACK
70
71 #include <grpc/grpc.h>
72 #include <grpcpp/channel.h>
73 #include <grpcpp/client_context.h>
74 #include <grpcpp/create_channel.h>
75 #include <grpcpp/security/credentials.h>
76 #include "protobuf/rc.grpc.pb.h"
77
78 #include "utils/restclient.hpp"
79
80
81 using namespace rapidjson;
82 using namespace std;
83 using namespace xapp;
84
85 using Namespace = std::string;
86 using Key = std::string;
87 using Data = std::vector<uint8_t>;
88 using DataMap = std::map<Key, Data>;
89 using Keys = std::set<Key>;
90
91
92 // ----------------------------------------------------------
93 std::unique_ptr<Xapp> xfw;
94 std::unique_ptr<rc::MsgComm::Stub> rc_stub;
95
96 int downlink_threshold = 0;  // A1 policy type 20008 (in percentage)
97
98 // scoped enum to identify which API is used to send control messages
99 enum class TsControlApi { REST, gRPC };
100 TsControlApi ts_control_api;  // api to send control messages
101 string ts_control_ep;         // api target endpoint
102
103 typedef struct nodeb {
104   string ran_name;
105   struct {
106     string plmn_id;
107     string nb_id;
108   } global_nb_id;
109 } nodeb_t;
110
111 unordered_map<string, shared_ptr<nodeb_t>> cell_map; // maps each cell to its nodeb
112
113 /* struct UEData {
114   string serving_cell;
115   int serving_cell_rsrp;
116 }; */
117
118
119 //https://stackoverflow.com/a/34571089/15098882
120
121 static std::string base64_decode(const std::string &in) {
122
123         std::string out;
124
125         std::vector<int> T(256, -1);
126         for (int i = 0; i < 64; i++) T["ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"[i]] = i;
127
128         int val = 0, valb = -8;
129         for (unsigned char c : in) {
130                 if (T[c] == -1) break;
131                 val = (val << 6) + T[c];
132                 valb += 6;
133                 if (valb >= 0) {
134                         out.push_back(char((val >> valb) & 0xFF));
135                         valb -= 8;
136                 }
137         }
138         return out;
139 }
140
141 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
142   /*
143     Assuming we receive the following payload from A1 Mediator
144     {"operation": "CREATE", "policy_type_id": 20008, "policy_instance_id": "tsapolicy145", "payload": {"threshold": 5}}
145   */
146   unordered_map<string, string> cell_pred;
147   std::string ue_id;
148   bool ue_id_found = false;
149   string curr_key = "";
150   string curr_value = "";
151   int policy_type_id;
152   int policy_instance_id;
153   int threshold;
154   std::string operation;
155   bool found_threshold = false;
156
157   bool Null() { return true; }
158   bool Bool(bool b) { return true; }
159   bool Int(int i) {
160
161     if (curr_key.compare("policy_type_id") == 0) {
162       policy_type_id = i;
163     } else if (curr_key.compare("policy_instance_id") == 0) {
164       policy_instance_id = i;
165     } else if (curr_key.compare("threshold") == 0) {
166       found_threshold = true;
167       threshold = i;
168     }
169
170     return true;
171   }
172   bool Uint(unsigned u) {
173
174     if (curr_key.compare("policy_type_id") == 0) {
175       policy_type_id = u;
176     } else if (curr_key.compare("policy_instance_id") == 0) {
177       policy_instance_id = u;
178     } else if (curr_key.compare("threshold") == 0) {
179       found_threshold = true;
180       threshold = u;
181     }
182
183     return true;
184   }
185   bool Int64(int64_t i) {  return true; }
186   bool Uint64(uint64_t u) {  return true; }
187   bool Double(double d) {  return true; }
188   bool String(const char* str, SizeType length, bool copy) {
189
190     if (curr_key.compare("operation") != 0) {
191       operation = str;
192     }
193
194     return true;
195   }
196   bool StartObject() {
197
198     return true;
199   }
200   bool Key(const char* str, SizeType length, bool copy) {
201
202     curr_key = str;
203
204     return true;
205   }
206   bool EndObject(SizeType memberCount) {  return true; }
207   bool StartArray() {  return true; }
208   bool EndArray(SizeType elementCount) {  return true; }
209
210 };
211
212 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
213   unordered_map<string, int> cell_pred_down;
214   unordered_map<string, int> cell_pred_up;
215   std::string ue_id;
216   bool ue_id_found = false;
217   string curr_key = "";
218   string curr_value = "";
219   string serving_cell_id;
220   bool down_val = true;
221   bool Null() {  return true; }
222   bool Bool(bool b) {  return true; }
223   bool Int(int i) {  return true; }
224   bool Uint(unsigned u) {
225     // Currently, we assume the first cell in the prediction message is the serving cell
226     if ( serving_cell_id.empty() ) {
227       serving_cell_id = curr_key;
228     }
229
230     if (down_val) {
231       cell_pred_down[curr_key] = u;
232       down_val = false;
233     } else {
234       cell_pred_up[curr_key] = u;
235       down_val = true;
236     }
237
238     return true;
239
240   }
241   bool Int64(int64_t i) {  return true; }
242   bool Uint64(uint64_t u) {  return true; }
243   bool Double(double d) {  return true; }
244   bool String(const char* str, SizeType length, bool copy) {
245
246     return true;
247   }
248   bool StartObject() {  return true; }
249   bool Key(const char* str, SizeType length, bool copy) {
250     if (!ue_id_found) {
251
252       ue_id = str;
253       ue_id_found = true;
254     } else {
255       curr_key = str;
256     }
257     return true;
258   }
259   bool EndObject(SizeType memberCount) {  return true; }
260   bool StartArray() {  return true; }
261   bool EndArray(SizeType elementCount) {  return true; }
262 };
263
264 struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
265   /*
266     Assuming we receive the following payload from AD
267     [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
268   */
269   vector<string> prediction_ues;
270   string curr_key = "";
271
272   bool Key(const Ch* str, SizeType len, bool copy) {
273     curr_key = str;
274     return true;
275   }
276
277   bool String(const Ch* str, SizeType len, bool copy) {
278     // We are only interested in the "ue-id"
279     if ( curr_key.compare( "ue-id") == 0 ) {
280       prediction_ues.push_back( str );
281     }
282     return true;
283   }
284 };
285
286 struct NodebListHandler : public BaseReaderHandler<UTF8<>, NodebListHandler> {
287   vector<string> nodeb_list;
288   string curr_key = "";
289
290   bool Key(const Ch* str, SizeType length, bool copy) {
291     curr_key = str;
292     return true;
293   }
294
295   bool String(const Ch* str, SizeType length, bool copy) {
296     if( curr_key.compare( "inventoryName" ) == 0 ) {
297       nodeb_list.push_back( str );
298     }
299     return true;
300   }
301 };
302
303 struct NodebHandler : public BaseReaderHandler<UTF8<>, NodebHandler> {
304         string curr_key = "";
305         shared_ptr<nodeb_t> nodeb = make_shared<nodeb_t>();
306         std::string meid;
307         std::vector<string> cells;
308
309         bool Key(const Ch* str, SizeType length, bool copy) {
310                 curr_key = str;
311                 return true;
312         }
313
314         bool String(const Ch* str, SizeType length, bool copy) {
315
316                 if (curr_key.compare("ranName") == 0) {
317                         //std::cout << str << "\n";
318                         nodeb->ran_name = str;
319                         meid= str;
320                         //std::cout << "\n meid = " << meid;
321
322                 }
323                 else if (curr_key.compare("plmnId") == 0) {
324                         //std::cout << str << "\n";
325                         nodeb->global_nb_id.plmn_id = str;
326                 }
327                 else if (curr_key.compare("nbId") == 0) {
328                         //std::cout <<str<< "\n";
329                         nodeb->global_nb_id.nb_id = str;
330                 }
331                 else if (curr_key.compare("e2nodeComponentRequestPart") == 0) {
332                         //std::cout << str<<"\n";
333                         auto message = base64_decode(str);
334                         //std::cout << message<<"\n";
335                         int len = meid.length();
336                         //std::cout << "\n meid = " << meid;
337                         int counter = 0;
338                                 for (int i = 0; i <len; i++ ){
339                                         if (meid[i] == '_') {
340                                                 counter++;
341                                         }
342                                         if( counter == 3) {
343                                                 counter = i + 1;
344                                                 break;
345                                         }
346                                 }
347                                 std::string last_matching_bits = meid.substr(counter, meid.length());
348                                 len = last_matching_bits.size();
349                                 char b;
350
351                                 for (int i = 0; i < len; i++) {
352                                         b = last_matching_bits[i];
353                                         b = toupper(b);
354                                         // b = to lower(b); //alternately
355                                         last_matching_bits[i] = b;
356                                 }
357                                 len = message.length();
358                                 //std::cout << "\nlast_matching_bits = " << last_matching_bits;
359                                 int matching_len = last_matching_bits.length();;
360
361                                         for (int i = 0; i <= len - matching_len; i++ ){
362                                                 //std::cout << "\n" << message.substr(i, matching_len);
363
364                                                 if (message.substr(i,matching_len)== last_matching_bits){
365                                                         //std::cout << "\nmatched!\n";
366                                                         cells.push_back(message.substr(i,10));//cell id is 36 bit long , last  4 bit unused
367
368                                                 }
369                                         }
370                                         len = cells.size();
371                                         for (int i = 0; i < len; i++) {
372                                                 cell_map[cells[i]] = nodeb;
373                                         }
374
375                 }
376                 return true;
377         }
378
379 };
380
381 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
382   unordered_map<string, string> cell_pred;
383   std::string serving_cell_id;
384   int serving_cell_rsrp;
385   int serving_cell_rsrq;
386   int serving_cell_sinr;
387   bool in_serving_array = false;
388   int rf_meas_index = 0;
389
390   bool in_serving_report_object = false;
391
392   string curr_key = "";
393   string curr_value = "";
394   bool Null() { return true; }
395   bool Bool(bool b) { return true; }
396   bool Int(int i) {
397
398     return true;
399   }
400
401   bool Uint(unsigned i) {
402
403     if (in_serving_report_object) {
404       if (curr_key.compare("rsrp") == 0) {
405         serving_cell_rsrp = i;
406       } else if (curr_key.compare("rsrq") == 0) {
407         serving_cell_rsrq = i;
408       } else if (curr_key.compare("rssinr") == 0) {
409         serving_cell_sinr = i;
410       }
411     }
412
413     return true; }
414   bool Int64(int64_t i) {
415
416     return true; }
417   bool Uint64(uint64_t i) {
418
419     return true; }
420   bool Double(double d) { return true; }
421   bool String(const char* str, SizeType length, bool copy) {
422
423     if (curr_key.compare("ServingCellID") == 0) {
424       serving_cell_id = str;
425     }
426
427     return true;
428   }
429   bool StartObject() {
430     if (curr_key.compare("ServingCellRF") == 0) {
431       in_serving_report_object = true;
432     }
433
434     return true; }
435   bool Key(const char* str, SizeType length, bool copy) {
436
437     curr_key = str;
438     return true;
439   }
440   bool EndObject(SizeType memberCount) {
441     if (curr_key.compare("ServingCellRF") == 0) {
442       in_serving_report_object = false;
443     }
444     return true; }
445   bool StartArray() {
446
447     if (curr_key.compare("ServingCellRF") == 0) {
448       in_serving_array = true;
449     }
450
451     return true;
452   }
453   bool EndArray(SizeType elementCount) {
454
455     if (curr_key.compare("servingCellRF") == 0) {
456       in_serving_array = false;
457       rf_meas_index = 0;
458     }
459
460     return true; }
461 }; */
462
463
464 /* unordered_map<string, UEData> get_sdl_ue_data() {
465
466   fprintf(stderr, "In get_sdl_ue_data()\n");
467
468   unordered_map<string, string> ue_data;
469
470   unordered_map<string, UEData> return_ue_data_map;
471
472   std::string prefix3="";
473   Keys K2 = sdl->findKeys(nsu, prefix3);
474   DataMap Dk2 = sdl->get(nsu, K2);
475
476   string ue_json;
477   string ue_id;
478
479   for(auto si=K2.begin();si!=K2.end();++si){
480     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
481     char val[val_v.size()+1];                               // from Data
482     int i;
483
484     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
485     val[i]='\0';
486       ue_id.assign((std::string)*si);
487
488       ue_json.assign(val);
489       ue_data[ue_id] =  ue_json;
490   }
491
492   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
493     UEDataHandler handler;
494     Reader reader;
495     StringStream ss(map_iter->second.c_str());
496     reader.Parse(ss,handler);
497
498     string ueID = map_iter->first;
499     string serving_cell_id = handler.serving_cell_id;
500     int serv_rsrp = handler.serving_cell_rsrp;
501
502     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
503
504   }
505
506   return return_ue_data_map;
507 } */
508
509 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
510   string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
511
512   cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length=" << len << "\n";
513   cout << "[INFO] Payload is " << arg << endl;
514
515   PolicyHandler handler;
516   Reader reader;
517   StringStream ss(arg.c_str());
518   reader.Parse(ss,handler);
519
520   //Set the threshold value
521   if (handler.found_threshold) {
522     cout << "[INFO] Setting Threshold for A1-P value: " << handler.threshold << "%\n";
523     downlink_threshold = handler.threshold;
524   }
525
526 }
527
528 // sends a handover message through REST
529 void send_rest_control_request( string ue_id, string serving_cell_id, string target_cell_id ) {
530   time_t now;
531   string str_now;
532   static unsigned int seq_number = 0; // static counter, not thread-safe
533
534   // building a handoff control message
535   now = time( nullptr );
536   str_now = ctime( &now );
537   str_now.pop_back(); // removing the \n character
538
539   seq_number++;       // static counter, not thread-safe
540
541   rapidjson::StringBuffer s;
542   rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
543   writer.StartObject();
544   writer.Key( "command" );
545   writer.String( "HandOff" );
546   writer.Key( "seqNo" );
547   writer.Int( seq_number );
548   writer.Key( "ue" );
549   writer.String( ue_id.c_str() );
550   writer.Key( "fromCell" );
551   writer.String( serving_cell_id.c_str() );
552   writer.Key( "toCell" );
553   writer.String( target_cell_id.c_str() );
554   writer.Key( "timestamp" );
555   writer.String( str_now.c_str() );
556   writer.Key( "reason" );
557   writer.String( "HandOff Control Request from TS xApp" );
558   writer.Key( "ttl" );
559   writer.Int( 10 );
560   writer.EndObject();
561   // creates a message like
562   /* {
563     "command": "HandOff",
564     "seqNo": 1,
565     "ue": "ueid-here",
566     "fromCell": "CID1",
567     "toCell": "CID3",
568     "timestamp": "Sat May 22 10:35:33 2021",
569     "reason": "HandOff Control Request from TS xApp",
570     "ttl": 10
571   } */
572
573   string msg = s.GetString();
574
575   cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_ep << "\"\n";
576   cout << "[INFO] HandOff request is " << msg << endl;
577
578   try {
579     // sending request
580     restclient::RestClient client( ts_control_ep );
581     restclient::response_t resp = client.do_post( "", msg ); // we already have the full path in ts_control_ep
582
583     if( resp.status_code == 200 ) {
584         // ============== DO SOMETHING USEFUL HERE ===============
585         // Currently, we only print out the HandOff reply
586         rapidjson::Document document;
587         document.Parse( resp.body.c_str() );
588         rapidjson::StringBuffer s;
589         rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
590         document.Accept( writer );
591         cout << "[INFO] HandOff reply is " << s.GetString() << endl;
592
593     } else {
594         cout << "[ERROR] Unexpected HTTP code " << resp.status_code << " from " << \
595                 client.getBaseUrl() << \
596                 "\n[ERROR] HTTP payload is " << resp.body.c_str() << endl;
597     }
598
599   } catch( const restclient::RestClientException &e ) {
600     cout << "[ERROR] " << e.what() << endl;
601
602   }
603
604 }
605
606 // sends a handover message to RC xApp through gRPC
607 void send_grpc_control_request( string ue_id, string target_cell_id ) {
608   grpc::ClientContext context;
609
610   rc::RicControlGrpcRsp response;
611   shared_ptr<rc::RicControlGrpcReq> request = make_shared<rc::RicControlGrpcReq>();
612
613   rc::RICE2APHeader *apHeader = request->mutable_rice2apheaderdata();
614   apHeader->set_ranfuncid(3);
615   apHeader->set_ricrequestorid( 1 );
616
617   rc::RICControlHeader *ctrlHeader = request->mutable_riccontrolheaderdata();
618   ctrlHeader->set_controlstyle( 3 );
619   ctrlHeader->set_controlactionid( 1 );
620   rc::UeId *ueid =  ctrlHeader->mutable_ueid();
621   rc::gNBUEID* gnbue= ueid->mutable_gnbueid();
622   gnbue->set_amfuengapid(stoi(ue_id));
623   gnbue->add_gnbcuuef1apid(stoi(ue_id));
624   gnbue->add_gnbcucpuee1apid(stoi(ue_id));
625   rc::Guami* gumi=gnbue->mutable_guami();
626   //As of now hardcoded according to the value setted in VIAVI RSG TOOL
627   gumi->set_amfregionid("10100000");
628   gumi->set_amfsetid("0000000000");
629   gumi->set_amfpointer("000001");
630   
631   //ctrlHeader->set_ueid( ue_id );
632
633   rc::RICControlMessage *ctrlMsg = request->mutable_riccontrolmessagedata();
634   ctrlMsg->set_riccontrolcelltypeval( rc::RICControlCellTypeEnum::RIC_CONTROL_CELL_UNKWON );
635   //ctrlMsg->set_riccontrolcelltypeval( api::RIC_CONTROL_CELL_UNKWON);
636     
637     ctrlMsg->set_targetcellid( target_cell_id);
638
639   auto data = cell_map.find(target_cell_id);
640   if( data != cell_map.end() ) {
641     request->set_e2nodeid( data->second->global_nb_id.nb_id );
642     request->set_plmnid( data->second->global_nb_id.plmn_id );
643     request->set_ranname( data->second->ran_name );
644     gumi->set_plmnidentity(data->second->global_nb_id.plmn_id);
645   } else {
646     cout << "[INFO] Cannot find RAN name corresponding to cell id = "<<target_cell_id<<endl;
647     return;
648     request->set_e2nodeid( "unknown_e2nodeid" );
649     request->set_plmnid( "unknown_plmnid" );
650     request->set_ranname( "unknown_ranname" );
651     gumi->set_plmnidentity("unknown_plmnid");
652   }
653   request->set_riccontrolackreqval( rc::RICControlAckEnum::RIC_CONTROL_ACK_UNKWON );
654   //request->set_riccontrolackreqval( api::RIC_CONTROL_ACK_UNKWON);  // not yet used in api.proto
655  cout<<"\nin ts xapp grpc message content \n"<< request->DebugString()<<"\n"; 
656   grpc::Status status = rc_stub->SendRICControlReqServiceGrpc( &context, *request, &response );
657
658   if( status.ok() ) {
659     if( response.rspcode() == 0 ) {
660       cout << "[INFO] Control Request succeeded with code=0, description=" << response.description() << endl;
661     } else {
662       cout << "[ERROR] Control Request failed with code=" << response.rspcode()
663            << ", description=" << response.description() << endl;
664     }
665
666   } else {
667     cout << "[ERROR] failed to send a RIC Control Request message to RC xApp, error_code="
668          << status.error_code() << ", error_msg=" << status.error_message() << endl;
669   }
670
671 }
672
673 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
674   string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
675
676   cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
677   cout << "[INFO] Payload is " << json << endl;
678
679   PredictionHandler handler;
680   try {
681     Reader reader;
682     StringStream ss(json.c_str());
683     reader.Parse(ss,handler);
684   } catch (...) {
685     cout << "[ERROR] Got an exception on stringstream read parse\n";
686   }
687
688   // We are only considering download throughput
689   unordered_map<string, int> throughput_map = handler.cell_pred_down;
690
691   // Decision about CONTROL message
692   // (1) Identify UE Id in Prediction message
693   // (2) Iterate through Prediction message.
694   //     If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
695   //     We assume the first cell in the prediction message is the serving cell
696
697   int serving_cell_throughput = 0;
698   int highest_throughput = 0;
699   string highest_throughput_cell_id;
700
701   // Getting the current serving cell throughput prediction
702   auto cell = throughput_map.find( handler.serving_cell_id );
703   serving_cell_throughput = cell->second;
704
705    // Iterating to identify the highest throughput prediction
706   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
707
708     string curr_cellid = map_iter->first;
709     int curr_throughput = map_iter->second;
710
711     if ( highest_throughput < curr_throughput ) {
712       highest_throughput = curr_throughput;
713       highest_throughput_cell_id = curr_cellid;
714     }
715
716   }
717
718   float thresh = 0;
719   if( downlink_threshold > 0 ) {  // we also take into account the threshold in A1 policy type 20008
720     thresh = serving_cell_throughput * (downlink_threshold / 100.0);
721   }
722
723   if ( highest_throughput > ( serving_cell_throughput + thresh ) ) {
724
725     // sending a control request message
726     if ( ts_control_api == TsControlApi::REST ) {
727       send_rest_control_request( handler.ue_id, handler.serving_cell_id, highest_throughput_cell_id );
728     } else {
729       send_grpc_control_request( handler.ue_id, highest_throughput_cell_id );
730     }
731
732   } else {
733     cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
734   }
735
736 }
737
738 void send_prediction_request( vector<string> ues_to_predict ) {
739   std::unique_ptr<Message> msg;
740   Msg_component payload;           // special type of unique pointer to the payload
741
742   int sz;
743   int i;
744   size_t plen;
745   Msg_component send_payload;
746
747   msg = xfw->Alloc_msg( 2048 );
748
749   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
750   if( sz < 2048 ) {
751     fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
752     exit( 1 );
753   }
754
755   string ues_list = "[";
756
757   for (int i = 0; i < ues_to_predict.size(); i++) {
758     if (i == ues_to_predict.size() - 1) {
759       ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
760     } else {
761       ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
762     }
763   }
764
765   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
766
767   send_payload = msg->Get_payload(); // direct access to payload
768   snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
769
770   plen = strlen( (char *)send_payload.get() );
771
772   cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
773
774   // payload updated in place, nothing to copy from, so payload parm is nil
775   if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
776     fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
777   }
778
779 }
780
781 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
782  * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
783  * sends a prediction request to the QP Driver xApp.
784  */
785 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
786   string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
787
788   cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
789   cout << "[INFO] Payload is " << json << "\n";
790
791   AnomalyHandler handler;
792   Reader reader;
793   StringStream ss(json.c_str());
794   reader.Parse(ss,handler);
795
796   // just sending ACK to the AD xApp
797   mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr );  // msg type 30004
798
799   send_prediction_request(handler.prediction_ues);
800 }
801
802 vector<string> get_nodeb_list( restclient::RestClient& client ) {
803
804   restclient::response_t response = client.do_get( "/v1/nodeb/states" );
805
806   NodebListHandler handler;
807   if( response.status_code == 200 ) {
808     Reader reader;
809     StringStream ss( response.body.c_str() );
810     reader.Parse( ss, handler );
811
812     cout << "[INFO] nodeb list is " << response.body.c_str() << endl;
813
814   } else {
815     if( response.body.empty() ) {
816       cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() << endl;
817     } else {
818       cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() <<
819               ". HTTP payload is " << response.body.c_str() << endl;
820     }
821   }
822
823   return handler.nodeb_list;
824 }
825
826 bool build_cell_mapping() {
827   string base_url;
828   char *data = getenv( "SERVICE_E2MGR_HTTP_BASE_URL" );
829   if ( data == NULL ) {
830     base_url = "http://service-ricplt-e2mgr-http.ricplt:3800";
831   } else {
832     base_url = string( data );
833   }
834
835   try {
836     restclient::RestClient client( base_url );
837
838     vector<string> nb_list = get_nodeb_list( client );
839
840     for( string nb : nb_list ) {
841       string full_path = string("/v1/nodeb/") + nb;
842       restclient::response_t response = client.do_get( full_path );
843       if( response.status_code != 200 ) {
844         if( response.body.empty() ) {
845           cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
846                   client.getBaseUrl() + full_path << endl;
847         } else {
848           cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
849                 client.getBaseUrl() + full_path << ". HTTP payload is " << response.body.c_str() << endl;
850         }
851         return false;
852       }
853
854       try {
855         NodebHandler handler;
856         Reader reader;
857         StringStream ss( response.body.c_str() );
858         reader.Parse( ss, handler );
859       } catch (...) {
860         cout << "[ERROR] Got an exception on parsing nodeb (stringstream read parse)\n";
861         return false;
862       }
863     }
864
865   } catch( const restclient::RestClientException &e ) {
866     cout << "[ERROR] " << e.what() << endl;
867     return false;
868   }
869
870   return true;
871 }
872
873 extern int main( int argc, char** argv ) {
874   int nthreads = 1;
875   char* port = (char *) "4560";
876   shared_ptr<grpc::Channel> channel;
877
878   Config *config = new Config();
879   string api = config->Get_control_str("ts_control_api");
880   ts_control_ep = config->Get_control_str("ts_control_ep");
881   if ( api.empty() ) {
882     cout << "[ERROR] a control api (rest/grpc) is required in xApp descriptor\n";
883     exit(1);
884   }
885   if ( api.compare("rest") == 0 ) {
886     ts_control_api = TsControlApi::REST;
887   } else {
888     ts_control_api = TsControlApi::gRPC;
889
890     if( !build_cell_mapping() ) {
891       cout << "[ERROR] unable to map cells to nodeb\n";
892     }
893
894     channel = grpc::CreateChannel(ts_control_ep, grpc::InsecureChannelCredentials());
895     rc_stub = rc::MsgComm::NewStub(channel, grpc::StubOptions());
896   }
897
898   fprintf( stderr, "[INFO] listening on port %s\n", port );
899   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
900
901   xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL );          // msg type 20010
902   xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL );  // msg type 30002
903   xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
904
905   xfw->Run( nthreads );
906
907 }