Update TS to enforce A1 policy on throughput predictions
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
index bf146ec..e4a06e7 100644 (file)
@@ -31,6 +31,8 @@
 
   Modified: 21 May 2021 (Alexandre Huff)
             Update for traffic steering use case in release D.
+            07 Dec 2021 (Alexandre Huff)
+            Update for traffic steering use case in release E.
 */
 
 #include <stdio.h>
 #include <rapidjson/reader.h>
 #include <rapidjson/prettywriter.h>
 
-#include <curl/curl.h>
 #include <rmr/RIC_message_types.h>
-#include "ricxfcpp/xapp.hpp"
+#include <ricxfcpp/xapp.hpp>
+#include <ricxfcpp/config.hpp>
 
+/*
+  FIXME unfortunately this RMR flag has to be disabled
+  due to name resolution conflicts.
+  RC xApp defines the same name for gRPC control messages.
+*/
+#undef RIC_CONTROL_ACK
 
-// Defines env name for the endpoint to POST handoff control messages
-#define ENV_CONTROL_URL "TS_CONTROL_URL"
+#include <grpc/grpc.h>
+#include <grpcpp/channel.h>
+#include <grpcpp/client_context.h>
+#include <grpcpp/create_channel.h>
+#include <grpcpp/security/credentials.h>
+#include "protobuf/api.grpc.pb.h"
+
+#include "utils/restclient.hpp"
 
 
 using namespace rapidjson;
@@ -75,13 +89,25 @@ using Keys = std::set<Key>;
 
 
 // ----------------------------------------------------------
+std::unique_ptr<Xapp> xfw;
+std::unique_ptr<api::MsgComm::Stub> rc_stub;
 
-// Stores the the URL to POST handoff control messages
-const char *ts_control_url;
+int downlink_threshold = 0;  // A1 policy type 20008 (in percentage)
 
-std::unique_ptr<Xapp> xfw;
+// scoped enum to identify which API is used to send control messages
+enum class TsControlApi { REST, gRPC };
+TsControlApi ts_control_api;  // api to send control messages
+string ts_control_ep;         // api target endpoint
+
+typedef struct nodeb {
+  string ran_name;
+  struct {
+    string plmn_id;
+    string nb_id;
+  } global_nb_id;
+} nodeb_t;
 
-int rsrp_threshold = 0;
+unordered_map<string, shared_ptr<nodeb_t>> cell_map; // maps each cell to its nodeb
 
 /* struct UEData {
   string serving_cell;
@@ -89,6 +115,10 @@ int rsrp_threshold = 0;
 }; */
 
 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
+  /*
+    Assuming we receive the following payload from A1 Mediator
+    {"operation": "CREATE", "policy_type_id": 20008, "policy_instance_id": "tsapolicy145", "payload": {"threshold": 5}}
+  */
   unordered_map<string, string> cell_pred;
   std::string ue_id;
   bool ue_id_found = false;
@@ -229,6 +259,47 @@ struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
   }
 };
 
+struct NodebListHandler : public BaseReaderHandler<UTF8<>, NodebListHandler> {
+  vector<string> nodeb_list;
+  string curr_key = "";
+
+  bool Key(const Ch* str, SizeType length, bool copy) {
+    curr_key = str;
+    return true;
+  }
+
+  bool String(const Ch* str, SizeType length, bool copy) {
+    if( curr_key.compare( "inventoryName" ) == 0 ) {
+      nodeb_list.push_back( str );
+    }
+    return true;
+  }
+};
+
+struct NodebHandler : public BaseReaderHandler<UTF8<>, NodebHandler> {
+  string curr_key = "";
+  shared_ptr<nodeb_t> nodeb = make_shared<nodeb_t>();
+
+  bool Key(const Ch* str, SizeType length, bool copy) {
+    curr_key = str;
+    return true;
+  }
+
+  bool String(const Ch* str, SizeType length, bool copy) {
+    if( curr_key.compare( "ranName" ) == 0 ) {
+      nodeb->ran_name = str;
+    } else if( curr_key.compare( "plmnId" ) == 0 ) {
+      nodeb->global_nb_id.plmn_id = str;
+    } else if( curr_key.compare( "nbId" ) == 0 ) {
+      nodeb->global_nb_id.nb_id = str;
+    } else if( curr_key.compare( "cellId" ) == 0 ) {
+      cell_map[str] = nodeb;
+    }
+    return true;
+  }
+
+};
+
 
 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
   unordered_map<string, string> cell_pred;
@@ -359,115 +430,155 @@ struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
 } */
 
 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
+  string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
 
-  int response_to = 0;  // max timeout wating for a response
-  int rmtype;          // received message type
-
-  fprintf(stderr, "[INFO] Policy Callback got a message, type=%d, length=%d\n", mtype, len);
-
-  const char *arg = (const char*)payload.get();
-
-  fprintf(stderr, "[INFO] Payload is %s\n", arg);
+  cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length=" << len << "\n";
+  cout << "[INFO] Payload is " << arg << endl;
 
   PolicyHandler handler;
   Reader reader;
-  StringStream ss(arg);
+  StringStream ss(arg.c_str());
   reader.Parse(ss,handler);
 
   //Set the threshold value
   if (handler.found_threshold) {
-    fprintf(stderr, "[INFO] Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
-    rsrp_threshold = handler.threshold;
+    cout << "[INFO] Setting Threshold for A1-P value: " << handler.threshold << "%\n";
+    downlink_threshold = handler.threshold;
   }
 
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-}
-
-// callback to handle handover reply (json http response)
-size_t handoff_reply_callback( const char *in, size_t size, size_t num, string *out ) {
-  const size_t totalBytes( size * num );
-  out->append( in, totalBytes );
-  return totalBytes;
 }
 
 // sends a handover message through REST
-void send_handoff_request( string msg ) {
-  CURL *curl = curl_easy_init();
-  curl_easy_setopt( curl, CURLOPT_URL, ts_control_url );
-  curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
-  curl_easy_setopt( curl, CURLOPT_POST, 1L );
-  // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
-
-  // response information
-  long httpCode( 0 );
-  unique_ptr<string> httpData( new string() );
-
-  curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, handoff_reply_callback );
-  curl_easy_setopt( curl, CURLOPT_WRITEDATA, httpData.get());
-  curl_easy_setopt( curl, CURLOPT_POSTFIELDS, msg.c_str() );
-
-  struct curl_slist *headers = NULL;  // needs to free this after easy perform
-  headers = curl_slist_append( headers, "Accept: application/json" );
-  headers = curl_slist_append( headers, "Content-Type: application/json" );
-  curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
-
-  cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_url << "\"\n";
+void send_rest_control_request( string ue_id, string serving_cell_id, string target_cell_id ) {
+  time_t now;
+  string str_now;
+  static unsigned int seq_number = 0; // static counter, not thread-safe
+
+  // building a handoff control message
+  now = time( nullptr );
+  str_now = ctime( &now );
+  str_now.pop_back(); // removing the \n character
+
+  seq_number++;       // static counter, not thread-safe
+
+  rapidjson::StringBuffer s;
+  rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+  writer.StartObject();
+  writer.Key( "command" );
+  writer.String( "HandOff" );
+  writer.Key( "seqNo" );
+  writer.Int( seq_number );
+  writer.Key( "ue" );
+  writer.String( ue_id.c_str() );
+  writer.Key( "fromCell" );
+  writer.String( serving_cell_id.c_str() );
+  writer.Key( "toCell" );
+  writer.String( target_cell_id.c_str() );
+  writer.Key( "timestamp" );
+  writer.String( str_now.c_str() );
+  writer.Key( "reason" );
+  writer.String( "HandOff Control Request from TS xApp" );
+  writer.Key( "ttl" );
+  writer.Int( 10 );
+  writer.EndObject();
+  // creates a message like
+  /* {
+    "command": "HandOff",
+    "seqNo": 1,
+    "ue": "ueid-here",
+    "fromCell": "CID1",
+    "toCell": "CID3",
+    "timestamp": "Sat May 22 10:35:33 2021",
+    "reason": "HandOff Control Request from TS xApp",
+    "ttl": 10
+  } */
+
+  string msg = s.GetString();
+
+  cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_ep << "\"\n";
   cout << "[INFO] HandOff request is " << msg << endl;
 
   // sending request
-  CURLcode res = curl_easy_perform( curl );
-  if( res != CURLE_OK ) {
-    cout << "[ERROR] curl_easy_perform() failed: " << curl_easy_strerror( res ) << endl;
-
-  } else {
+  restclient::RestClient client( ts_control_ep );
+  restclient::response_t resp = client.do_post( "", msg ); // we already have the full path in ts_control_ep
 
-    curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
-    if( httpCode == 200 ) {
+  if( resp.status_code == 200 ) {
       // ============== DO SOMETHING USEFUL HERE ===============
       // Currently, we only print out the HandOff reply
       rapidjson::Document document;
-      document.Parse( httpData.get()->c_str() );
+      document.Parse( resp.body.c_str() );
       rapidjson::StringBuffer s;
            rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
       document.Accept( writer );
       cout << "[INFO] HandOff reply is " << s.GetString() << endl;
 
+  } else {
+      cout << "[ERROR] Unexpected HTTP code " << resp.status_code << " from " << \
+              client.getBaseUrl() << \
+              "\n[ERROR] HTTP payload is " << resp.body.c_str() << endl;
+  }
+
+}
+
+// sends a handover message to RC xApp through gRPC
+void send_grpc_control_request( string ue_id, string target_cell_id ) {
+  grpc::ClientContext context;
+
+  api::RicControlGrpcRsp response;
+  shared_ptr<api::RicControlGrpcReq> request = make_shared<api::RicControlGrpcReq>();
+
+  api::RICE2APHeader *apHeader = request->mutable_rice2apheaderdata();
+  apHeader->set_ranfuncid( 300 );
+  apHeader->set_ricrequestorid( 1001 );
+
+  api::RICControlHeader *ctrlHeader = request->mutable_riccontrolheaderdata();
+  ctrlHeader->set_controlstyle( 3 );
+  ctrlHeader->set_controlactionid( 1 );
+  ctrlHeader->set_ueid( ue_id );
 
-    } else if ( httpCode == 404 ) {
-      cout << "[ERROR] HTTP 404 Not Found: " << ts_control_url << endl;
+  api::RICControlMessage *ctrlMsg = request->mutable_riccontrolmessagedata();
+  ctrlMsg->set_riccontrolcelltypeval( api::RIC_CONTROL_CELL_UNKWON );
+  ctrlMsg->set_targetcellid( target_cell_id );
+
+  auto data = cell_map.find( target_cell_id );
+  if( data != cell_map.end() ) {
+    request->set_e2nodeid( data->second->global_nb_id.nb_id );
+    request->set_plmnid( data->second->global_nb_id.plmn_id );
+    request->set_ranname( data->second->ran_name );
+  } else {
+    request->set_e2nodeid( "unknown_e2nodeid" );
+    request->set_plmnid( "unknown_plmnid" );
+    request->set_ranname( "unknown_ranname" );
+  }
+  request->set_riccontrolackreqval( api::RIC_CONTROL_ACK_UNKWON );  // not yet used in api.proto
+
+  grpc::Status status = rc_stub->SendRICControlReqServiceGrpc( &context, *request, &response );
+
+  if( status.ok() ) {
+    if( response.rspcode() == 0 ) {
+      cout << "[INFO] Control Request succeeded with code=0, description=" << response.description() << endl;
     } else {
-      cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_url << \
-              "\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
+      cout << "[ERROR] Control Request failed with code=" << response.rspcode()
+           << ", description=" << response.description() << endl;
     }
 
+  } else {
+    cout << "[ERROR] failed to send a RIC Control Request message to RC xApp, error_code="
+         << status.error_code() << ", error_msg=" << status.error_message() << endl;
   }
 
-  curl_slist_free_all( headers );
-  curl_easy_cleanup( curl );
 }
 
 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
-
-  time_t now;
-  string str_now;
-  static unsigned int seq_number = 0; // static counter, not thread-safe
-
-  int response_to = 0;  // max timeout wating for a response
-
-  int send_mtype = 0;
-  int rmtype;                                                  // received message type
-  int delay = 1000000;                         // mu-sec delay; default 1s
+  string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
 
   cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
-  cout << "[INFO] Payload is " << payload.get() << endl;
+  cout << "[INFO] Payload is " << json << endl;
 
-  const char* arg = (const char*)payload.get();
   PredictionHandler handler;
-
   try {
     Reader reader;
-    StringStream ss(arg);
+    StringStream ss(json.c_str());
     reader.Parse(ss,handler);
   } catch (...) {
     cout << "[ERROR] Got an exception on stringstream read parse\n";
@@ -503,61 +614,29 @@ void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_comp
 
   }
 
-  if ( highest_throughput > serving_cell_throughput ) {
-    // building a handoff control message
-    now = time( nullptr );
-    str_now = ctime( &now );
-    str_now.pop_back(); // removing the \n character
-
-    seq_number++;       // static counter, not thread-safe
-
-    rapidjson::StringBuffer s;
-         rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
-    writer.StartObject();
-    writer.Key( "command" );
-    writer.String( "HandOff" );
-    writer.Key( "seqNo" );
-    writer.Int( seq_number );
-    writer.Key( "ue" );
-    writer.String( handler.ue_id.c_str() );
-    writer.Key( "fromCell" );
-    writer.String( handler.serving_cell_id.c_str() );
-    writer.Key( "toCell" );
-    writer.String( highest_throughput_cell_id.c_str() );
-    writer.Key( "timestamp" );
-    writer.String( str_now.c_str() );
-    writer.Key( "reason" );
-    writer.String( "HandOff Control Request from TS xApp" );
-    writer.Key( "ttl" );
-    writer.Int( 10 );
-    writer.EndObject();
-    // creates a message like
-    /* {
-      "command": "HandOff",
-      "seqNo": 1,
-      "ue": "ueid-here",
-      "fromCell": "CID1",
-      "toCell": "CID3",
-      "timestamp": "Sat May 22 10:35:33 2021",
-      "reason": "HandOff Control Request from TS xApp",
-      "ttl": 10
-    } */
+  float thresh = 0;
+  if( downlink_threshold > 0 ) {  // we also take into account the threshold in A1 policy type 20008
+    thresh = serving_cell_throughput * (downlink_threshold / 100.0);
+  }
+
+  if ( highest_throughput > ( serving_cell_throughput + thresh ) ) {
 
     // sending a control request message
-    send_handoff_request( s.GetString() );
+    if ( ts_control_api == TsControlApi::REST ) {
+      send_rest_control_request( handler.ue_id, handler.serving_cell_id, highest_throughput_cell_id );
+    } else {
+      send_grpc_control_request( handler.ue_id, highest_throughput_cell_id );
+    }
 
   } else {
     cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
   }
 
-  // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );      // validate that we can use the same buffer for 2 rts calls
-  // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
 }
 
 void send_prediction_request( vector<string> ues_to_predict ) {
-
   std::unique_ptr<Message> msg;
-  Msg_component payload;                                // special type of unique pointer to the payload
+  Msg_component payload;           // special type of unique pointer to the payload
 
   int sz;
   int i;
@@ -584,16 +663,10 @@ void send_prediction_request( vector<string> ues_to_predict ) {
 
   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
 
-  const char *body = message_body.c_str();
-
   send_payload = msg->Get_payload(); // direct access to payload
-  snprintf( (char *) send_payload.get(), 2048, "%s", body );
+  snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
 
-  /*
-    we are sending a string, so we have to include the nil byte in the RMR message
-    to keep things simple in the receiver side
-   */
-  plen = strlen( (char *) send_payload.get() ) + 1;
+  plen = strlen( (char *)send_payload.get() );
 
   cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
 
@@ -608,34 +681,111 @@ void send_prediction_request( vector<string> ues_to_predict ) {
  * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
  * sends a prediction request to the QP Driver xApp.
  */
-void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
-  const char *json = (const char *) payload.get();
+void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
+  string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
 
   cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
   cout << "[INFO] Payload is " << json << "\n";
 
   AnomalyHandler handler;
   Reader reader;
-  StringStream ss(json);
+  StringStream ss(json.c_str());
   reader.Parse(ss,handler);
 
   // just sending ACK to the AD xApp
   mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr );  // msg type 30004
 
-  // TODO should we use the threshold received in the A1_POLICY_REQ message and compare with Degradation in TS_ANOMALY_UPDATE?
-  // if( handler.degradation < rsrp_threshold )
   send_prediction_request(handler.prediction_ues);
 }
 
-extern int main( int argc, char** argv ) {
+vector<string> get_nodeb_list( restclient::RestClient& client ) {
+
+  restclient::response_t response = client.do_get( "/v1/nodeb/states" );
+
+  NodebListHandler handler;
+  if( response.status_code == 200 ) {
+    Reader reader;
+    StringStream ss( response.body.c_str() );
+    reader.Parse( ss, handler );
+
+    cout << "[INFO] nodeb list is " << response.body.c_str() << endl;
 
+  } else {
+    if( response.body.empty() ) {
+      cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() << endl;
+    } else {
+      cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() <<
+              ". HTTP payload is " << response.body.c_str() << endl;
+    }
+  }
+
+  return handler.nodeb_list;
+}
+
+bool build_cell_mapping() {
+  string base_url;
+  char *data = getenv( "SERVICE_E2MGR_HTTP_BASE_URL" );
+  if ( data == NULL ) {
+    base_url = "http://service-ricplt-e2mgr-http.ricplt:3800";
+  } else {
+    base_url = string( data );
+  }
+
+  restclient::RestClient client( base_url );
+
+  vector<string> nb_list = get_nodeb_list( client );
+
+  for( string nb : nb_list ) {
+    string full_path = string("/v1/nodeb/") + nb;
+    restclient::response_t response = client.do_get( full_path );
+    if( response.status_code != 200 ) {
+      if( response.body.empty() ) {
+        cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
+                client.getBaseUrl() + full_path << endl;
+      } else {
+        cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
+              client.getBaseUrl() + full_path << ". HTTP payload is " << response.body.c_str() << endl;
+      }
+      return false;
+    }
+
+    try {
+      NodebHandler handler;
+      Reader reader;
+      StringStream ss( response.body.c_str() );
+      reader.Parse( ss, handler );
+    } catch (...) {
+      cout << "[ERROR] Got an exception on parsing nodeb (stringstream read parse)\n";
+      return false;
+    }
+  }
+
+  return true;
+}
+
+extern int main( int argc, char** argv ) {
   int nthreads = 1;
   char*        port = (char *) "4560";
+  shared_ptr<grpc::Channel> channel;
+
+  Config *config = new Config();
+  string api = config->Get_control_str("ts_control_api");
+  ts_control_ep = config->Get_control_str("ts_control_ep");
+  if ( api.empty() ) {
+    cout << "[ERROR] a control api (rest/grpc) is required in xApp descriptor\n";
+    exit(1);
+  }
+  if ( api.compare("rest") == 0 ) {
+    ts_control_api = TsControlApi::REST;
+  } else {
+    ts_control_api = TsControlApi::gRPC;
+
+    if( !build_cell_mapping() ) {
+      cout << "[ERROR] unable to map cells to nodeb\n";
+    }
 
-  // ts_control_url = "http://127.0.0.1:5000/api/echo"; // echo-server in test/app/ directory
-  if ( ( ts_control_url = getenv( ENV_CONTROL_URL ) ) == nullptr ) {
-    cout << "[ERROR] TS_CONTROL_URL is not defined to POST handoff control messages" << endl;
-    return 1;
+    channel = grpc::CreateChannel(ts_control_ep, grpc::InsecureChannelCredentials());
+    rc_stub = api::MsgComm::NewStub(channel, grpc::StubOptions());
   }
 
   fprintf( stderr, "[TS xApp] listening on port %s\n", port );