2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
21 #include <message_processor_class.hpp>
26 message_processor::message_processor(int mode, bool report_mode, size_t buffer_length): _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
27 processing_level = mode;
28 report_mode_only = report_mode;
29 _buffer_size = buffer_length;
31 scratch_buffer = (unsigned char *)calloc(_buffer_size, sizeof(unsigned char));
32 assert(scratch_buffer != 0);
37 void message_processor::register_subscription_handler(subscription_handler * sub){
38 _ref_sub_handler = sub;
// Registers the protector plugin for this processor.
// NOTE(review): the body of this setter is not visible in this excerpt. operator()
// checks _ref_protector against NULL and invokes it on the indication payload, so
// this presumably stores p in _ref_protector - confirm.
42 void message_processor::register_protector(protector * p){
46 void message_processor::register_policy_handler(void (*f1)(int, const char *, int, std::string &, bool)){
47 _ref_policy_handler = f1;
// Destructor.
// NOTE(review): the body is not visible in this excerpt. The constructor obtains
// scratch_buffer from calloc(), so it presumably has to be released with free()
// here - confirm.
50 message_processor::~message_processor(void){
// Function-call operator: processes one received RMR message, possibly rewriting it in place.
//
// NOTE(review): several lines of this function (local declarations, case labels,
// else branches, break/return statements) are not visible in this excerpt, so the
// description below covers only what the visible statements establish.
//
// Visible flow:
//   * logs the message type and length, and logs an error ("Ignoring ...") when
//     message->len exceeds MAX_RMR_RECV_SIZE;
//   * switches on message->mtype:
//       - indication path: decodes the E2AP PDU, then (unless processing_level is
//         E2AP_PROC_ONLY) the E2SM indication header, then (unless E2SM_PROC_ONLY)
//         passes the indication payload to the registered protector; when a control
//         is generated, the control header and the E2AP control request are encoded
//         and message->mtype becomes RIC_CONTROL_REQ;
//       - subscription responses/failures: handed to the subscription handler;
//       - policy path: calls the policy callback, copies its response into
//         message->payload and sets message->mtype to A1_POLICY_RESP;
//       - any other type: logged as unknown.
//
// message: RMR buffer; payload, len and mtype may be overwritten.
// Returns bool; presumably the send_msg flag (true when the rewritten message
// should be sent back) - the return statements are not visible, confirm.
54 // main message processing
55 bool message_processor::operator()(rmr_mbuf_t *message){
58 bool send_msg = false;
// In/out size handed to the protector; afterwards re-used as the control message size.
60 size_t buf_size = _buffer_size;
67 mdclog_write(MDCLOG_DEBUG, "Received RMR message of type %d and size %d\n", message->mtype, message->len);
69 // Sanity check on RMR. Max size ?
70 if (message->len > MAX_RMR_RECV_SIZE){
72 mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE);
76 // routing based on message type
77 switch(message->mtype){
// Indication path: a protector plugin must have been registered.
81 if (unlikely(_ref_protector == NULL)){
82 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: No plugin registered to handle ric indication message\n", __FILE__, __LINE__);
83 state = MISSING_HANDLER_ERROR;
// Decode the aligned-PER encoded E2AP PDU carried in the RMR payload.
90 rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_recv_pdu), message->payload, message->len);
93 if(likely(rval.code == RC_OK)){
// Extract the indication fields from the decoded PDU into indication_data.
95 res = indication_processor.get_fields(e2ap_recv_pdu->choice.initiatingMessage, indication_data);
97 state = E2AP_INDICATION_ERROR;
98 num_err_indications ++;
99 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__);
103 //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
104 //xer_fprint(stdout, &asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
105 //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
109 num_err_indications ++;
110 state = E2AP_INDICATION_ERROR;
111 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2AP PDU\n", __FILE__, __LINE__);
115 mdclog_write(MDCLOG_DEBUG, "E2AP INDICATION :: Successfully received E2AP Indication with id = %ld, sequence no = %ld, Number of indications = %lu, Number of erroneous indications = %lu\n", indication_data.req_id, indication_data.req_seq_no, num_indications, num_err_indications);
118 // do we progress ahead ?
119 if(processing_level == E2AP_PROC_ONLY){
123 //Decode the SM header
124 mdclog_write(MDCLOG_DEBUG, "Decoding e2sm header of size %lu\n", indication_data.indication_header_size);
125 rval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, (void**)&(e2sm_header), indication_data.indication_header, indication_data.indication_header_size);
127 if (likely(rval.code == RC_OK)){
128 res = e2sm_indication_processor.get_header_fields(e2sm_header, header_helper);
130 state = E2SM_INDICATION_HEADER_ERROR;
131 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from E2SM HEADER\n", __FILE__, __LINE__);
135 // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
136 // xer_fprint(stdout, &asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
137 // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
140 state = E2SM_INDICATION_HEADER_ERROR;
141 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2SM Header.", __FILE__, __LINE__);
144 mdclog_write(MDCLOG_DEBUG, "E2SM INDICATION HEADER :: Successfully decoded E2SM Indication Header of size %lu\n", indication_data.indication_header_size);
147 // do we progress ahead ?
148 if(processing_level == E2SM_PROC_ONLY){
153 // NOTE : We assume RICindicationMessage contains payload (not E2SM message)
154 // Send payload to protector plugin
156 remaining_buffer = _buffer_size;
157 mdclog_write(MDCLOG_DEBUG, "Processing E2AP Indication message of size %lu\n", indication_data.indication_msg_size);
// The protector receives the start of scratch_buffer as its output area and buf_size as in/out size.
159 res = (*_ref_protector)(indication_data.indication_msg, indication_data.indication_msg_size, scratch_buffer, &buf_size);
161 state = PLUGIN_ERROR;
165 // Do we respond ? Depends on RIC indication type ...
166 // if RICindicationType == report, then no response
167 // else if RICindicationType == insert, generate control ...
168 if (report_mode_only|| indication_data.indication_type == E2N_RICindicationType::E2N_RICindicationType_report){
169 mdclog_write(MDCLOG_DEBUG, "Indication type is report. Not generating control\n");
173 // we assume for now that just like in E2AP indication, response message is directly put in E2AP control
// NOTE(review): the protector wrote at scratch_buffer[0], but the control message is taken
// from scratch_buffer[current_index]. This is only consistent if current_index is 0 here;
// no reset of current_index is visible in this excerpt - confirm.
174 control_data.control_msg = &scratch_buffer[current_index];
175 control_data.control_msg_size = buf_size; // re-use for size
176 current_index += buf_size ;
177 remaining_buffer = _buffer_size - current_index;
178 mdclog_write(MDCLOG_DEBUG, "Encoded X2AP response of size %lu bytes. Remaining buffer = %lu bytes\n", buf_size, remaining_buffer);
180 if (current_index >= _buffer_size){
181 state = BUFFER_ERROR;
182 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
187 // Encode the control header
188 // Control header is same as indication header ( except interface direction ?)
189 header_helper.interface_direction = 0;
// remaining_buffer is passed by address and afterwards used as the encoded header size.
190 res = e2sm_control_processor.encode_control_header(&scratch_buffer[current_index], &remaining_buffer, header_helper);
193 control_data.control_header = &scratch_buffer[current_index];
194 control_data.control_header_size = remaining_buffer;
195 current_index += remaining_buffer ;
196 remaining_buffer = _buffer_size - current_index;
200 state = E2SM_CONTROL_HEADER_ERROR;
201 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control header. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
205 if (current_index >= _buffer_size){
206 state = BUFFER_ERROR;
207 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
211 // Generate control message
// The control request echoes the identifiers of the indication it responds to.
213 control_data.req_id = indication_data.req_id;
214 control_data.req_seq_no = indication_data.req_seq_no;
215 control_data.func_id = indication_data.func_id;
216 //control_data.control_ack = 0; // no ack required
217 control_data.call_process_id = indication_data.call_process_id;
218 control_data.call_process_id_size = indication_data.call_process_id_size;
// The encoded control request overwrites the received RMR payload in place.
219 res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data);
224 message->mtype = RIC_CONTROL_REQ;
225 mdclog_write(MDCLOG_DEBUG, "E2AP CONTROL MESSAGE :: Successfully generated E2AP Control Message\n");
228 state = E2AP_CONTROL_ERROR;
229 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2AP control . Reason = %s\n", __FILE__, __LINE__, control_request_processor.get_error().c_str());
233 // Record id for tracking .... (tbd)
// Release the decoded ASN.1 structures.
// NOTE(review): verify that error paths that leave earlier also free them.
235 ASN_STRUCT_FREE(asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, e2sm_header);
236 ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
// Subscription responses and failures are all forwarded to the subscription handler.
241 case (RIC_SUB_DEL_RESP):
242 case (RIC_SUB_FAILURE):
243 case ( RIC_SUB_DEL_FAILURE ):
244 if (_ref_sub_handler != NULL){
// NOTE(review): 32 bytes is assumed to be large enough for rmr_get_meid - confirm against RMR_MAX_MEID.
246 unsigned char meid[32];
247 rmr_get_meid(message, meid);
248 mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
249 _ref_sub_handler->Response(message->mtype, message->payload, message->len, (const char *)meid);
252 state = MISSING_HANDLER_ERROR;
253 mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
261 if(_ref_policy_handler != NULL){
262 // Need to apply config. Since config may need to be
263 // applied across all threads, we do a callback to the parent thread.
264 // wait for config to be applied and then send response
265 _ref_policy_handler(A1_POLICY_REQ, (const char *) message->payload, message->len, response, true);
// NOTE(review): no visible check that response.length() fits the capacity of
// message->payload before copying - confirm the payload is large enough.
266 std::memcpy( (char *) message->payload, response.c_str(), response.length());
267 message->len = response.length();
268 message->mtype = A1_POLICY_RESP;
272 state = MISSING_HANDLER_ERROR;
273 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
280 mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
293 unsigned long const message_processor::get_messages (void){
294 return _num_messages;