Fix dummy values in gRPC message sent to RC xApp
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
index f6e2c37..6bb05b7 100644 (file)
@@ -1,7 +1,6 @@
 // vi: ts=4 sw=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2020 Nokia
        Copyright (c) 2020 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
 
 /*
        Mnemonic:       ts_xapp.cpp
-       Abstract:       Traffic Steering xApp;
-                              1. Receives A1 Policy
-                              2. Queries SDL to decide which UE to attempt Traffic Steering for
+       Abstract:       Traffic Steering xApp
+                  1. Receives A1 Policy
+                              2. Receives anomaly detection
                               3. Requests prediction for UE throughput on current and neighbor cells
                               4. Receives prediction
                               5. Optionally exercises Traffic Steering action over E2
 
-       Date:           22 April 2020
+       Date:     22 April 2020
        Author:         Ron Shacham
-               
+
+  Modified: 21 May 2021 (Alexandre Huff)
+            Update for traffic steering use case in release D.
+            07 Dec 2021 (Alexandre Huff)
+            Update for traffic steering use case in release E.
 */
 
 #include <stdio.h>
 #include <string.h>
 #include <unistd.h>
 
+#include <thread>
 #include <iostream>
 #include <memory>
 
-#include <sdl/syncstorage.hpp>
 #include <set>
 #include <map>
 #include <vector>
 #include <string>
+#include <unordered_map>
+
+#include <rapidjson/document.h>
+#include <rapidjson/writer.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/schema.h>
+#include <rapidjson/reader.h>
+#include <rapidjson/prettywriter.h>
+
+#include <rmr/RIC_message_types.h>
+#include <ricxfcpp/xapp.hpp>
+#include <ricxfcpp/config.hpp>
+
+/*
+  FIXME unfortunately this RMR flag has to be disabled
+  due to name resolution conflicts.
+  RC xApp defines the same name for gRPC control messages.
+*/
+#undef RIC_CONTROL_ACK
 
-#include "ricxfcpp/xapp.hpp"
+#include <grpc/grpc.h>
+#include <grpcpp/channel.h>
+#include <grpcpp/client_context.h>
+#include <grpcpp/create_channel.h>
+#include <grpcpp/security/credentials.h>
+#include "protobuf/api.grpc.pb.h"
+
+#include "utils/restclient.hpp"
+
+
+using namespace rapidjson;
+using namespace std;
+using namespace xapp;
 
 using Namespace = std::string;
 using Key = std::string;
@@ -55,292 +89,707 @@ using Keys = std::set<Key>;
 
 
 // ----------------------------------------------------------
-
 std::unique_ptr<Xapp> xfw;
+std::unique_ptr<api::MsgComm::Stub> rc_stub;
+
+int rsrp_threshold = 0;
+
+// scoped enum to identify which API is used to send control messages
+enum class TsControlApi { REST, gRPC };
+TsControlApi ts_control_api;  // api to send control messages
+string ts_control_ep;         // api target endpoint
+
+typedef struct nodeb {
+  string ran_name;
+  struct {
+    string plmn_id;
+    string nb_id;
+  } global_nb_id;
+} nodeb_t;
+
+unordered_map<string, shared_ptr<nodeb_t>> cell_map; // maps each cell to its nodeb
+
+/* struct UEData {
+  string serving_cell;
+  int serving_cell_rsrp;
+}; */
+
+struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
+  unordered_map<string, string> cell_pred;
+  std::string ue_id;
+  bool ue_id_found = false;
+  string curr_key = "";
+  string curr_value = "";
+  int policy_type_id;
+  int policy_instance_id;
+  int threshold;
+  std::string operation;
+  bool found_threshold = false;
+
+  bool Null() { return true; }
+  bool Bool(bool b) { return true; }
+  bool Int(int i) {
+
+    if (curr_key.compare("policy_type_id") == 0) {
+      policy_type_id = i;
+    } else if (curr_key.compare("policy_instance_id") == 0) {
+      policy_instance_id = i;
+    } else if (curr_key.compare("threshold") == 0) {
+      found_threshold = true;
+      threshold = i;
+    }
 
+    return true;
+  }
+  bool Uint(unsigned u) {
+
+    if (curr_key.compare("policy_type_id") == 0) {
+      policy_type_id = u;
+    } else if (curr_key.compare("policy_instance_id") == 0) {
+      policy_instance_id = u;
+    } else if (curr_key.compare("threshold") == 0) {
+      found_threshold = true;
+      threshold = u;
+    }
 
-void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
+    return true;
+  }
+  bool Int64(int64_t i) {  return true; }
+  bool Uint64(uint64_t u) {  return true; }
+  bool Double(double d) {  return true; }
+  bool String(const char* str, SizeType length, bool copy) {
 
-  long now;
-  long total_count;
+    if (curr_key.compare("operation") != 0) {
+      operation = str;
+    }
 
-  int sz;
-  int i;
+    return true;
+  }
+  bool StartObject() {
 
-  int response_to = 0;  // max timeout wating for a response
+    return true;
+  }
+  bool Key(const char* str, SizeType length, bool copy) {
 
-  int send_mtype = 0;
-  int rmtype;                                                  // received message type
-  int delay = 1000000;                         // mu-sec delay; default 1s
+    curr_key = str;
 
-  std::unique_ptr<Message> msg;
-  Msg_component send_payload;                          // special type of unique pointer to the payload
-  
-  fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
-  fprintf(stderr, "payload is %s\n", payload.get());
-  
-  //fprintf( stderr, "callback 1 got a message type = %d len = %d\n", mtype, len );
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
+    return true;
+  }
+  bool EndObject(SizeType memberCount) {  return true; }
+  bool StartArray() {  return true; }
+  bool EndArray(SizeType elementCount) {  return true; }
+
+};
+
+struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
+  unordered_map<string, int> cell_pred_down;
+  unordered_map<string, int> cell_pred_up;
+  std::string ue_id;
+  bool ue_id_found = false;
+  string curr_key = "";
+  string curr_value = "";
+  string serving_cell_id;
+  bool down_val = true;
+  bool Null() {  return true; }
+  bool Bool(bool b) {  return true; }
+  bool Int(int i) {  return true; }
+  bool Uint(unsigned u) {
+    // Currently, we assume the first cell in the prediction message is the serving cell
+    if ( serving_cell_id.empty() ) {
+      serving_cell_id = curr_key;
+    }
 
-  mtype = 0;
+    if (down_val) {
+      cell_pred_down[curr_key] = u;
+      down_val = false;
+    } else {
+      cell_pred_up[curr_key] = u;
+      down_val = true;
+    }
 
-  fprintf(stderr, "cb 1\n");
+    return true;
 
-  msg = xfw->Alloc_msg( 2048 );
-  
-  sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
-  if( sz < 2048 ) {
-    fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
-    exit( 1 );
   }
+  bool Int64(int64_t i) {  return true; }
+  bool Uint64(uint64_t u) {  return true; }
+  bool Double(double d) {  return true; }
+  bool String(const char* str, SizeType length, bool copy) {
 
-  fprintf(stderr, "cb 2");
-  
-  send_payload = msg->Get_payload(); // direct access to payload
-  snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\" : [\"222\", \"333\", \"444\"]}" );        
+    return true;
+  }
+  bool StartObject() {  return true; }
+  bool Key(const char* str, SizeType length, bool copy) {
+    if (!ue_id_found) {
+
+      ue_id = str;
+      ue_id_found = true;
+    } else {
+      curr_key = str;
+    }
+    return true;
+  }
+  bool EndObject(SizeType memberCount) {  return true; }
+  bool StartArray() {  return true; }
+  bool EndArray(SizeType elementCount) {  return true; }
+};
 
-  fprintf(stderr, "cb 3");    
-  
-  // payload updated in place, nothing to copy from, so payload parm is nil
-  if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() )+1, NULL )) {
-    fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
+struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
+  /*
+    Assuming we receive the following payload from AD
+    [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
+  */
+  vector<string> prediction_ues;
+  string curr_key = "";
+
+  bool Key(const Ch* str, SizeType len, bool copy) {
+    curr_key = str;
+    return true;
   }
 
-  fprintf(stderr, "cb 4");    
+  bool String(const Ch* str, SizeType len, bool copy) {
+    // We are only interested in the "ue-id"
+    if ( curr_key.compare( "ue-id") == 0 ) {
+      prediction_ues.push_back( str );
+    }
+    return true;
+  }
+};
 
-  /*
-  msg = xfw->Receive( response_to );
-  if( msg != NULL ) {
-    rmtype = msg->Get_mtype();
-    send_payload = msg->Get_payload();
-    fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
-  } 
-  */  
-  
-}
+struct NodebListHandler : public BaseReaderHandler<UTF8<>, NodebListHandler> {
+  vector<string> nodeb_list;
+  string curr_key = "";
 
+  bool Key(const Ch* str, SizeType length, bool copy) {
+    curr_key = str;
+    return true;
+  }
 
-void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
+  bool String(const Ch* str, SizeType length, bool copy) {
+    if( curr_key.compare( "inventoryName" ) == 0 ) {
+      nodeb_list.push_back( str );
+    }
+    return true;
+  }
+};
 
-  long now;
-  long total_count;
+struct NodebHandler : public BaseReaderHandler<UTF8<>, NodebHandler> {
+  string curr_key = "";
+  shared_ptr<nodeb_t> nodeb = make_shared<nodeb_t>();
 
-  int sz;
-  int i;
+  bool Key(const Ch* str, SizeType length, bool copy) {
+    curr_key = str;
+    return true;
+  }
 
-  int response_to = 0;  // max timeout wating for a response
+  bool String(const Ch* str, SizeType length, bool copy) {
+    if( curr_key.compare( "ranName" ) == 0 ) {
+      nodeb->ran_name = str;
+    } else if( curr_key.compare( "plmnId" ) == 0 ) {
+      nodeb->global_nb_id.plmn_id = str;
+    } else if( curr_key.compare( "nbId" ) == 0 ) {
+      nodeb->global_nb_id.nb_id = str;
+    } else if( curr_key.compare( "cellId" ) == 0 ) {
+      cell_map[str] = nodeb;
+    }
+    return true;
+  }
 
-  int send_mtype = 0;
-  int rmtype;                                                  // received message type
-  int delay = 1000000;                         // mu-sec delay; default 1s
+};
 
-  std::unique_ptr<Message> msg;
-  Msg_component send_payload;                          // special type of unique pointer to the payload
-  
-  fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len);
-  fprintf(stderr, "payload is %s\n", payload.get());
-  
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-
-  mtype = 0;
-
-  fprintf(stderr, "cb 1\n");
-  
-}
 
+/* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
+  unordered_map<string, string> cell_pred;
+  std::string serving_cell_id;
+  int serving_cell_rsrp;
+  int serving_cell_rsrq;
+  int serving_cell_sinr;
+  bool in_serving_array = false;
+  int rf_meas_index = 0;
 
+  bool in_serving_report_object = false;
 
-extern int main( int argc, char** argv ) {
+  string curr_key = "";
+  string curr_value = "";
+  bool Null() { return true; }
+  bool Bool(bool b) { return true; }
+  bool Int(int i) {
 
-  std::unique_ptr<Message> msg;
-  Msg_component payload;                               // special type of unique pointer to the payload
+    return true;
+  }
 
-  int nthreads = 1;
+  bool Uint(unsigned i) {
 
-  int response_to = 0;  // max timeout wating for a response  
+    if (in_serving_report_object) {
+      if (curr_key.compare("rsrp") == 0) {
+       serving_cell_rsrp = i;
+      } else if (curr_key.compare("rsrq") == 0) {
+       serving_cell_rsrq = i;
+      } else if (curr_key.compare("rssinr") == 0) {
+       serving_cell_sinr = i;
+      }
+    }
 
-  int delay = 1000000;                         // mu-sec delay; default 1s  
+    return true; }
+  bool Int64(int64_t i) {
 
-  char*        port = (char *) "4560";
+    return true; }
+  bool Uint64(uint64_t i) {
+
+    return true; }
+  bool Double(double d) { return true; }
+  bool String(const char* str, SizeType length, bool copy) {
+
+    if (curr_key.compare("ServingCellID") == 0) {
+      serving_cell_id = str;
+    }
+
+    return true;
+  }
+  bool StartObject() {
+    if (curr_key.compare("ServingCellRF") == 0) {
+      in_serving_report_object = true;
+    }
+
+    return true; }
+  bool Key(const char* str, SizeType length, bool copy) {
 
-  int ai;  
-  
-  ai = 1;
-  while( ai < argc ) {                         // very simple flag processing (no bounds/error checking)
-    if( argv[ai][0] != '-' )  {
-      break;
+    curr_key = str;
+    return true;
+  }
+  bool EndObject(SizeType memberCount) {
+    if (curr_key.compare("ServingCellRF") == 0) {
+      in_serving_report_object = false;
     }
-    
-    switch( argv[ai][1] ) {                    // we only support -x so -xy must be -x -y
-    case 'd':                                  // delay between messages (mu-sec)
-      delay = atoi( argv[ai+1] );
-      ai++;
-      break;
-      
-    case 'p': 
-      port = argv[ai+1];       
-      ai++;
-      break;
-      
-    case 't':                                  // timeout in seconds; we need to convert to ms for rmr calls
-      response_to = atoi( argv[ai+1] ) * 1000;
-      ai++;
-      break;
+    return true; }
+  bool StartArray() {
+
+    if (curr_key.compare("ServingCellRF") == 0) {
+      in_serving_array = true;
     }
-    ai++;
+
+    return true;
   }
-  
-  fprintf( stderr, "<XAPP> response timeout set to: %d\n", response_to );
-  fprintf( stderr, "<XAPP> listening on port: %s\n", port );
-  
-  xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
+  bool EndArray(SizeType elementCount) {
 
-  fprintf(stderr, "code1\n");
-  
-  xfw->Add_msg_cb( 20010, policy_callback, NULL );
-  xfw->Add_msg_cb( 30002, prediction_callback, NULL );
+    if (curr_key.compare("servingCellRF") == 0) {
+      in_serving_array = false;
+      rf_meas_index = 0;
+    }
 
-  fprintf(stderr, "code2\n");
+    return true; }
+}; */
 
-  std::string sdl_namespace_u = "TS-UE-metrics";
-  std::string sdl_namespace_c = "TS-cell-metrics";
 
-  fprintf(stderr, "code5\n");
-  
-  std::unique_ptr<shareddatalayer::SyncStorage> sdl(shareddatalayer::SyncStorage::create());
+/* unordered_map<string, UEData> get_sdl_ue_data() {
 
-  Namespace nsu(sdl_namespace_u);
-  Namespace nsc(sdl_namespace_c);
+  fprintf(stderr, "In get_sdl_ue_data()\n");
 
-    /*
+  unordered_map<string, string> ue_data;
 
-  fprintf(stderr, "before sdl set\n");
-  
-  try{
-    //connecting to the Redis and generating a random key for namespace "hwxapp"
-    fprintf(stderr, "IN SDL Set Data");
-    //    std::string data_string = "{\"rsrp\" : -110}";
+  unordered_map<string, UEData> return_ue_data_map;
 
+  std::string prefix3="";
+  Keys K2 = sdl->findKeys(nsu, prefix3);
+  DataMap Dk2 = sdl->get(nsu, K2);
 
-    std::string data_string = "{\"CellID\": \"310-680-200-555001\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 2000000, \"PDCPBytesUL\": 1200000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 50  }";
-      
-    DataMap dmap;
-    //    char key[4]="abc";
-    char key[] = "310-680-200-555001";
-    std::cout << "KEY: "<< key << std::endl;
-    Key k = key;
-    Data d;
-    //    uint8_t num = 101;
-    d.assign(data_string.begin(), data_string.end());
-    //    d.push_back(num);
-    dmap.insert({k,d});
+  string ue_json;
+  string ue_id;
 
-    sdl->set(nsc, dmap);
+  for(auto si=K2.begin();si!=K2.end();++si){
+    std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
+    char val[val_v.size()+1];                               // from Data
+    int i;
 
-    data_string = "{ \"CellID\": \"310-680-200-555002\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45  }";
+    for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
+    val[i]='\0';
+      ue_id.assign((std::string)*si);
 
-    Data d2;
-    DataMap dmap2;    
-    char key2[] = "310-680-200-555002";
-    std::cout << "KEY: "<< key2 << std::endl;
-    Key k2 = key2;
-    d2.assign(data_string.begin(), data_string.end());
-    //    d.push_back(num);
-    dmap2.insert({k2,d});
+      ue_json.assign(val);
+      ue_data[ue_id] =  ue_json;
+  }
 
-    sdl->set(nsc, dmap2);
+  for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
+    UEDataHandler handler;
+    Reader reader;
+    StringStream ss(map_iter->second.c_str());
+    reader.Parse(ss,handler);
 
+    string ueID = map_iter->first;
+    string serving_cell_id = handler.serving_cell_id;
+    int serv_rsrp = handler.serving_cell_rsrp;
 
+    return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
 
-    std::string data_string = "{ \"CellID\": \"310-680-200-555003\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45  }";
+  }
 
-    Data d3;
-    DataMap dmap3;
-    char key3[] = "310-680-200-555003";
-    std::cout << "KEY: "<< key3 << std::endl;
-    Key k3 = key3;
-    d3.assign(data_string.begin(), data_string.end());
-    //    d.push_back(num);
-    dmap3.insert({k3,d3});
+  return return_ue_data_map;
+} */
 
-    sdl->set(nsc, dmap3);
+void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
 
+  int response_to = 0;  // max timeout wating for a response
+  int rmtype;          // received message type
 
+  string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
 
-    data_string = "{ \"UEID\": 12345, \"ServingCellID\": \"310-680-200-555002\", \"MeasTimestampUEPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPDCPBytes\": 20,\"UEPDCPBytesDL\": 250000,\"UEPDCPBytesUL\": 100000, \"MeasTimestampUEPRBUsage\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPRBUsage\": 20, \"UEPRBUsageDL\": 10, \"UEPRBUsageUL\": 30, \"MeasTimestampRF\": \"2020-03-18 02:23:18.210\",\"MeasPeriodRF\": 40, \"ServingCellRF\": [-115,-16,-5], \"NeighborCellRF\": [  {\"CID\": \"310-680-200-555001\",\"Cell-RF\": [-90,-13,-2.5 ] }, {\"CID\": \"310-680-200-555003\",  \"Cell-RF\": [-140,-17,-6 ] } ] }";
-      
-    Data d4;
-    DataMap dmap4;
-    char key4[] = "12345";
-    std::cout << "KEY: "<< key << std::endl;
-    d4.assign(data_string.begin(), data_string.end());
-    Key k4 = key4;
-    //    d.push_back(num);
-    dmap4.insert({k4,d4});
+  cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length="<< len << "\n";
+  cout << "[INFO] Payload is " << arg << endl;
 
-    sdl->set(nsu, dmap4);
+  PolicyHandler handler;
+  Reader reader;
+  StringStream ss(arg.c_str());
+  reader.Parse(ss,handler);
 
-    
+  //Set the threshold value
+  if (handler.found_threshold) {
+    cout << "[INFO] Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
+    rsrp_threshold = handler.threshold;
   }
-  catch(...){
-    fprintf(stderr,"SDL Error in Set Data for Namespace");
-    return false;
+
+}
+
+// sends a handover message through REST
+void send_rest_control_request( string ue_id, string serving_cell_id, string target_cell_id ) {
+  time_t now;
+  string str_now;
+  static unsigned int seq_number = 0; // static counter, not thread-safe
+
+  // building a handoff control message
+  now = time( nullptr );
+  str_now = ctime( &now );
+  str_now.pop_back(); // removing the \n character
+
+  seq_number++;       // static counter, not thread-safe
+
+  rapidjson::StringBuffer s;
+  rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+  writer.StartObject();
+  writer.Key( "command" );
+  writer.String( "HandOff" );
+  writer.Key( "seqNo" );
+  writer.Int( seq_number );
+  writer.Key( "ue" );
+  writer.String( ue_id.c_str() );
+  writer.Key( "fromCell" );
+  writer.String( serving_cell_id.c_str() );
+  writer.Key( "toCell" );
+  writer.String( target_cell_id.c_str() );
+  writer.Key( "timestamp" );
+  writer.String( str_now.c_str() );
+  writer.Key( "reason" );
+  writer.String( "HandOff Control Request from TS xApp" );
+  writer.Key( "ttl" );
+  writer.Int( 10 );
+  writer.EndObject();
+  // creates a message like
+  /* {
+    "command": "HandOff",
+    "seqNo": 1,
+    "ue": "ueid-here",
+    "fromCell": "CID1",
+    "toCell": "CID3",
+    "timestamp": "Sat May 22 10:35:33 2021",
+    "reason": "HandOff Control Request from TS xApp",
+    "ttl": 10
+  } */
+
+  string msg = s.GetString();
+
+  cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_ep << "\"\n";
+  cout << "[INFO] HandOff request is " << msg << endl;
+
+  // sending request
+  restclient::RestClient client( ts_control_ep );
+  restclient::response_t resp = client.do_post( "", msg ); // we already have the full path in ts_control_ep
+
+  if( resp.status_code == 200 ) {
+      // ============== DO SOMETHING USEFUL HERE ===============
+      // Currently, we only print out the HandOff reply
+      rapidjson::Document document;
+      document.Parse( resp.body.c_str() );
+      rapidjson::StringBuffer s;
+           rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+      document.Accept( writer );
+      cout << "[INFO] HandOff reply is " << s.GetString() << endl;
+
+  } else {
+      cout << "[ERROR] Unexpected HTTP code " << resp.status_code << " from " << \
+              client.getBaseUrl() << \
+              "\n[ERROR] HTTP payload is " << resp.body.c_str() << endl;
   }
-  
-  fprintf(stderr, "after sdl set\n");
 
-    */
+}
 
-  fprintf(stderr, "before sdl get\n");
+// sends a handover message to RC xApp through gRPC
+void send_grpc_control_request( string ue_id, string target_cell_id ) {
+  grpc::ClientContext context;
+
+  api::RicControlGrpcRsp response;
+  shared_ptr<api::RicControlGrpcReq> request = make_shared<api::RicControlGrpcReq>();
+
+  api::RICE2APHeader *apHeader = request->mutable_rice2apheaderdata();
+  apHeader->set_ranfuncid( 300 );
+  apHeader->set_ricrequestorid( 1001 );
+
+  api::RICControlHeader *ctrlHeader = request->mutable_riccontrolheaderdata();
+  ctrlHeader->set_controlstyle( 3 );
+  ctrlHeader->set_controlactionid( 1 );
+  ctrlHeader->set_ueid( ue_id );
+
+  api::RICControlMessage *ctrlMsg = request->mutable_riccontrolmessagedata();
+  ctrlMsg->set_riccontrolcelltypeval( api::RIC_CONTROL_CELL_UNKWON );
+  ctrlMsg->set_targetcellid( target_cell_id );
+
+  auto data = cell_map.find( target_cell_id );
+  if( data != cell_map.end() ) {
+    request->set_e2nodeid( data->second->global_nb_id.nb_id );
+    request->set_plmnid( data->second->global_nb_id.plmn_id );
+    request->set_ranname( data->second->ran_name );
+  } else {
+    request->set_e2nodeid( "unknown_e2nodeid" );
+    request->set_plmnid( "unknown_plmnid" );
+    request->set_ranname( "unknown_ranname" );
+  }
+  request->set_riccontrolackreqval( api::RIC_CONTROL_ACK_UNKWON );  // not yet used in api.proto
 
+  grpc::Status status = rc_stub->SendRICControlReqServiceGrpc( &context, *request, &response );
 
-  std::string prefix2="310";
-  Keys K = sdl->findKeys(nsc, prefix2);     // just the prefix
-  DataMap Dk = sdl->get(nsc, K);
+  if( status.ok() ) {
+    if( response.rspcode() == 0 ) {
+      cout << "[INFO] Control Request succeeded with code=0, description=" << response.description() << endl;
+    } else {
+      cout << "[ERROR] Control Request failed with code=" << response.rspcode()
+           << ", description=" << response.description() << endl;
+    }
 
-  std::cout << "K contains " << K.size() << " elements.\n";  
+  } else {
+    cout << "[ERROR] failed to send a RIC Control Request message to RC xApp, error_code="
+         << status.error_code() << ", error_msg=" << status.error_message() << endl;
+  }
 
-  fprintf(stderr, "before forloop\n");
+}
 
-  for(auto si=K.begin();si!=K.end();++si){
-    std::vector<uint8_t> val_v = Dk[(*si)]; // 4 lines to unpack a string
-    char val[val_v.size()+1];                               // from Data
-    int i;
-    for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
-    val[i]='\0';
-    fprintf(stderr, "KEYS and Values %s = %s\n",(*si).c_str(), val);
+void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
+  string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
+
+  cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
+  cout << "[INFO] Payload is " << json << endl;
+
+  PredictionHandler handler;
+  try {
+    Reader reader;
+    StringStream ss(json.c_str());
+    reader.Parse(ss,handler);
+  } catch (...) {
+    cout << "[ERROR] Got an exception on stringstream read parse\n";
   }
 
+  // We are only considering download throughput
+  unordered_map<string, int> throughput_map = handler.cell_pred_down;
 
-  std::string prefix3="12";
-  Keys K2 = sdl->findKeys(nsu, prefix3);     // just the prefix
-  DataMap Dk2 = sdl->get(nsu, K2);
+  // Decision about CONTROL message
+  // (1) Identify UE Id in Prediction message
+  // (2) Iterate through Prediction message.
+  //     If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
+  //     We assume the first cell in the prediction message is the serving cell
 
-  std::cout << "K contains " << K2.size() << " elements.\n";  
+  int serving_cell_throughput = 0;
+  int highest_throughput = 0;
+  string highest_throughput_cell_id;
 
-  fprintf(stderr, "before forloop\n");
+  // Getting the current serving cell throughput prediction
+  auto cell = throughput_map.find( handler.serving_cell_id );
+  serving_cell_throughput = cell->second;
 
-  for(auto si=K2.begin();si!=K2.end();++si){
-    std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
-    char val[val_v.size()+1];                               // from Data
-    int i;
-    for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
-    val[i]='\0';
-    fprintf(stderr, "KEYS and Values %s = %s\n",(*si).c_str(), val);
-  }  
-  
+   // Iterating to identify the highest throughput prediction
+  for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
 
-  fprintf(stderr, "after sdl get\n");
+    string curr_cellid = map_iter->first;
+    int curr_throughput = map_iter->second;
 
-  xfw->Run( nthreads );
+    if ( highest_throughput < curr_throughput ) {
+      highest_throughput = curr_throughput;
+      highest_throughput_cell_id = curr_cellid;
+    }
+
+  }
+
+  if ( highest_throughput > serving_cell_throughput ) {
+
+    // sending a control request message
+    if ( ts_control_api == TsControlApi::REST ) {
+      send_rest_control_request( handler.ue_id, handler.serving_cell_id, highest_throughput_cell_id );
+    } else {
+      send_grpc_control_request( handler.ue_id, highest_throughput_cell_id );
+    }
+
+  } else {
+    cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
+  }
+
+}
+
+void send_prediction_request( vector<string> ues_to_predict ) {
+  std::unique_ptr<Message> msg;
+  Msg_component payload;           // special type of unique pointer to the payload
+
+  int sz;
+  int i;
+  size_t plen;
+  Msg_component send_payload;
 
-  fprintf(stderr, "code3\n");  
-  
   msg = xfw->Alloc_msg( 2048 );
 
-  fprintf(stderr, "code4\n");    
+  sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
+  if( sz < 2048 ) {
+    fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
+    exit( 1 );
+  }
+
+  string ues_list = "[";
+
+  for (int i = 0; i < ues_to_predict.size(); i++) {
+    if (i == ues_to_predict.size() - 1) {
+      ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
+    } else {
+      ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
+    }
+  }
+
+  string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
+
+  send_payload = msg->Get_payload(); // direct access to payload
+  snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
+
+  plen = strlen( (char *)send_payload.get() );
+
+  cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
+
+  // payload updated in place, nothing to copy from, so payload parm is nil
+  if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
+    fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
+  }
+
+}
+
+/* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
+ * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
+ * sends a prediction request to the QP Driver xApp.
+ */
+void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
+  string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
+
+  cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
+  cout << "[INFO] Payload is " << json << "\n";
+
+  AnomalyHandler handler;
+  Reader reader;
+  StringStream ss(json.c_str());
+  reader.Parse(ss,handler);
+
+  // just sending ACK to the AD xApp
+  mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr );  // msg type 30004
+
+  send_prediction_request(handler.prediction_ues);
+}
+
+vector<string> get_nodeb_list( restclient::RestClient& client ) {
+
+  restclient::response_t response = client.do_get( "/v1/nodeb/states" );
+
+  NodebListHandler handler;
+  if( response.status_code == 200 ) {
+    Reader reader;
+    StringStream ss( response.body.c_str() );
+    reader.Parse( ss, handler );
+
+    cout << "[INFO] nodeb list is " << response.body.c_str() << endl;
+
+  } else {
+    if( response.body.empty() ) {
+      cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() << endl;
+    } else {
+      cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() <<
+              ". HTTP payload is " << response.body.c_str() << endl;
+    }
+  }
+
+  return handler.nodeb_list;
+}
+
+bool build_cell_mapping() {
+  string base_url;
+  char *data = getenv( "SERVICE_E2MGR_HTTP_BASE_URL" );
+  if ( data == NULL ) {
+    base_url = "http://service-ricplt-e2mgr-http.ricplt:3800";
+  } else {
+    base_url = string( data );
+  }
+
+  restclient::RestClient client( base_url );
+
+  vector<string> nb_list = get_nodeb_list( client );
+
+  for( string nb : nb_list ) {
+    string full_path = string("/v1/nodeb/") + nb;
+    restclient::response_t response = client.do_get( full_path );
+    if( response.status_code != 200 ) {
+      if( response.body.empty() ) {
+        cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
+                client.getBaseUrl() + full_path << endl;
+      } else {
+        cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
+              client.getBaseUrl() + full_path << ". HTTP payload is " << response.body.c_str() << endl;
+      }
+      return false;
+    }
+
+    try {
+      NodebHandler handler;
+      Reader reader;
+      StringStream ss( response.body.c_str() );
+      reader.Parse( ss, handler );
+    } catch (...) {
+      cout << "[ERROR] Got an exception on parsing nodeb (stringstream read parse)\n";
+      return false;
+    }
+  }
+
+  return true;
+}
+
+extern int main( int argc, char** argv ) {
+  int nthreads = 1;
+  char*        port = (char *) "4560";
+  shared_ptr<grpc::Channel> channel;
+
+  Config *config = new Config();
+  string api = config->Get_control_str("ts_control_api");
+  ts_control_ep = config->Get_control_str("ts_control_ep");
+  if ( api.empty() ) {
+    cout << "[ERROR] a control api (rest/grpc) is required in xApp descriptor\n";
+    exit(1);
+  }
+  if ( api.compare("rest") == 0 ) {
+    ts_control_api = TsControlApi::REST;
+  } else {
+    ts_control_api = TsControlApi::gRPC;
+
+    if( !build_cell_mapping() ) {
+      cout << "[ERROR] unable to map cells to nodeb\n";
+    }
+
+    channel = grpc::CreateChannel(ts_control_ep, grpc::InsecureChannelCredentials());
+    rc_stub = api::MsgComm::NewStub(channel, grpc::StubOptions());
+  }
+
+  fprintf( stderr, "[TS xApp] listening on port %s\n", port );
+  xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
+
+  xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL );          // msg type 20010
+  xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL );  // msg type 30002
+  xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
+
+  xfw->Run( nthreads );
 
-  
 }