int verbose_flag = 0;
-message_processor::message_processor(int mode, bool report_mode, size_t buffer_length, size_t reporting_interval): _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
+message_processor::message_processor(int mode, bool report_mode, size_t buffer_length): _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
processing_level = mode;
report_mode_only = report_mode;
_buffer_size = buffer_length;
scratch_buffer = 0;
scratch_buffer = (unsigned char *)calloc(_buffer_size, sizeof(unsigned char));
assert(scratch_buffer != 0);
- _reporting_interval = reporting_interval;
- num_proc_packets = 0;
- processing_duration = 0;
- processing_dev = 0;
- max_proc_duration = 0;
};
case (RIC_SUB_FAILURE):
case ( RIC_SUB_DEL_FAILURE ):
if (_ref_sub_handler != NULL){
+ // extract meid ..
+ unsigned char meid[32];
+ rmr_get_meid(message, meid);
mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
- _ref_sub_handler->Response(message->mtype, message->payload, message->len);
+ _ref_sub_handler->Response(message->mtype, message->payload, message->len, (const char *)meid);
}
else{
state = MISSING_HANDLER_ERROR;
break;
- case DC_ADM_INT_CONTROL:
+ case A1_POLICY_REQ:
{
if(_ref_policy_handler != NULL){
// Need to apply config. Since config may need to be
// applied across all threads, we do a callback to the parent thread.
// wait for config to be applied and then send response
- _ref_policy_handler(DC_ADM_INT_CONTROL, (const char *) message->payload, message->len, response, true);
+ _ref_policy_handler(A1_POLICY_REQ, (const char *) message->payload, message->len, response, true);
std::memcpy( (char *) message->payload, response.c_str(), response.length());
message->len = response.length();
- message->mtype = DC_ADM_INT_CONTROL_ACK;
+ message->mtype = A1_POLICY_RESP;
send_msg = true;
}
else{
}
break;
- case DC_ADM_GET_POLICY:
- {
- if(_ref_policy_handler != NULL){
- _ref_policy_handler(DC_ADM_GET_POLICY, (const char *) message->payload, message->len, response, false);
- std::memcpy((char *)message->payload, response.c_str(), response.length());
- message->len = response.length();
- message->mtype = DC_ADM_GET_POLICY_ACK;
- send_msg = true;
- }
- else{
- state = MISSING_HANDLER_ERROR;
- mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
- }
- }
- break;
default:
mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
};
- auto end = std::chrono::high_resolution_clock::now();
- // Ad hoc metric reporting for now ...
- // every reporting interval, prints to stdou
- // 1, reporting interval (# of packets)
- // 2. average processing time (in micro seconds) across each packet
- // 3. standard deviation (micro seconds ^2) a
- // 4. maximum processing time (in micro seconds)
-
- if(num_proc_packets == _reporting_interval){
- auto epoch = std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()).time_since_epoch();
- double avg_latency = processing_duration/num_proc_packets;
- double std_dev_latency = processing_dev/num_proc_packets - avg_latency * avg_latency;
- std::cout << "Processing Metrics : " << epoch.count() << "," << num_proc_packets << "," << avg_latency << "," << std_dev_latency << "," << max_proc_duration << std::endl;
- processing_duration = 0;
- processing_dev = 0;
- max_proc_duration = 0;
- num_proc_packets = 0;
- }
-
- double elapsed = std::chrono::duration<double, std::micro>(end - start).count();
- if(elapsed > max_proc_duration){
- max_proc_duration = elapsed;
- }
-
- processing_duration += elapsed;
- processing_dev += elapsed * elapsed;
- num_proc_packets ++;
-
-
if(send_msg){
return true;
}