2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
21 #include <message_processor_class.hpp>
26 message_processor::message_processor(int mode, bool report_mode, size_t buffer_length, size_t reporting_interval): _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
27 processing_level = mode;
28 report_mode_only = report_mode;
29 _buffer_size = buffer_length;
31 scratch_buffer = (unsigned char *)calloc(_buffer_size, sizeof(unsigned char));
32 assert(scratch_buffer != 0);
33 _reporting_interval = reporting_interval;
35 processing_duration = 0;
37 max_proc_duration = 0;
42 void message_processor::register_subscription_handler(subscription_handler * sub){
43 _ref_sub_handler = sub;
47 void message_processor::register_protector(protector * p){
51 void message_processor::register_policy_handler(void (*f1)(int, const char *, int, std::string &, bool)){
52 _ref_policy_handler = f1;
55 message_processor::~message_processor(void){
59 // main message processing
60 bool message_processor::operator()(rmr_mbuf_t *message){
63 bool send_msg = false;
65 size_t buf_size = _buffer_size;
71 //std::string filename = "/opt/out/e2ap_" + std::to_string(_num_messages) + ".per";
74 mdclog_write(MDCLOG_DEBUG, "Received RMR message of type %d and size %d\n", message->mtype, message->len);
76 // Sanity check on RMR. Max size ?
77 if (message->len > MAX_RMR_RECV_SIZE){
79 mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE);
84 auto start = std::chrono::high_resolution_clock::now();
86 // main message processing code
87 switch(message->mtype){
91 if (unlikely(_ref_protector == NULL)){
92 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: No plugin registered to handle ric indication message\n", __FILE__, __LINE__);
93 state = MISSING_HANDLER_ERROR;
97 //pfile = fopen(filename.c_str(), "wb");
98 //fwrite(message->payload, sizeof(char), message->len, pfile);
104 rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_recv_pdu), message->payload, message->len);
107 if(likely(rval.code == RC_OK)){
109 res = indication_processor.get_fields(e2ap_recv_pdu->choice.initiatingMessage, indication_data);
111 state = E2AP_INDICATION_ERROR;
112 num_err_indications ++;
113 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__);
116 //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
117 //xer_fprint(stdout, &asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
118 //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
121 num_err_indications ++;
122 state = E2AP_INDICATION_ERROR;
123 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2AP PDU\n", __FILE__, __LINE__);
127 mdclog_write(MDCLOG_DEBUG, "E2AP INDICATION :: Successfully received E2AP Indication with id = %ld, sequence no = %ld, Number of indications = %lu, Number of erroneous indications = %lu\n", indication_data.req_id, indication_data.req_seq_no, num_indications, num_err_indications);
130 // do we progress ahead ?
131 if(processing_level == E2AP_PROC_ONLY){
135 //Decode the SM header
136 mdclog_write(MDCLOG_DEBUG, "Decoding e2sm header of size %lu\n", indication_data.indication_header_size);
137 rval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, (void**)&(e2sm_header), indication_data.indication_header, indication_data.indication_header_size);
139 if (likely(rval.code == RC_OK)){
140 res = e2sm_indication_processor.get_header_fields(e2sm_header, header_helper);
142 state = E2SM_INDICATION_HEADER_ERROR;
143 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from E2SM HEADER\n", __FILE__, __LINE__);
147 // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
148 // xer_fprint(stdout, &asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
149 // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
152 state = E2SM_INDICATION_HEADER_ERROR;
153 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2SM Header.", __FILE__, __LINE__);
156 mdclog_write(MDCLOG_DEBUG, "E2SM INDICATION HEADER :: Successfully decoded E2SM Indication Header of size %lu\n", indication_data.indication_header_size);
159 // do we progress ahead ?
160 if(processing_level == E2SM_PROC_ONLY){
165 // NOTE : We assume RICindicationMessage contains payload (not E2SM message)
166 // Send payload to plugin
168 remaining_buffer = _buffer_size;
169 mdclog_write(MDCLOG_DEBUG, "Processing E2AP Indication message of size %lu\n", indication_data.indication_msg_size);
171 res = (*_ref_protector)(indication_data.indication_msg, indication_data.indication_msg_size, scratch_buffer, &buf_size);
173 state = PLUGIN_ERROR;
177 // Do we respond ? Depends on RIC indication type ...
178 // if RICindicationType == report, then no response
179 // else if RICindicationType == insert, generate control ...
180 if (report_mode_only|| indication_data.indication_type == E2N_RICindicationType::E2N_RICindicationType_report){
181 mdclog_write(MDCLOG_DEBUG, "Indication type is report. Not generating control\n");
185 // we assume for now that just like in E2AP indication, response message is directly put in E2AP control
186 control_data.control_msg = &scratch_buffer[current_index];
187 control_data.control_msg_size = buf_size; // re-use for size
188 current_index += buf_size ;
189 remaining_buffer = _buffer_size - current_index;
190 mdclog_write(MDCLOG_DEBUG, "Encoded X2AP response of size %lu bytes. Remaining buffer = %lu bytes\n", buf_size, remaining_buffer);
192 if (current_index >= _buffer_size){
193 state = BUFFER_ERROR;
194 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
199 // Encode the control header
200 // Control header is same as indication header ( except interface direction ?)
201 header_helper.interface_direction = 0;
202 res = e2sm_control_processor.encode_control_header(&scratch_buffer[current_index], &remaining_buffer, header_helper);
205 control_data.control_header = &scratch_buffer[current_index];
206 control_data.control_header_size = remaining_buffer;
207 current_index += remaining_buffer ;
208 remaining_buffer = _buffer_size - current_index;
212 state = E2SM_CONTROL_HEADER_ERROR;
213 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control header. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
217 if (current_index >= _buffer_size){
218 state = BUFFER_ERROR;
219 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
223 // Generate control message
225 control_data.req_id = indication_data.req_id;
226 control_data.req_seq_no = indication_data.req_seq_no;
227 control_data.func_id = indication_data.func_id;
228 control_data.control_ack = 2; // no ack required
229 res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data);
234 message->mtype = RIC_CONTROL_REQ;
235 mdclog_write(MDCLOG_DEBUG, "E2AP CONTROL MESSAGE :: Successfully generated E2AP Control Message\n");
238 state = E2AP_CONTROL_ERROR;
239 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2AP control . Reason = %s\n", __FILE__, __LINE__, control_request_processor.get_error().c_str());
243 // Record id for tracking .... (tbd)
245 ASN_STRUCT_FREE(asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, e2sm_header);
246 ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
251 case (RIC_SUB_DEL_RESP):
252 case (RIC_SUB_FAILURE):
253 case ( RIC_SUB_DEL_FAILURE ):
254 if (_ref_sub_handler != NULL){
255 mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
256 _ref_sub_handler->Response(message->mtype, message->payload, message->len);
259 state = MISSING_HANDLER_ERROR;
260 mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
266 case DC_ADM_INT_CONTROL:
268 if(_ref_policy_handler != NULL){
269 // Need to apply config. Since config may need to be
270 // applied across all threads, we do a callback to the parent thread.
271 // wait for config to be applied and then send response
272 _ref_policy_handler(DC_ADM_INT_CONTROL, (const char *) message->payload, message->len, response, true);
273 std::memcpy( (char *) message->payload, response.c_str(), response.length());
274 message->len = response.length();
275 message->mtype = DC_ADM_INT_CONTROL_ACK;
279 state = MISSING_HANDLER_ERROR;
280 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
285 case DC_ADM_GET_POLICY:
287 if(_ref_policy_handler != NULL){
288 _ref_policy_handler(DC_ADM_GET_POLICY, (const char *) message->payload, message->len, response, false);
289 std::memcpy((char *)message->payload, response.c_str(), response.length());
290 message->len = response.length();
291 message->mtype = DC_ADM_GET_POLICY_ACK;
295 state = MISSING_HANDLER_ERROR;
296 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
302 mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
306 auto end = std::chrono::high_resolution_clock::now();
307 // Ad hoc metric reporting for now ...
308 // every reporting interval, prints to stdou
309 // 1, reporting interval (# of packets)
310 // 2. average processing time (in micro seconds) across each packet
311 // 3. standard deviation (micro seconds ^2) a
312 // 4. maximum processing time (in micro seconds)
314 if(num_proc_packets == _reporting_interval){
315 auto epoch = std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()).time_since_epoch();
316 double avg_latency = processing_duration/num_proc_packets;
317 double std_dev_latency = processing_dev/num_proc_packets - avg_latency * avg_latency;
318 std::cout << "Processing Metrics : " << epoch.count() << "," << num_proc_packets << "," << avg_latency << "," << std_dev_latency << "," << max_proc_duration << std::endl;
319 processing_duration = 0;
321 max_proc_duration = 0;
322 num_proc_packets = 0;
325 double elapsed = std::chrono::duration<double, std::micro>(end - start).count();
326 if(elapsed > max_proc_duration){
327 max_proc_duration = elapsed;
330 processing_duration += elapsed;
331 processing_dev += elapsed * elapsed;
344 unsigned long const message_processor::get_messages (void){
345 return _num_messages;