2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
21 #include <message_processor_class.hpp>
// Constructor. Initialises the three handler pointers to NULL and the message /
// indication counters and current_index to 0, records the processing mode, the
// report-only flag and the scratch-buffer size, then allocates the scratch buffer.
//   mode          - stored in processing_level; operator() compares it against
//                   E2AP_PROC_ONLY and E2SM_PROC_ONLY.
//   report_mode   - stored in report_mode_only; operator() tests it before
//                   building a control message.
//   buffer_length - size in bytes of scratch_buffer.
26 message_processor::message_processor(int mode, bool report_mode, size_t buffer_length): _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
27 processing_level = mode;
28 report_mode_only = report_mode;
29 _buffer_size = buffer_length;
// calloc returns a zero-filled buffer of _buffer_size bytes (element size is 1).
31 scratch_buffer = (unsigned char *)calloc(_buffer_size, sizeof(unsigned char));
// NOTE(review): allocation failure is only caught by assert, which is compiled
// out when NDEBUG is defined; calloc may also return NULL for a zero size.
32 assert(scratch_buffer != 0);
// Registers the subscription handler. The pointer is stored as-is (no copy is
// made); operator() checks it against NULL before calling Response() on it.
//   sub - handler to store in _ref_sub_handler.
37 void message_processor::register_subscription_handler(subscription_handler * sub){
38 _ref_sub_handler = sub;
// Registers the protector plugin.
// NOTE(review): the body is not visible in this view; presumably it stores p in
// _ref_protector, which operator() null-checks and invokes on indication
// payloads - confirm.
42 void message_processor::register_protector(protector * p){
// Registers the policy callback, stored in _ref_policy_handler.
//   f1 - function pointer; operator() calls it as
//        f1(A1_POLICY_REQ, payload, payload length, response string, true)
//        and then copies the response string back into the message.
46 void message_processor::register_policy_handler(void (*f1)(int, const char *, int, std::string &, bool)){
47 _ref_policy_handler = f1;
// Destructor.
// NOTE(review): the body is not visible in this view; presumably it releases
// scratch_buffer, which the constructor allocates with calloc - confirm.
50 message_processor::~message_processor(void){
54 // main message processing
// Function-call operator: processes one received RMR message, dispatching on
// message->mtype. The branches visible here:
//   * E2AP RIC indication: decode the E2AP PDU, decode the E2SM indication
//     header, pass the indication payload to the registered protector and,
//     unless in report mode, encode an E2AP control request into
//     message->payload and set mtype to RIC_CONTROL_REQ.
//   * Subscription responses / failures: forwarded to the subscription handler.
//   * What appears to be the A1 policy branch: forwarded to the policy callback,
//     whose response string is written back into the message (A1_POLICY_RESP).
//   * Anything else: logged as an unknown message type.
//   message - RMR message buffer; its payload, len and mtype may be overwritten.
// NOTE(review): many lines of this function (local declarations, case labels,
// else branches, breaks, closing braces and the return statement) are not
// present in this view, so the exact control flow and the returned value
// cannot be confirmed from here.
55 bool message_processor::operator()(rmr_mbuf_t *message){
58 bool send_msg = false;
// Local copy of the scratch-buffer size; its address is later handed to the protector.
60 size_t buf_size = _buffer_size;
66 //std::string filename = "/opt/out/e2ap_" + std::to_string(_num_messages) + ".per";
69 mdclog_write(MDCLOG_DEBUG, "Received RMR message of type %d and size %d\n", message->mtype, message->len);
71 // Sanity check on RMR. Max size ?
72 if (message->len > MAX_RMR_RECV_SIZE){
74 mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE);
// Timestamp taken before dispatch; its use is not visible in this view
// (presumably processing-latency measurement).
79 auto start = std::chrono::high_resolution_clock::now();
81 // main message processing code
82 switch(message->mtype){
// Indication handling needs a registered protector; without one the message
// is flagged with MISSING_HANDLER_ERROR.
86 if (unlikely(_ref_protector == NULL)){
87 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: No plugin registered to handle ric indication message\n", __FILE__, __LINE__);
88 state = MISSING_HANDLER_ERROR;
92 //pfile = fopen(filename.c_str(), "wb");
93 //fwrite(message->payload, sizeof(char), message->len, pfile);
// Decode the raw RMR payload (aligned basic PER) into e2ap_recv_pdu; the
// decoded structure is released with ASN_STRUCT_FREE further down.
99 rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_recv_pdu), message->payload, message->len);
102 if(likely(rval.code == RC_OK)){
// Extract the indication fields from the initiatingMessage into indication_data.
104 res = indication_processor.get_fields(e2ap_recv_pdu->choice.initiatingMessage, indication_data);
106 state = E2AP_INDICATION_ERROR;
107 num_err_indications ++;
108 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__);
111 //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
112 //xer_fprint(stdout, &asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
113 //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
// PDU decode failure path: counted as an erroneous indication.
116 num_err_indications ++;
117 state = E2AP_INDICATION_ERROR;
118 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2AP PDU\n", __FILE__, __LINE__);
122 mdclog_write(MDCLOG_DEBUG, "E2AP INDICATION :: Successfully received E2AP Indication with id = %ld, sequence no = %ld, Number of indications = %lu, Number of erroneous indications = %lu\n", indication_data.req_id, indication_data.req_seq_no, num_indications, num_err_indications);
125 // do we progress ahead ?
126 if(processing_level == E2AP_PROC_ONLY){
130 //Decode the SM header
131 mdclog_write(MDCLOG_DEBUG, "Decoding e2sm header of size %lu\n", indication_data.indication_header_size);
132 rval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, (void**)&(e2sm_header), indication_data.indication_header, indication_data.indication_header_size);
134 if (likely(rval.code == RC_OK)){
// Copy the decoded header fields into header_helper; it is reused below to
// build the control header.
135 res = e2sm_indication_processor.get_header_fields(e2sm_header, header_helper);
137 state = E2SM_INDICATION_HEADER_ERROR;
138 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from E2SM HEADER\n", __FILE__, __LINE__);
142 // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
143 // xer_fprint(stdout, &asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
144 // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
147 state = E2SM_INDICATION_HEADER_ERROR;
148 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2SM Header.", __FILE__, __LINE__);
151 mdclog_write(MDCLOG_DEBUG, "E2SM INDICATION HEADER :: Successfully decoded E2SM Indication Header of size %lu\n", indication_data.indication_header_size);
154 // do we progress ahead ?
155 if(processing_level == E2SM_PROC_ONLY){
160 // NOTE : We assume RICindicationMessage contains payload (not E2SM message)
161 // Send payload to plugin
163 remaining_buffer = _buffer_size;
164 mdclog_write(MDCLOG_DEBUG, "Processing E2AP Indication message of size %lu\n", indication_data.indication_msg_size);
// Invoke the protector on the indication payload, with the start of
// scratch_buffer as output. buf_size is passed by pointer and afterwards used
// as the response size, so it is presumably in: capacity / out: bytes written
// - confirm against the protector interface.
166 res = (*_ref_protector)(indication_data.indication_msg, indication_data.indication_msg_size, scratch_buffer, &buf_size);
168 state = PLUGIN_ERROR;
172 // Do we respond ? Depends on RIC indication type ...
173 // if RICindicationType == report, then no response
174 // else if RICindicationType == insert, generate control ...
175 if (report_mode_only|| indication_data.indication_type == E2N_RICindicationType::E2N_RICindicationType_report){
176 mdclog_write(MDCLOG_DEBUG, "Indication type is report. Not generating control\n");
180 // we assume for now that just like in E2AP indication, response message is directly put in E2AP control
// NOTE(review): the control message pointer is taken at
// scratch_buffer[current_index], while the protector above was handed
// scratch_buffer itself. These agree only if current_index is 0 at this point
// - confirm that it is reset for every message.
181 control_data.control_msg = &scratch_buffer[current_index];
182 control_data.control_msg_size = buf_size; // re-use for size
183 current_index += buf_size ;
184 remaining_buffer = _buffer_size - current_index;
185 mdclog_write(MDCLOG_DEBUG, "Encoded X2AP response of size %lu bytes. Remaining buffer = %lu bytes\n", buf_size, remaining_buffer);
// No room left in the scratch buffer for the control header.
187 if (current_index >= _buffer_size){
188 state = BUFFER_ERROR;
189 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
194 // Encode the control header
195 // Control header is same as indication header ( except interface direction ?)
196 header_helper.interface_direction = 0;
// remaining_buffer is passed by pointer and then used as the encoded header
// size, so it is presumably in: space available / out: bytes written - confirm.
197 res = e2sm_control_processor.encode_control_header(&scratch_buffer[current_index], &remaining_buffer, header_helper);
200 control_data.control_header = &scratch_buffer[current_index];
201 control_data.control_header_size = remaining_buffer;
// Advance past the encoded header and recompute the space that is left.
202 current_index += remaining_buffer ;
203 remaining_buffer = _buffer_size - current_index;
207 state = E2SM_CONTROL_HEADER_ERROR;
208 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control header. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
212 if (current_index >= _buffer_size){
213 state = BUFFER_ERROR;
214 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
218 // Generate control message
// The control request reuses the request id, sequence number and function id
// of the indication being answered.
220 control_data.req_id = indication_data.req_id;
221 control_data.req_seq_no = indication_data.req_seq_no;
222 control_data.func_id = indication_data.func_id;
223 control_data.control_ack = 2; // no ack required
// The control request is encoded in place over the received payload; mlen is
// passed by pointer (presumably in: capacity / out: encoded length - its
// declaration and later use are not visible here).
224 res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data);
229 message->mtype = RIC_CONTROL_REQ;
230 mdclog_write(MDCLOG_DEBUG, "E2AP CONTROL MESSAGE :: Successfully generated E2AP Control Message\n");
233 state = E2AP_CONTROL_ERROR;
234 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2AP control . Reason = %s\n", __FILE__, __LINE__, control_request_processor.get_error().c_str());
238 // Record id for tracking .... (tbd)
// Release the ASN.1 structures produced by the two asn_decode calls above.
240 ASN_STRUCT_FREE(asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, e2sm_header);
241 ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
// Subscription-related message types share one branch: they are forwarded
// unchanged to the subscription handler together with the message's MEID.
246 case (RIC_SUB_DEL_RESP):
247 case (RIC_SUB_FAILURE):
248 case ( RIC_SUB_DEL_FAILURE ):
249 if (_ref_sub_handler != NULL){
// Local 32-byte buffer filled by rmr_get_meid.
251 unsigned char meid[32];
252 rmr_get_meid(message, meid);
253 mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
254 _ref_sub_handler->Response(message->mtype, message->payload, message->len, (const char *)meid);
257 state = MISSING_HANDLER_ERROR;
258 mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
266 if(_ref_policy_handler != NULL){
267 // Need to apply config. Since config may need to be
268 // applied across all threads, we do a callback to the parent thread.
269 // wait for config to be applied and then send response
270 _ref_policy_handler(A1_POLICY_REQ, (const char *) message->payload, message->len, response, true);
// NOTE(review): response.length() bytes are copied into message->payload with
// no visible check against the payload's allocated capacity - confirm that
// the response can never exceed it.
271 std::memcpy( (char *) message->payload, response.c_str(), response.length());
272 message->len = response.length();
273 message->mtype = A1_POLICY_RESP;
277 state = MISSING_HANDLER_ERROR;
278 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
285 mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
// Returns the current value of the _num_messages counter.
// NOTE(review): the top-level const on the by-value return type has no effect
// for callers.
298 unsigned long const message_processor::get_messages (void){
299 return _num_messages;