2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
21 #include <message_processor_class.hpp>
// Default constructor: sets the three handler references to NULL, zeroes the message and
// indication counters and the scratch-buffer index, then allocates the scratch buffer.
26 message_processor::message_processor(): _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
// calloc returns BUFFER_LENGTH zero-initialized bytes (or a null pointer on failure).
28 scratch_buffer = (unsigned char *)calloc(BUFFER_LENGTH, sizeof(unsigned char));
// NOTE(review): assert is compiled out when NDEBUG is defined, so in such builds an
// allocation failure is not detected here.
29 assert(scratch_buffer != 0);
// The whole scratch buffer is available initially.
30 remaining_buffer = BUFFER_LENGTH;
// Stores the subscription handler pointer. operator() forwards subscription
// response/failure messages to it through SubscriptionHandler::Response and logs an
// error if no handler has been registered.
// @param sub  handler to register; the pointer is stored as-is (no copy is made).
34 void message_processor::register_subscription_handler(SubscriptionHandler * sub){
35 _ref_sub_handler = sub;
// Registers the protector plugin.
// NOTE(review): presumably stores p in _ref_protector, which operator() checks against
// NULL and invokes on the indication payload -- confirm.
// @param p  protector plugin instance to register.
39 void message_processor::register_protector(protector * p){
// Stores the policy callback. operator() invokes it for DC_ADM_INT_CONTROL and
// DC_ADM_GET_POLICY messages with (message type, payload, payload length,
// response string to fill, bool flag) and copies the filled response back into the message.
// The bool flag is passed as true for DC_ADM_INT_CONTROL and false for DC_ADM_GET_POLICY.
// @param f1  callback to register.
43 void message_processor::register_policy_handler(void (*f1)(int, const char *, int, std::string &, bool)){
44 _ref_policy_handler = f1;
// Destructor.
// NOTE(review): the constructor allocates scratch_buffer with calloc; confirm it is
// released here with free().
47 message_processor::~message_processor(void){
// Function-call operator: processes one received RMR message, dispatching on message->mtype.
//   - RIC indication: decode the E2AP PDU and the E2SM header, hand the indication payload
//     to the protector plugin and, unless the indication is report-only, encode an
//     E2AP control request into message->payload.
//   - Subscription response/failure types: forwarded to the subscription handler.
//   - DC_ADM_INT_CONTROL / DC_ADM_GET_POLICY: delegated to the policy callback; the
//     callback's response replaces the payload and mtype is set to the matching *_ACK.
//   - Anything else: logged as an unknown message type.
// @param message  received RMR buffer; payload, len and mtype may be overwritten in place.
// @return bool -- presumably send_msg, i.e. whether the (rewritten) message should be sent
//         back; TODO confirm against the return statements.
51 // main message processing
52 bool message_processor::operator()(rmr_mbuf_t *message){
55 int message_type, procedure_code;
56 bool send_msg = false;
// In/out size for the protector call: starts as the scratch buffer capacity.
58 size_t buf_size = BUFFER_LENGTH;
63 //std::string filename = "/opt/out/e2ap_" + std::to_string(_num_messages) + ".per";
65 // main message processing code
66 switch(message->mtype){
// Indication handling needs a registered protector plugin.
70 if (unlikely(_ref_protector == NULL)){
71 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: No plugin registered to handle ric indication message\n", __FILE__, __LINE__);
75 //pfile = fopen(filename.c_str(), "wb");
76 //fwrite(message->payload, sizeof(char), message->len, pfile);
// Decode the received payload as an aligned-PER encoded E2AP PDU.
82 rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void**)&(e2ap_recv_pdu), message->payload, message->len);
84 if(likely(rval.code == RC_OK)){
// Extract the indication fields from the PDU's initiatingMessage into indication_data.
86 res = indication_processor.get_fields(e2ap_recv_pdu->choice.initiatingMessage, indication_data);
88 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__);
91 //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
92 //xer_fprint(stdout, &asn_DEF_E2AP_PDU, e2ap_recv_pdu);
93 //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
// Count indications that could not be decoded.
96 num_err_indications ++;
97 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2AP PDU\n", __FILE__, __LINE__);
101 mdclog_write(MDCLOG_INFO, "E2AP INDICATION :: Successfully received E2AP Indication message with id = %d, sequence no = %d, Number of indication messages = %lu, Number of erroneous indications = %lu\n", indication_data.req_id, indication_data.req_seq_no, num_indications, num_err_indications);
103 //Decode the SM header
104 rval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2SM_gNB_X2_indicationHeader, (void**)&(e2sm_header), indication_data.indication_header, indication_data.indication_header_size);
105 if (likely(rval.code == RC_OK)){
// Copy the decoded header fields into header_helper (reused below for the control header).
106 res = e2sm_indication_processor.get_header_fields(e2sm_header, header_helper);
108 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from E2SM HEADER\n", __FILE__, __LINE__);
112 // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
113 // xer_fprint(stdout, &asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
114 // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
117 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2SM Header.", __FILE__, __LINE__);
120 mdclog_write(MDCLOG_DEBUG, "E2SM INDICATION HEADER :: Successfully decoded E2SM Indication Header of size %lu\n", indication_data.indication_header_size);
123 // NOTE : We assume RICindicationMessage contains payload (not E2SM message)
124 // Send payload to plugin
126 remaining_buffer = BUFFER_LENGTH;
// The protector writes its output into scratch_buffer; buf_size is passed by pointer and
// is used below as the size of that output.
128 res = (*_ref_protector)(indication_data.indication_msg, indication_data.indication_msg_size, scratch_buffer, &buf_size);
133 // Do we respond ? Depends on RIC indication type ...
134 // if RICindicationType == report, then no response
135 // else if RICindicationType == insert, generate control ...
136 if (report_mode_only|| indication_data.indication_type == RICindicationType::RICindicationType_report){
137 mdclog_write(MDCLOG_INFO, "Indication type is report. Not generating control\n");
// Record where the protector output sits in the scratch buffer, then advance the
// write index past it and recompute the space left.
141 message_helper.x2ap_pdu = &scratch_buffer[current_index];
142 message_helper.x2ap_pdu_size = buf_size; // re-use for size
143 current_index +=message_helper.x2ap_pdu_size ;
144 remaining_buffer = BUFFER_LENGTH - current_index;
// NOTE(review): this check runs after the protector has already written its output, so
// it only prevents further encoding; it relies on the protector honouring buf_size -- confirm.
146 if (current_index >= BUFFER_LENGTH){
147 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %d too small to encode further objects \n", __FILE__, __LINE__, BUFFER_LENGTH);
152 // Encode the control header
153 // Control header is same as indication header ( except interface direction ?)
154 header_helper.interface_direction = 0;
// remaining_buffer is in/out: space available on entry, and (as used on the next lines)
// the number of bytes encoded on return.
155 res = e2sm_control_processor.encode_control_header(&scratch_buffer[current_index], &remaining_buffer, header_helper);
157 control_data.control_header = &scratch_buffer[current_index];
158 control_data.control_header_size = remaining_buffer;
159 current_index += remaining_buffer ;
160 remaining_buffer = BUFFER_LENGTH - current_index;
164 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control header. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
168 if (current_index >= BUFFER_LENGTH){
169 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %d too small to encode further objects \n", __FILE__, __LINE__, BUFFER_LENGTH);
173 // Encode the control message
// Same in/out convention for remaining_buffer as for the control header above.
174 res = e2sm_control_processor.encode_control_message(&scratch_buffer[current_index], &remaining_buffer, message_helper);
176 control_data.control_msg = &scratch_buffer[current_index];
177 control_data.control_msg_size = remaining_buffer;
179 current_index += remaining_buffer ;
180 remaining_buffer = BUFFER_LENGTH - current_index;
183 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control message. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
187 // Generate control message
// mlen is passed by pointer as the size argument of the encoder, initialised to BUFFER_LENGTH.
// NOTE(review): the request is encoded directly into message->payload; this assumes the
// RMR payload buffer can hold up to BUFFER_LENGTH bytes -- confirm against its allocation.
188 mlen = BUFFER_LENGTH;
// The control request reuses the request id, sequence number and function id of the indication.
189 control_data.req_id = indication_data.req_id;
190 control_data.req_seq_no = indication_data.req_seq_no;
191 control_data.func_id = indication_data.func_id;
192 control_data.control_ack = 2; // no ack required
194 res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data);
// Retag the buffer as a control request.
199 message->mtype = RIC_CONTROL_REQ;
200 mdclog_write(MDCLOG_INFO, "E2AP CONTROL MESSAGE :: Successfully generated E2AP Control Message\n");
// NOTE(review): the log text below says "E2SM control" but the failing step is the E2AP
// control request encoder.
203 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control . Reason = %s\n", __FILE__, __LINE__, control_request_processor.get_error().c_str());
206 // Record id for tracking .... (tbd)
// Release the ASN.1 structures produced by the two asn_decode calls above.
208 ASN_STRUCT_FREE(asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
209 ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv_pdu);
// Subscription-related message types share one handler: forward type, payload and length
// to the registered subscription handler, or log an error if none is registered.
218 case (RIC_SUB_DEL_RESP):
219 case (RIC_SUB_FAILURE):
220 case ( RIC_SUB_DEL_FAILURE ):
221 if (_ref_sub_handler != NULL){
222 mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
223 _ref_sub_handler->Response(message->mtype, message->payload, message->len);
226 mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
233 case DC_ADM_INT_CONTROL:
236 // Need to apply config. Since config may need to be
237 // applied across all threads, we do a callback to the parent thread.
238 // wait for config to be applied and then send response
// NOTE(review): no NULL check on _ref_policy_handler is visible before this call, and the
// memcpy below does not compare response.length() with the payload capacity -- confirm
// both are guaranteed elsewhere.
239 _ref_policy_handler(DC_ADM_INT_CONTROL, (const char *) message->payload, message->len, response, true);
// Overwrite the payload with the callback's response (no terminating NUL is copied) and
// turn the message into the acknowledgement.
240 std::memcpy( (char *) message->payload, response.c_str(), response.length());
241 message->len = response.length();
242 message->mtype = DC_ADM_INT_CONTROL_ACK;
248 case DC_ADM_GET_POLICY:
// Same pattern as DC_ADM_INT_CONTROL, with the callback's bool flag set to false; the
// same NOTE(review) about the NULL check and payload capacity applies.
250 _ref_policy_handler(DC_ADM_GET_POLICY, (const char *) message->payload, message->len, response, false);
251 std::memcpy((char *)message->payload, response.c_str(), response.length());
252 message->len = response.length();
253 message->mtype = DC_ADM_GET_POLICY_ACK;
// Message types with no dedicated case are only logged.
259 mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
// Returns the current value of the _num_messages counter.
// The const qualifier on the by-value return type has no effect for callers.
266 unsigned long const message_processor::get_messages (void){
267 return _num_messages;