Initialization script now passes signal to xapp process
[ric-app/admin.git] / src / message_processor_class.cc
index 1601daa..b5b421a 100644 (file)
 
 int verbose_flag = 0;
 
-message_processor::message_processor(int mode, bool report_mode, size_t buffer_length, size_t reporting_interval):  _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
+message_processor::message_processor(int mode, bool report_mode, size_t buffer_length):  _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
   processing_level = mode;
   report_mode_only = report_mode;
   _buffer_size = buffer_length;
   scratch_buffer = 0;
   scratch_buffer = (unsigned char *)calloc(_buffer_size, sizeof(unsigned char));
   assert(scratch_buffer != 0);
-  _reporting_interval = reporting_interval;
-  num_proc_packets = 0;
-  processing_duration = 0;
-  processing_dev = 0;
-  max_proc_duration = 0;
 
 };
 
@@ -67,8 +62,6 @@ bool message_processor::operator()(rmr_mbuf_t *message){
   _num_messages ++;
   std::string response;
 
-  //FILE *pfile;
-  //std::string filename = "/opt/out/e2ap_" + std::to_string(_num_messages) + ".per";
 
   state = NO_ERROR;
   mdclog_write(MDCLOG_DEBUG, "Received RMR message of type %d and size %d\n", message->mtype, message->len);
@@ -80,10 +73,7 @@ bool message_processor::operator()(rmr_mbuf_t *message){
     return false;
   }
   
-  // start measurement 
-  auto start = std::chrono::high_resolution_clock::now();
-  
-  // main message processing code
+  // routing based on message type 
   switch(message->mtype){
     
   case RIC_INDICATION:
@@ -94,10 +84,6 @@ bool message_processor::operator()(rmr_mbuf_t *message){
       break;
     }
     
-    //pfile = fopen(filename.c_str(), "wb");
-    //fwrite(message->payload, sizeof(char), message->len, pfile);
-    //fclose(pfile);
-
     e2ap_recv_pdu = 0;
     e2sm_header = 0;
     
@@ -113,9 +99,11 @@ bool message_processor::operator()(rmr_mbuf_t *message){
        mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__);
        goto finished;
       }
+
       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
       //xer_fprint(stdout, &asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
+
     }
     else{
       num_err_indications ++;
@@ -163,7 +151,7 @@ bool message_processor::operator()(rmr_mbuf_t *message){
     
 
     // NOTE : We assume RICindicationMessage contains payload (not E2SM message)
-    // Send payload to plugin
+    // Send payload to protector plugin
     current_index = 0;
     remaining_buffer = _buffer_size;
     mdclog_write(MDCLOG_DEBUG, "Processing E2AP Indication message of size %lu\n", indication_data.indication_msg_size);
@@ -225,7 +213,9 @@ bool message_processor::operator()(rmr_mbuf_t *message){
     control_data.req_id = indication_data.req_id;
     control_data.req_seq_no = indication_data.req_seq_no;
     control_data.func_id = indication_data.func_id;
-    control_data.control_ack = 2; // no ack required       
+    control_data.control_ack = 1; // no ack required
+    control_data.call_process_id = indication_data.call_process_id;
+    control_data.call_process_id_size = indication_data.call_process_id_size;
     res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data);
     
     if(likely(res)){
@@ -252,8 +242,11 @@ bool message_processor::operator()(rmr_mbuf_t *message){
   case (RIC_SUB_FAILURE):
   case ( RIC_SUB_DEL_FAILURE ):
     if (_ref_sub_handler != NULL){
+      // extract meid ..
+      unsigned char meid[32];
+      rmr_get_meid(message, meid);
       mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
-      _ref_sub_handler->Response(message->mtype, message->payload, message->len);
+      _ref_sub_handler->Response(message->mtype, message->payload, message->len, (const char *)meid);
     }
     else{
       state = MISSING_HANDLER_ERROR;
@@ -263,16 +256,16 @@ bool message_processor::operator()(rmr_mbuf_t *message){
     break;
     
     
-  case DC_ADM_INT_CONTROL:
+  case A1_POLICY_REQ:
     {
       if(_ref_policy_handler != NULL){
        // Need to apply config. Since config may need to be
        // applied across all threads, we do a callback to the parent thread.
        // wait for config to be applied and then send response
-       _ref_policy_handler(DC_ADM_INT_CONTROL, (const char *) message->payload, message->len, response, true);
+       _ref_policy_handler(A1_POLICY_REQ, (const char *) message->payload, message->len, response, true);
        std::memcpy( (char *) message->payload, response.c_str(),  response.length());
        message->len = response.length();
-       message->mtype = DC_ADM_INT_CONTROL_ACK;
+       message->mtype = A1_POLICY_RESP;
        send_msg = true;
       }
       else{
@@ -282,56 +275,12 @@ bool message_processor::operator()(rmr_mbuf_t *message){
     }
     break;
 
-  case DC_ADM_GET_POLICY:
-    {
-      if(_ref_policy_handler != NULL){
-       _ref_policy_handler(DC_ADM_GET_POLICY,  (const char *) message->payload, message->len, response, false);
-       std::memcpy((char *)message->payload, response.c_str(), response.length());
-       message->len = response.length();
-       message->mtype = DC_ADM_GET_POLICY_ACK;
-       send_msg = true;
-      }
-      else{
-       state = MISSING_HANDLER_ERROR;
-       mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
-      }
-    }
-    break;
     
   default:
     mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
 
   };
 
-  auto end = std::chrono::high_resolution_clock::now();
-  // Ad hoc metric reporting for now ...
-  // every reporting interval, prints to stdou
-  // 1, reporting interval (# of packets)
-  // 2. average  processing time (in micro seconds) across each packet
-  // 3. standard deviation (micro seconds ^2) a
-  // 4. maximum processing time (in micro seconds)
-  
-  if(num_proc_packets == _reporting_interval){
-    auto epoch = std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()).time_since_epoch();
-    double avg_latency =  processing_duration/num_proc_packets;
-    double std_dev_latency = processing_dev/num_proc_packets - avg_latency * avg_latency;
-    std::cout << "Processing Metrics : " << epoch.count() << "," << num_proc_packets << "," << avg_latency << "," << std_dev_latency << "," << max_proc_duration << std::endl;  
-    processing_duration = 0;
-    processing_dev = 0;
-    max_proc_duration = 0;
-    num_proc_packets = 0;
-  }
-
-  double  elapsed = std::chrono::duration<double, std::micro>(end - start).count();
-  if(elapsed > max_proc_duration){
-    max_proc_duration = elapsed;
-  }
-  
-  processing_duration += elapsed;
-  processing_dev += elapsed * elapsed;
-  num_proc_packets ++;
-
-  
   if(send_msg){
     return true;
   }