Changes to SDL read to support new keys used by KPIMON
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
index 4cb3ebc..ca0cd6c 100644 (file)
@@ -85,36 +85,109 @@ struct UEData {
 
 };
 
-struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
+struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
   unordered_map<string, string> cell_pred;
   std::string ue_id;
   bool ue_id_found = false;
   string curr_key = "";
   string curr_value = "";
-  bool Null() { cout << "Null()" << endl; return true; }
-  bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
-  bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
-  bool Uint(unsigned u) { cout << "Uint(" << u << ")" << endl; return true; }
-  bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
-  bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
-  bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
+  int policy_type_id;
+  int policy_instance_id;
+  int threshold;
+  std::string operation;
+  bool found_threshold = false;
+
+  
+  bool Null() { return true; }
+  bool Bool(bool b) { return true; }
+  bool Int(int i) {
+
+    if (curr_key.compare("policy_type_id") == 0) {
+      policy_type_id = i;
+    } else if (curr_key.compare("policy_instance_id") == 0) {
+      policy_instance_id = i;
+    } else if (curr_key.compare("threshold") == 0) {
+      found_threshold = true;
+      threshold = i;
+    }
+
+    return true;
+  }
+  bool Uint(unsigned u) {
+
+    if (curr_key.compare("policy_type_id") == 0) {
+      policy_type_id = u;
+    } else if (curr_key.compare("policy_instance_id") == 0) {
+      policy_instance_id = u;
+    } else if (curr_key.compare("threshold") == 0) {
+      found_threshold = true;
+      threshold = u;
+    }
+
+    return true;
+  }    
+  bool Int64(int64_t i) {  return true; }
+  bool Uint64(uint64_t u) {  return true; }
+  bool Double(double d) {  return true; }
   bool String(const char* str, SizeType length, bool copy) {
-    cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
-    if (curr_key.compare("") != 0) {
-      cout << "Found throughput\n";
-      curr_value = str;
-      cell_pred[curr_key] = curr_value;
-      curr_key = "";
-      curr_value = "";
+    
+    if (curr_key.compare("operation") != 0) {
+      operation = str;
     }
 
     return true;
   }
-  bool StartObject() { cout << "StartObject()" << endl; return true; }
+  bool StartObject() {
+
+    return true;
+  }
+  bool Key(const char* str, SizeType length, bool copy) {
+
+    curr_key = str;
+
+    return true;
+  }
+  bool EndObject(SizeType memberCount) {  return true; }
+  bool StartArray() {  return true; }
+  bool EndArray(SizeType elementCount) {  return true; }
+
+};
+
+struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
+  unordered_map<string, int> cell_pred_down;
+  unordered_map<string, int> cell_pred_up;
+  std::string ue_id;
+  bool ue_id_found = false;
+  string curr_key = "";
+  string curr_value = "";
+  bool down_val = true;
+  bool Null() {  return true; }
+  bool Bool(bool b) {  return true; }
+  bool Int(int i) {  return true; }
+  bool Uint(unsigned u) {    
+
+    if (down_val) {
+      cell_pred_down[curr_key] = u;
+      down_val = false;
+    } else {
+      cell_pred_up[curr_key] = u;
+      down_val = true;
+    }
+
+    return true;    
+
+  }
+  bool Int64(int64_t i) {  return true; }
+  bool Uint64(uint64_t u) {  return true; }
+  bool Double(double d) {  return true; }
+  bool String(const char* str, SizeType length, bool copy) {
+
+    return true;
+  }
+  bool StartObject() {  return true; }
   bool Key(const char* str, SizeType length, bool copy) {
-    cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
     if (!ue_id_found) {
-      cout << "Found UE ID\n";
+
       ue_id = str;
       ue_id_found = true;
     } else {
@@ -122,9 +195,9 @@ struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
     }
     return true;
   }
-  bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
-  bool StartArray() { cout << "StartArray()" << endl; return true; }
-  bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
+  bool EndObject(SizeType memberCount) {  return true; }
+  bool StartArray() {  return true; }
+  bool EndArray(SizeType elementCount) {  return true; }
 };
 
 
@@ -137,51 +210,63 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
   bool in_serving_array = false;
   int rf_meas_index = 0;
 
+  bool in_serving_report_object = false;  
+
   string curr_key = "";
   string curr_value = "";
-  bool Null() { cout << "Null()" << endl; return true; }
-  bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
+  bool Null() { return true; }
+  bool Bool(bool b) { return true; }
   bool Int(int i) {
-    fprintf(stderr, "Int(%d)\n", i);
-    if (in_serving_array) {
-      fprintf(stderr, "we are in serving array\n");
-      switch(rf_meas_index) {
-      case 0:
+
+    return true;
+  }
+  
+  bool Uint(unsigned i) {
+
+    if (in_serving_report_object) {
+      if (curr_key.compare("rsrp") == 0) {
        serving_cell_rsrp = i;
-       break;
-      case 1:
+      } else if (curr_key.compare("rsrq") == 0) {
        serving_cell_rsrq = i;
-       break;
-      case 2:
+      } else if (curr_key.compare("rssinr") == 0) {
        serving_cell_sinr = i;
-       break;
       }
-      rf_meas_index++;
-    }
-    return true;
-  }
-  bool Uint(unsigned u) {
-    fprintf(stderr, "Int(%d)\n", u); return true; }
-  bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
-  bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
-  bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
+    }          
+    
+    return true; }
+  bool Int64(int64_t i) {
+
+    return true; }
+  bool Uint64(uint64_t i) {
+
+    return true; }
+  bool Double(double d) { return true; }
   bool String(const char* str, SizeType length, bool copy) {
-    fprintf(stderr,"String(%s)\n", str);
+    
     if (curr_key.compare("ServingCellID") == 0) {
       serving_cell_id = str;
     } 
 
     return true;
   }
-  bool StartObject() { cout << "StartObject()" << endl; return true; }
+  bool StartObject() {
+    if (curr_key.compare("ServingCellRF") == 0) {
+      in_serving_report_object = true;
+    }
+
+    return true; }
   bool Key(const char* str, SizeType length, bool copy) {
-    fprintf(stderr,"Key(%s)\n", str);
+    
     curr_key = str;
     return true;
   }
-  bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
+  bool EndObject(SizeType memberCount) {
+    if (curr_key.compare("ServingCellRF") == 0) {
+      in_serving_report_object = false;
+    }
+    return true; }
   bool StartArray() {
-    fprintf(stderr,"StartArray()");
+
     if (curr_key.compare("ServingCellRF") == 0) {
       in_serving_array = true;
     }
@@ -189,7 +274,7 @@ struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
     return true;
   }
   bool EndArray(SizeType elementCount) {
-    fprintf(stderr, "EndArray()\n");
+
     if (curr_key.compare("servingCellRF") == 0) {
       in_serving_array = false;
       rf_meas_index = 0;
@@ -207,7 +292,7 @@ unordered_map<string, UEData> get_sdl_ue_data() {
 
   unordered_map<string, UEData> return_ue_data_map;
     
-  std::string prefix3="12";
+  std::string prefix3="";
   Keys K2 = sdl->findKeys(nsu, prefix3);
   DataMap Dk2 = sdl->get(nsu, K2);
   
@@ -218,7 +303,7 @@ unordered_map<string, UEData> get_sdl_ue_data() {
     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
     char val[val_v.size()+1];                               // from Data
     int i;
-    fprintf(stderr, "val size %d\n", val_v.size());
+
     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
     val[i]='\0';
       ue_id.assign((std::string)*si);
@@ -227,10 +312,6 @@ unordered_map<string, UEData> get_sdl_ue_data() {
       ue_data[ue_id] =  ue_json;
   }
   
-  fprintf(stderr, "after sdl get of ue data\n");
-  
-  fprintf(stderr, "From UE data map\n");
-  
   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
     UEDataHandler handler;
     Reader reader;
@@ -241,15 +322,10 @@ unordered_map<string, UEData> get_sdl_ue_data() {
     string serving_cell_id = handler.serving_cell_id;
     int serv_rsrp = handler.serving_cell_rsrp;
     
-    fprintf(stderr,"UE data for %s\n", ueID.c_str());
-    fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str());
-    fprintf(stderr,"RSRP for UE %d\n", serv_rsrp);
-
     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
     
-  }
-  
-  fprintf(stderr, "\n");
+  }  
+
   return return_ue_data_map;
 }
 
@@ -259,15 +335,27 @@ void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_componen
   int rmtype;          // received message type
 
   
-  fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
+  fprintf(stderr, "Policy Callback got a message, type=%d, length=%d\n", mtype, len);
+
+  const char *arg = (const char*)payload.get();
+
   fprintf(stderr, "payload is %s\n", payload.get());
-  
-  //fprintf( stderr, "callback 1 got a message type = %d len = %d\n", mtype, len );
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
+
+  PolicyHandler handler;
+  Reader reader;
+  StringStream ss(arg);
+  reader.Parse(ss,handler);
 
   //Set the threshold value
 
+  if (handler.found_threshold) {
+    fprintf(stderr, "Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
+    rsrp_threshold = handler.threshold;
+  }
+
+  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
+  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
+  
   
 }
 
@@ -283,8 +371,6 @@ void send_prediction_request(vector<string> ues_to_predict) {
   int i;
   Msg_component send_payload;
   
-  fprintf(stderr, "cb 1\n");
-
   msg = xfw->Alloc_msg( 2048 );
   
   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
@@ -293,13 +379,11 @@ void send_prediction_request(vector<string> ues_to_predict) {
     exit( 1 );
   }
 
-  fprintf(stderr, "cb 2");
-
   string ues_list = "[";
 
   for (int i = 0; i < ues_to_predict.size(); i++) {
     if (i == ues_to_predict.size() - 1) {
-      ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
+      ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"]";
     } else {
       ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
     }
@@ -313,12 +397,10 @@ void send_prediction_request(vector<string> ues_to_predict) {
   
   send_payload = msg->Get_payload(); // direct access to payload
   //  snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
-  //  snprintf( (char *) send_payload.get(), 2048, body);
-  snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
+  snprintf( (char *) send_payload.get(), 2048, body);
+  //snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
 
-  fprintf(stderr, "message body %s\n", send_payload.get());
-  
-  fprintf(stderr, "cb 3");
+  fprintf(stderr, "message body %s\n", send_payload.get());  
   fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
   
   // payload updated in place, nothing to copy from, so payload parm is nil
@@ -326,8 +408,6 @@ void send_prediction_request(vector<string> ues_to_predict) {
     fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
   }
 
-  fprintf(stderr, "cb 4");
-
   /*
   msg = xfw->Receive( response_to );
   if( msg != NULL ) {
@@ -353,29 +433,28 @@ void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_comp
   int rmtype;                                                  // received message type
   int delay = 1000000;                         // mu-sec delay; default 1s
 
-  fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len);
-  fprintf(stderr, "payload is %s\n", payload.get());
-  
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
-  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
+  cout << "Prediction Callback got a message, type=" << mtype << " , length=" << len << "\n";
+  cout << "payload is " << payload.get() << "\n";
 
   mtype = 0;
 
-  fprintf(stderr, "cb 1\n");
-
-  char *incoming_msg = "{\"12345\": {\"222\": \"20000\", \"333\" : \"50000\"} }";
-
+  const char* arg = (const char*)payload.get();
   PredictionHandler handler;
-  Reader reader;
-  StringStream ss(incoming_msg);
-  reader.Parse(ss,handler);
 
-  std::string pred_ue_id = handler.ue_id;
+  try {
 
+    Reader reader;
+    StringStream ss(arg);
+    reader.Parse(ss,handler);
+  } catch (...) {
+    cout << "got an exception on stringstream read parse\n";
+  }
+  
+  std::string pred_ue_id = handler.ue_id;
+  
   cout << "Prediction for " << pred_ue_id << endl;
-
-  unordered_map<string, string> throughput_map = handler.cell_pred;
-
+  
+  unordered_map<string, int> throughput_map = handler.cell_pred_down;
 
   cout << endl;
  
@@ -396,14 +475,10 @@ void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_comp
   std::string highest_throughput_cell_id;
   std::string::size_type str_size;
 
-  cout << "Going through throughtput map:" << endl;
-
   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
-    cout << map_iter->first << " : " << map_iter->second << endl;    
+
     std::string curr_cellid = map_iter->first;
-    cout << "Cell ID is " << curr_cellid;
-    int curr_throughput = stoi(map_iter->second, &str_size);
-    cout << "Throughput is " << curr_throughput << endl;
+    int curr_throughput = map_iter->second;
 
     if (curr_cellid.compare(serving_cell_id) == 0) {
       serving_cell_throughput = curr_throughput;
@@ -415,11 +490,9 @@ void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_comp
   //Iterating again to identify the highest throughput prediction
 
   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
-    cout << map_iter->first << " : " << map_iter->second << endl;    
+
     std::string curr_cellid = map_iter->first;
-    cout << "Cell ID is " << curr_cellid;
-    int curr_throughput = stoi(map_iter->second, &str_size);
-    cout << "Throughput is " << curr_throughput << endl;
+    int curr_throughput = map_iter->second;
 
     if (curr_throughput > serving_cell_throughput) {
       highest_throughput = curr_throughput;
@@ -433,6 +506,9 @@ void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_comp
     cout << "Source cell " << serving_cell_id << endl;
     cout << "Target cell " << highest_throughput_cell_id << endl;
   }
+
+  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
+  mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
   
   
 }
@@ -442,26 +518,33 @@ void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_comp
 
 void run_loop() {
 
-  fprintf(stderr, "in run_loop()\n");
+  cout << "in Traffic Steering run_loop()\n";
 
   unordered_map<string, UEData> uemap;
 
-  vector<string> prediction_ues;
-
   while (1) {
 
-    fprintf(stderr, "in while loop\n");
-
     uemap = get_sdl_ue_data();
 
+    vector<string> prediction_ues;
+
     for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
       string ueid = map_iter->first;
+      fprintf(stderr,"found a ueid %s\n", ueid.c_str());
       UEData data = map_iter->second;
+
+      fprintf(stderr, "current rsrp is %d\n", data.serving_cell_rsrp);
+
       if (data.serving_cell_rsrp < rsrp_threshold) {
+       fprintf(stderr,"it is less than the rsrp threshold\n");
        prediction_ues.push_back(ueid);
+      } else {
+       fprintf(stderr,"it is not less than the rsrp threshold\n");
       }
     }
 
+    fprintf(stderr, "the size of pred ues is %d\n", prediction_ues.size());
+
     if (prediction_ues.size() > 0) {
       send_prediction_request(prediction_ues);
     }
@@ -470,7 +553,13 @@ void run_loop() {
   }
 }
 
-
+/* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
+ * It just print the payload received from AD xApp and send an ACK with same UEID as payload to AD xApp.
+ */
+void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
+  cout << "payload is " << payload.get() << "\n";
+  mbuf.Send_response(30004, -1, strlen((char *) payload.get()), (unsigned char *) payload.get());
+}
 
 extern int main( int argc, char** argv ) {
 
@@ -482,17 +571,14 @@ extern int main( int argc, char** argv ) {
 
   nsu = Namespace(sdl_namespace_u);
   nsc = Namespace(sdl_namespace_c);
-  
+
   
   fprintf( stderr, "<XAPP> listening on port: %s\n", port );
-  xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
-  fprintf(stderr, "code1\n");
-
+  xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); 
   
   xfw->Add_msg_cb( 20010, policy_callback, NULL );
   xfw->Add_msg_cb( 30002, prediction_callback, NULL );
-
-  fprintf(stderr, "code2\n");
+  xfw->Add_msg_cb( 30003, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
   
   std::thread loop_thread;