X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fmessage_processor_class.cc;fp=src%2Fmessage_processor_class.cc;h=bc017ad9bf85790e493f0afbfc02d0579d2535f8;hb=b9d7e9c232a4371ddfed51c58e5a57f87b057229;hp=0000000000000000000000000000000000000000;hpb=59f84608ec15c016958a6e0e0ddd813f376c0925;p=ric-app%2Fadmin.git diff --git a/src/message_processor_class.cc b/src/message_processor_class.cc new file mode 100644 index 0000000..bc017ad --- /dev/null +++ b/src/message_processor_class.cc @@ -0,0 +1,269 @@ +/* +================================================================================== + + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + + +#include + + +int verbose_flag = 0; + +message_processor::message_processor(): _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){ + scratch_buffer = 0; + scratch_buffer = (unsigned char *)calloc(BUFFER_LENGTH, sizeof(unsigned char)); + assert(scratch_buffer != 0); + remaining_buffer = BUFFER_LENGTH; +}; + + +void message_processor::register_subscription_handler(SubscriptionHandler * sub){ + _ref_sub_handler = sub; +} + + +void message_processor::register_protector(protector * p){ + _ref_protector = p; +} + +void message_processor::register_policy_handler(void (*f1)(int, const char *, int, std::string &, bool)){ + _ref_policy_handler = f1; +} + +message_processor::~message_processor(void){ + free(scratch_buffer); +} + +// main message processing +bool message_processor::operator()(rmr_mbuf_t *message){ + + bool res; + int message_type, procedure_code; + bool send_msg = false; + asn_dec_rval_t rval; + size_t buf_size = BUFFER_LENGTH; + size_t mlen; + _num_messages ++; + std::string response; + //FILE *pfile; + //std::string filename = "/opt/out/e2ap_" + std::to_string(_num_messages) + ".per"; + + // main message processing code + switch(message->mtype){ + + case RIC_INDICATION: + + if (unlikely(_ref_protector == NULL)){ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d: No plugin registered to handle ric indication message\n", __FILE__, __LINE__); + break; + } + + //pfile = fopen(filename.c_str(), "wb"); + //fwrite(message->payload, sizeof(char), message->len, pfile); + //fclose(pfile); + + e2ap_recv_pdu = 0; + e2sm_header = 0; + + rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void**)&(e2ap_recv_pdu), message->payload, message->len); + + if(likely(rval.code == RC_OK)){ + num_indications ++; + res = indication_processor.get_fields(e2ap_recv_pdu->choice.initiatingMessage, indication_data); + if (unlikely(!res)){ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__); + goto finished; + } + //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl; + //xer_fprint(stdout, &asn_DEF_E2AP_PDU, e2ap_recv_pdu); + //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl; + } + else{ + num_err_indications ++; + mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2AP PDU\n", __FILE__, __LINE__); + goto finished; + } + + mdclog_write(MDCLOG_INFO, "E2AP INDICATION :: Successfully received E2AP Indication message with id = %d, sequence no = %d, Number of indication messages = %lu, Number of erroneous indications = %lu\n", indication_data.req_id, indication_data.req_seq_no, num_indications, num_err_indications); + + //Decode the SM header + rval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2SM_gNB_X2_indicationHeader, (void**)&(e2sm_header), indication_data.indication_header, indication_data.indication_header_size); + if (likely(rval.code == RC_OK)){ + res = e2sm_indication_processor.get_header_fields(e2sm_header, header_helper); + if (unlikely(!res)){ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from E2SM HEADER\n", __FILE__, __LINE__); + goto finished; + } + + // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl; + // xer_fprint(stdout, &asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header); + // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl; + } + else{ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2SM Header.", __FILE__, __LINE__); + goto finished; + } + mdclog_write(MDCLOG_DEBUG, "E2SM INDICATION HEADER :: Successfully decoded E2SM Indication Header of size %lu\n", indication_data.indication_header_size); + + + // NOTE : We assume RICindicationMessage contains payload (not E2SM message) + // Send payload to plugin + current_index = 0; + remaining_buffer = BUFFER_LENGTH; + + res = (*_ref_protector)(indication_data.indication_msg, indication_data.indication_msg_size, scratch_buffer, &buf_size); + if(unlikely(!res)){ + goto finished; + } + + // Do we respond ? Depends on RIC indication type ... + // if RICindicationType == report, then no response + // else if RICindicationType == insert, generate control ... + if (report_mode_only|| indication_data.indication_type == RICindicationType::RICindicationType_report){ + mdclog_write(MDCLOG_INFO, "Indication type is report. Not generating control\n"); + goto finished; + } + + message_helper.x2ap_pdu = &scratch_buffer[current_index]; + message_helper.x2ap_pdu_size = buf_size; // re-use for size + current_index +=message_helper.x2ap_pdu_size ; + remaining_buffer = BUFFER_LENGTH - current_index; + + if (current_index >= BUFFER_LENGTH){ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %d too small to encode further objects \n", __FILE__, __LINE__, BUFFER_LENGTH); + goto finished; + } + + + // Encode the control header + // Control header is same as indication header ( except interface direction ?) + header_helper.interface_direction = 0; + res = e2sm_control_processor.encode_control_header(&scratch_buffer[current_index], &remaining_buffer, header_helper); + if (likely(res)){ + control_data.control_header = &scratch_buffer[current_index]; + control_data.control_header_size = remaining_buffer; + current_index += remaining_buffer ; + remaining_buffer = BUFFER_LENGTH - current_index; + + } + else{ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control header. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str()); + goto finished; + } + + if (current_index >= BUFFER_LENGTH){ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %d too small to encode further objects \n", __FILE__, __LINE__, BUFFER_LENGTH); + goto finished; + } + + // Encode the control message + res = e2sm_control_processor.encode_control_message(&scratch_buffer[current_index], &remaining_buffer, message_helper); + if (likely(res)){ + control_data.control_msg = &scratch_buffer[current_index]; + control_data.control_msg_size = remaining_buffer; + + current_index += remaining_buffer ; + remaining_buffer = BUFFER_LENGTH - current_index; + } + else{ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control message. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str()); + goto finished; + } + + // Generate control message + mlen = BUFFER_LENGTH; + control_data.req_id = indication_data.req_id; + control_data.req_seq_no = indication_data.req_seq_no; + control_data.func_id = indication_data.func_id; + control_data.control_ack = 2; // no ack required + + res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data); + + if(likely(res)){ + send_msg = true; + message->len = mlen; + message->mtype = RIC_CONTROL_REQ; + mdclog_write(MDCLOG_INFO, "E2AP CONTROL MESSAGE :: Successfully generated E2AP Control Message\n"); + } + else{ + mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control . Reason = %s\n", __FILE__, __LINE__, control_request_processor.get_error().c_str()); + goto finished; + } + // Record id for tracking .... (tbd) + finished: + ASN_STRUCT_FREE(asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header); + ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv_pdu); + + if (send_msg){ + return true; + } + + break; + + case (RIC_SUB_RESP): + case (RIC_SUB_DEL_RESP): + case (RIC_SUB_FAILURE): + case ( RIC_SUB_DEL_FAILURE ): + if (_ref_sub_handler != NULL){ + mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype); + _ref_sub_handler->Response(message->mtype, message->payload, message->len); + } + else{ + mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__); + return false; + } + + break; + + + case DC_ADM_INT_CONTROL: + { + + // Need to apply config. Since config may need to be + // applied across all threads, we do a callback to the parent thread. + // wait for config to be applied and then send response + _ref_policy_handler(DC_ADM_INT_CONTROL, (const char *) message->payload, message->len, response, true); + std::memcpy( (char *) message->payload, response.c_str(), response.length()); + message->len = response.length(); + message->mtype = DC_ADM_INT_CONTROL_ACK; + return true; + + } + break; + + case DC_ADM_GET_POLICY: + { + _ref_policy_handler(DC_ADM_GET_POLICY, (const char *) message->payload, message->len, response, false); + std::memcpy((char *)message->payload, response.c_str(), response.length()); + message->len = response.length(); + message->mtype = DC_ADM_GET_POLICY_ACK; + return true; + } + break; + + default: + mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype); + }; + + return false; +}; + + +unsigned long const message_processor::get_messages (void){ + return _num_messages; +}; +