Update TS xApp for Release D use case
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
index b32e71e..bf146ec 100644 (file)
 
 /*
        Mnemonic:       ts_xapp.cpp
-       Abstract:       Traffic Steering xApp;
-                              1. Receives A1 Policy
-                              2. Queries SDL to decide which UE to attempt Traffic Steering for
+       Abstract:       Traffic Steering xApp
+                  1. Receives A1 Policy
+                              2. Receives anomaly detection
                               3. Requests prediction for UE throughput on current and neighbor cells
                               4. Receives prediction
                               5. Optionally exercises Traffic Steering action over E2
 
-       Date:           22 April 2020
+       Date:     22 April 2020
        Author:         Ron Shacham
 
+  Modified: 21 May 2021 (Alexandre Huff)
+            Update for traffic steering use case in release D.
 */
 
 #include <stdio.h>
@@ -39,7 +41,6 @@
 #include <iostream>
 #include <memory>
 
-#include <sdl/syncstorage.hpp>
 #include <set>
 #include <map>
 #include <vector>
 #include <rapidjson/stringbuffer.h>
 #include <rapidjson/schema.h>
 #include <rapidjson/reader.h>
+#include <rapidjson/prettywriter.h>
 
-
+#include <curl/curl.h>
+#include <rmr/RIC_message_types.h>
 #include "ricxfcpp/xapp.hpp"
 
+
+// Defines env name for the endpoint to POST handoff control messages
+#define ENV_CONTROL_URL "TS_CONTROL_URL"
+
+
 using namespace rapidjson;
 using namespace std;
+using namespace xapp;
+
 using Namespace = std::string;
 using Key = std::string;
 using Data = std::vector<uint8_t>;
@@ -66,23 +76,17 @@ using Keys = std::set<Key>;
 
 // ----------------------------------------------------------
 
-std::unique_ptr<Xapp> xfw;
+// Stores the the URL to POST handoff control messages
+const char *ts_control_url;
 
-std::string sdl_namespace_u = "TS-UE-metrics";
-std::string sdl_namespace_c = "TS-cell-metrics";
+std::unique_ptr<Xapp> xfw;
 
 int rsrp_threshold = 0;
 
-std::unique_ptr<shareddatalayer::SyncStorage> sdl;
-
-Namespace nsu;
-Namespace nsc;
-
-struct UEData {
+/* struct UEData {
   string serving_cell;
   int serving_cell_rsrp;
-
-};
+}; */
 
 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
   unordered_map<string, string> cell_pred;
@@ -96,7 +100,6 @@ struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
   std::string operation;
   bool found_threshold = false;
 
-  
   bool Null() { return true; }
   bool Bool(bool b) { return true; }
   bool Int(int i) {
@@ -124,12 +127,12 @@ struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
     }
 
     return true;
-  }    
+  }
   bool Int64(int64_t i) {  return true; }
   bool Uint64(uint64_t u) {  return true; }
   bool Double(double d) {  return true; }
   bool String(const char* str, SizeType length, bool copy) {
-    
+
     if (curr_key.compare("operation") != 0) {
       operation = str;
     }
@@ -159,11 +162,16 @@ struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
   bool ue_id_found = false;
   string curr_key = "";
   string curr_value = "";
+  string serving_cell_id;
   bool down_val = true;
   bool Null() {  return true; }
   bool Bool(bool b) {  return true; }
   bool Int(int i) {  return true; }
-  bool Uint(unsigned u) {    
+  bool Uint(unsigned u) {
+    // Currently, we assume the first cell in the prediction message is the serving cell
+    if ( serving_cell_id.empty() ) {
+      serving_cell_id = curr_key;
+    }
 
     if (down_val) {
       cell_pred_down[curr_key] = u;
@@ -173,7 +181,7 @@ struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
       down_val = true;
     }
 
-    return true;    
+    return true;
 
   }
   bool Int64(int64_t i) {  return true; }
@@ -199,8 +207,30 @@ struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
   bool EndArray(SizeType elementCount) {  return true; }
 };
 
+struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
+  /*
+    Assuming we receive the following payload from AD
+    [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
+  */
+  vector<string> prediction_ues;
+  string curr_key = "";
+
+  bool Key(const Ch* str, SizeType len, bool copy) {
+    curr_key = str;
+    return true;
+  }
+
+  bool String(const Ch* str, SizeType len, bool copy) {
+    // We are only interested in the "ue-id"
+    if ( curr_key.compare( "ue-id") == 0 ) {
+      prediction_ues.push_back( str );
+    }
+    return true;
+  }
+};
+
 
-struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
+/* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
   unordered_map<string, string> cell_pred;
   std::string serving_cell_id;
   int serving_cell_rsrp;
@@ -209,7 +239,7 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
   bool in_serving_array = false;
   int rf_meas_index = 0;
 
-  bool in_serving_report_object = false;  
+  bool in_serving_report_object = false;
 
   string curr_key = "";
   string curr_value = "";
@@ -219,7 +249,7 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
 
     return true;
   }
-  
+
   bool Uint(unsigned i) {
 
     if (in_serving_report_object) {
@@ -230,8 +260,8 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
       } else if (curr_key.compare("rssinr") == 0) {
        serving_cell_sinr = i;
       }
-    }          
-    
+    }
+
     return true; }
   bool Int64(int64_t i) {
 
@@ -241,10 +271,10 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
     return true; }
   bool Double(double d) { return true; }
   bool String(const char* str, SizeType length, bool copy) {
-    
+
     if (curr_key.compare("ServingCellID") == 0) {
       serving_cell_id = str;
-    } 
+    }
 
     return true;
   }
@@ -255,7 +285,7 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
 
     return true; }
   bool Key(const char* str, SizeType length, bool copy) {
-    
+
     curr_key = str;
     return true;
   }
@@ -269,7 +299,7 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
     if (curr_key.compare("ServingCellRF") == 0) {
       in_serving_array = true;
     }
-    
+
     return true;
   }
   bool EndArray(SizeType elementCount) {
@@ -280,24 +310,24 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
     }
 
     return true; }
-};
+}; */
 
 
-unordered_map<string, UEData> get_sdl_ue_data() {
+/* unordered_map<string, UEData> get_sdl_ue_data() {
 
   fprintf(stderr, "In get_sdl_ue_data()\n");
 
   unordered_map<string, string> ue_data;
 
   unordered_map<string, UEData> return_ue_data_map;
-    
+
   std::string prefix3="";
   Keys K2 = sdl->findKeys(nsu, prefix3);
   DataMap Dk2 = sdl->get(nsu, K2);
-  
+
   string ue_json;
   string ue_id;
-  
+
   for(auto si=K2.begin();si!=K2.end();++si){
     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
     char val[val_v.size()+1];                               // from Data
@@ -306,11 +336,11 @@ unordered_map<string, UEData> get_sdl_ue_data() {
     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
     val[i]='\0';
       ue_id.assign((std::string)*si);
-      
+
       ue_json.assign(val);
       ue_data[ue_id] =  ue_json;
   }
-  
+
   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
     UEDataHandler handler;
     Reader reader;
@@ -320,25 +350,24 @@ unordered_map<string, UEData> get_sdl_ue_data() {
     string ueID = map_iter->first;
     string serving_cell_id = handler.serving_cell_id;
     int serv_rsrp = handler.serving_cell_rsrp;
-    
+
     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
-    
-  }  
+
+  }
 
   return return_ue_data_map;
-}
+} */
 
 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
 
   int response_to = 0;  // max timeout wating for a response
   int rmtype;          // received message type
 
-  
-  fprintf(stderr, "Policy Callback got a message, type=%d, length=%d\n", mtype, len);
+  fprintf(stderr, "[INFO] Policy Callback got a message, type=%d, length=%d\n", mtype, len);
 
   const char *arg = (const char*)payload.get();
 
-  fprintf(stderr, "payload is %s\n", payload.get());
+  fprintf(stderr, "[INFO] Payload is %s\n", arg);
 
   PolicyHandler handler;
   Reader reader;
@@ -346,85 +375,83 @@ void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_componen
   reader.Parse(ss,handler);
 
   //Set the threshold value
-
   if (handler.found_threshold) {
-    fprintf(stderr, "Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
+    fprintf(stderr, "[INFO] Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
     rsrp_threshold = handler.threshold;
   }
 
   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-  
-  
 }
 
-void send_prediction_request(vector<string> ues_to_predict) {
-
-  std::unique_ptr<Message> msg;
-  Msg_component payload;                                // special type of unique pointer to the payload
-  
-  int nthreads = 1;  
-  int response_to = 0;   // max timeout wating for a response
-  int mtype = 30000;
-  int sz;
-  int i;
-  Msg_component send_payload;
-  
-  msg = xfw->Alloc_msg( 2048 );
-  
-  sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
-  if( sz < 2048 ) {
-    fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
-    exit( 1 );
-  }
-
-  string ues_list = "[";
+// callback to handle handover reply (json http response)
+size_t handoff_reply_callback( const char *in, size_t size, size_t num, string *out ) {
+  const size_t totalBytes( size * num );
+  out->append( in, totalBytes );
+  return totalBytes;
+}
 
-  for (int i = 0; i < ues_to_predict.size(); i++) {
-    if (i == ues_to_predict.size() - 1) {
-      ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"]";
+// sends a handover message through REST
+void send_handoff_request( string msg ) {
+  CURL *curl = curl_easy_init();
+  curl_easy_setopt( curl, CURLOPT_URL, ts_control_url );
+  curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
+  curl_easy_setopt( curl, CURLOPT_POST, 1L );
+  // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
+
+  // response information
+  long httpCode( 0 );
+  unique_ptr<string> httpData( new string() );
+
+  curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, handoff_reply_callback );
+  curl_easy_setopt( curl, CURLOPT_WRITEDATA, httpData.get());
+  curl_easy_setopt( curl, CURLOPT_POSTFIELDS, msg.c_str() );
+
+  struct curl_slist *headers = NULL;  // needs to free this after easy perform
+  headers = curl_slist_append( headers, "Accept: application/json" );
+  headers = curl_slist_append( headers, "Content-Type: application/json" );
+  curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
+
+  cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_url << "\"\n";
+  cout << "[INFO] HandOff request is " << msg << endl;
+
+  // sending request
+  CURLcode res = curl_easy_perform( curl );
+  if( res != CURLE_OK ) {
+    cout << "[ERROR] curl_easy_perform() failed: " << curl_easy_strerror( res ) << endl;
+
+  } else {
+
+    curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
+    if( httpCode == 200 ) {
+      // ============== DO SOMETHING USEFUL HERE ===============
+      // Currently, we only print out the HandOff reply
+      rapidjson::Document document;
+      document.Parse( httpData.get()->c_str() );
+      rapidjson::StringBuffer s;
+           rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+      document.Accept( writer );
+      cout << "[INFO] HandOff reply is " << s.GetString() << endl;
+
+
+    } else if ( httpCode == 404 ) {
+      cout << "[ERROR] HTTP 404 Not Found: " << ts_control_url << endl;
     } else {
-      ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
+      cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_url << \
+              "\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
     }
-  }
 
-  string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
-
-  const char *body = message_body.c_str();
-
-  //  char *body = "{\"UEPredictionSet\": [\"12345\"]}";
-  
-  send_payload = msg->Get_payload(); // direct access to payload
-  //  snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
-  snprintf( (char *) send_payload.get(), 2048, body);
-  //snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
-
-  fprintf(stderr, "message body %s\n", send_payload.get());  
-  fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
-  
-  // payload updated in place, nothing to copy from, so payload parm is nil
-  if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
-    fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
   }
 
-  /*
-  msg = xfw->Receive( response_to );
-  if( msg != NULL ) {
-    rmtype = msg->Get_mtype();
-    send_payload = msg->Get_payload();
-    fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
-  } 
-  */
-
+  curl_slist_free_all( headers );
+  curl_easy_cleanup( curl );
 }
 
 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
 
-  long now;
-  long total_count;
-
-  int sz;
-  int i;
+  time_t now;
+  string str_now;
+  static unsigned int seq_number = 0; // static counter, not thread-safe
 
   int response_to = 0;  // max timeout wating for a response
 
@@ -432,157 +459,192 @@ void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_comp
   int rmtype;                                                  // received message type
   int delay = 1000000;                         // mu-sec delay; default 1s
 
-  cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
-  cout << "payload is " << payload.get() << "\n";
-
-  mtype = 0;
+  cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
+  cout << "[INFO] Payload is " << payload.get() << endl;
 
   const char* arg = (const char*)payload.get();
   PredictionHandler handler;
 
   try {
-
     Reader reader;
     StringStream ss(arg);
     reader.Parse(ss,handler);
   } catch (...) {
-    cout << "got an exception on stringstream read parse\n";
+    cout << "[ERROR] Got an exception on stringstream read parse\n";
   }
-  
-  std::string pred_ue_id = handler.ue_id;
-  
-  cout << "Prediction for " << pred_ue_id << endl;
-  
-  unordered_map<string, int> throughput_map = handler.cell_pred_down;
-
-  cout << endl;
-  unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
 
-  //Decision about CONTROL message
-  //(1) Identify UE Id in Prediction message
-  //(2) Get UEData struct for this UE Id
-  //(3) Identify the UE's service cell ID
-  //(4) Iterate through Prediction message.
-  //    If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
-
-  UEData pred_ue_data = sdl_data[pred_ue_id];
-  std::string serving_cell_id = pred_ue_data.serving_cell;
-
-  int serving_cell_throughput;
-  int highest_throughput;
-  std::string highest_throughput_cell_id;
-  std::string::size_type str_size;
-
-  for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
-
-    std::string curr_cellid = map_iter->first;
-    int curr_throughput = map_iter->second;
+  // We are only considering download throughput
+  unordered_map<string, int> throughput_map = handler.cell_pred_down;
 
-    if (curr_cellid.compare(serving_cell_id) == 0) {
-      serving_cell_throughput = curr_throughput;
-      highest_throughput = serving_cell_throughput;
-    }
+  // Decision about CONTROL message
+  // (1) Identify UE Id in Prediction message
+  // (2) Iterate through Prediction message.
+  //     If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
+  //     We assume the first cell in the prediction message is the serving cell
 
-  }
+  int serving_cell_throughput = 0;
+  int highest_throughput = 0;
+  string highest_throughput_cell_id;
 
-  //Iterating again to identify the highest throughput prediction
+  // Getting the current serving cell throughput prediction
+  auto cell = throughput_map.find( handler.serving_cell_id );
+  serving_cell_throughput = cell->second;
 
+   // Iterating to identify the highest throughput prediction
   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
 
-    std::string curr_cellid = map_iter->first;
+    string curr_cellid = map_iter->first;
     int curr_throughput = map_iter->second;
 
-    if (curr_throughput > serving_cell_throughput) {
+    if ( highest_throughput < curr_throughput ) {
       highest_throughput = curr_throughput;
       highest_throughput_cell_id = curr_cellid;
     }
+
   }
 
-  if (highest_throughput > serving_cell_throughput) {
-    cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
-    cout << "UE ID: " << pred_ue_id << endl;
-    cout << "Source cell " << serving_cell_id << endl;
-    cout << "Target cell " << highest_throughput_cell_id << endl;
+  if ( highest_throughput > serving_cell_throughput ) {
+    // building a handoff control message
+    now = time( nullptr );
+    str_now = ctime( &now );
+    str_now.pop_back(); // removing the \n character
+
+    seq_number++;       // static counter, not thread-safe
+
+    rapidjson::StringBuffer s;
+         rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+    writer.StartObject();
+    writer.Key( "command" );
+    writer.String( "HandOff" );
+    writer.Key( "seqNo" );
+    writer.Int( seq_number );
+    writer.Key( "ue" );
+    writer.String( handler.ue_id.c_str() );
+    writer.Key( "fromCell" );
+    writer.String( handler.serving_cell_id.c_str() );
+    writer.Key( "toCell" );
+    writer.String( highest_throughput_cell_id.c_str() );
+    writer.Key( "timestamp" );
+    writer.String( str_now.c_str() );
+    writer.Key( "reason" );
+    writer.String( "HandOff Control Request from TS xApp" );
+    writer.Key( "ttl" );
+    writer.Int( 10 );
+    writer.EndObject();
+    // creates a message like
+    /* {
+      "command": "HandOff",
+      "seqNo": 1,
+      "ue": "ueid-here",
+      "fromCell": "CID1",
+      "toCell": "CID3",
+      "timestamp": "Sat May 22 10:35:33 2021",
+      "reason": "HandOff Control Request from TS xApp",
+      "ttl": 10
+    } */
+
+    // sending a control request message
+    send_handoff_request( s.GetString() );
+
+  } else {
+    cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
   }
 
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-  
-  
+  // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );      // validate that we can use the same buffer for 2 rts calls
+  // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
 }
 
+void send_prediction_request( vector<string> ues_to_predict ) {
 
-//This function runs a loop that continuously checks SDL for any UE
-
-void run_loop() {
+  std::unique_ptr<Message> msg;
+  Msg_component payload;                                // special type of unique pointer to the payload
 
-  cout << "in Traffic Steering run_loop()\n";
+  int sz;
+  int i;
+  size_t plen;
+  Msg_component send_payload;
 
-  unordered_map<string, UEData> uemap;
+  msg = xfw->Alloc_msg( 2048 );
 
-  while (1) {
+  sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
+  if( sz < 2048 ) {
+    fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
+    exit( 1 );
+  }
 
-    uemap = get_sdl_ue_data();
+  string ues_list = "[";
 
-    vector<string> prediction_ues;
+  for (int i = 0; i < ues_to_predict.size(); i++) {
+    if (i == ues_to_predict.size() - 1) {
+      ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
+    } else {
+      ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
+    }
+  }
 
-    for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
-      string ueid = map_iter->first;
-      fprintf(stderr,"found a ueid %s\n", ueid.c_str());
-      UEData data = map_iter->second;
+  string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
 
-      fprintf(stderr, "current rsrp is %d\n", data.serving_cell_rsrp);
+  const char *body = message_body.c_str();
 
-      if (data.serving_cell_rsrp < rsrp_threshold) {
-       fprintf(stderr,"it is less than the rsrp threshold\n");
-       prediction_ues.push_back(ueid);
-      } else {
-       fprintf(stderr,"it is not less than the rsrp threshold\n");
-      }
-    }
+  send_payload = msg->Get_payload(); // direct access to payload
+  snprintf( (char *) send_payload.get(), 2048, "%s", body );
 
-    fprintf(stderr, "the size of pred ues is %d\n", prediction_ues.size());
+  /*
+    we are sending a string, so we have to include the nil byte in the RMR message
+    to keep things simple in the receiver side
+   */
+  plen = strlen( (char *) send_payload.get() ) + 1;
 
-    if (prediction_ues.size() > 0) {
-      send_prediction_request(prediction_ues);
-    }
+  cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
 
-    sleep(20);
+  // payload updated in place, nothing to copy from, so payload parm is nil
+  if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
+    fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
   }
+
 }
 
 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
- * It just print the payload received from AD xApp and send an ACK with same UEID as payload to AD xApp.
+ * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
+ * sends a prediction request to the QP Driver xApp.
  */
 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
-  cout << "payload is " << payload.get() << "\n";
-  mbuf.Send_response(30004, -1, strlen((char *) payload.get()), (unsigned char *) payload.get());
+  const char *json = (const char *) payload.get();
+
+  cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
+  cout << "[INFO] Payload is " << json << "\n";
+
+  AnomalyHandler handler;
+  Reader reader;
+  StringStream ss(json);
+  reader.Parse(ss,handler);
+
+  // just sending ACK to the AD xApp
+  mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr );  // msg type 30004
+
+  // TODO should we use the threshold received in the A1_POLICY_REQ message and compare with Degradation in TS_ANOMALY_UPDATE?
+  // if( handler.degradation < rsrp_threshold )
+  send_prediction_request(handler.prediction_ues);
 }
 
 extern int main( int argc, char** argv ) {
 
   int nthreads = 1;
-
   char*        port = (char *) "4560";
 
-  sdl = shareddatalayer::SyncStorage::create();
-
-  nsu = Namespace(sdl_namespace_u);
-  nsc = Namespace(sdl_namespace_c);
+  // ts_control_url = "http://127.0.0.1:5000/api/echo"; // echo-server in test/app/ directory
+  if ( ( ts_control_url = getenv( ENV_CONTROL_URL ) ) == nullptr ) {
+    cout << "[ERROR] TS_CONTROL_URL is not defined to POST handoff control messages" << endl;
+    return 1;
+  }
 
-  
-  fprintf( stderr, "<XAPP> listening on port: %s\n", port );
-  xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); 
-  
-  xfw->Add_msg_cb( 20010, policy_callback, NULL );
-  xfw->Add_msg_cb( 30002, prediction_callback, NULL );
-  xfw->Add_msg_cb( 30003, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
-  
-  std::thread loop_thread;
+  fprintf( stderr, "[TS xApp] listening on port %s\n", port );
+  xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
 
-  loop_thread = std::thread(&run_loop);
+  xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL );          // msg type 20010
+  xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL );  // msg type 30002
+  xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
 
   xfw->Run( nthreads );
-  
+
 }