2 ==================================================================================
4 Copyright (c) 2018-2019 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
21 #include <message_processor_class.hpp>
26 message_processor::message_processor(int mode, bool report_mode, size_t buffer_length, size_t reporting_interval): _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
27 processing_level = mode;
28 report_mode_only = report_mode;
29 _buffer_size = buffer_length;
31 scratch_buffer = (unsigned char *)calloc(_buffer_size, sizeof(unsigned char));
32 assert(scratch_buffer != 0);
37 void message_processor::register_subscription_handler(subscription_handler * sub){
38 _ref_sub_handler = sub;
42 void message_processor::register_protector(protector * p){
46 void message_processor::register_policy_handler(void (*f1)(int, const char *, int, std::string &, bool)){
47 _ref_policy_handler = f1;
50 message_processor::~message_processor(void){
54 // main message processing
55 bool message_processor::operator()(rmr_mbuf_t *message){
58 bool send_msg = false;
60 size_t buf_size = _buffer_size;
66 //std::string filename = "/opt/out/e2ap_" + std::to_string(_num_messages) + ".per";
69 mdclog_write(MDCLOG_DEBUG, "Received RMR message of type %d and size %d\n", message->mtype, message->len);
71 // Sanity check on RMR. Max size ?
72 if (message->len > MAX_RMR_RECV_SIZE){
74 mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE);
79 // main message processing code
80 switch(message->mtype){
84 if (unlikely(_ref_protector == NULL)){
85 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: No plugin registered to handle ric indication message\n", __FILE__, __LINE__);
86 state = MISSING_HANDLER_ERROR;
90 //pfile = fopen(filename.c_str(), "wb");
91 //fwrite(message->payload, sizeof(char), message->len, pfile);
97 rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_recv_pdu), message->payload, message->len);
100 if(likely(rval.code == RC_OK)){
102 res = indication_processor.get_fields(e2ap_recv_pdu->choice.initiatingMessage, indication_data);
104 state = E2AP_INDICATION_ERROR;
105 num_err_indications ++;
106 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__);
109 //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
110 //xer_fprint(stdout, &asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
111 //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
114 num_err_indications ++;
115 state = E2AP_INDICATION_ERROR;
116 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2AP PDU\n", __FILE__, __LINE__);
120 mdclog_write(MDCLOG_DEBUG, "E2AP INDICATION :: Successfully received E2AP Indication with id = %ld, sequence no = %ld, Number of indications = %lu, Number of erroneous indications = %lu\n", indication_data.req_id, indication_data.req_seq_no, num_indications, num_err_indications);
123 // do we progress ahead ?
124 if(processing_level == E2AP_PROC_ONLY){
128 //Decode the SM header
129 mdclog_write(MDCLOG_DEBUG, "Decoding e2sm header of size %lu\n", indication_data.indication_header_size);
130 rval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, (void**)&(e2sm_header), indication_data.indication_header, indication_data.indication_header_size);
132 if (likely(rval.code == RC_OK)){
133 res = e2sm_indication_processor.get_header_fields(e2sm_header, header_helper);
135 state = E2SM_INDICATION_HEADER_ERROR;
136 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from E2SM HEADER\n", __FILE__, __LINE__);
140 // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
141 // xer_fprint(stdout, &asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
142 // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
145 state = E2SM_INDICATION_HEADER_ERROR;
146 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2SM Header.", __FILE__, __LINE__);
149 mdclog_write(MDCLOG_DEBUG, "E2SM INDICATION HEADER :: Successfully decoded E2SM Indication Header of size %lu\n", indication_data.indication_header_size);
152 // do we progress ahead ?
153 if(processing_level == E2SM_PROC_ONLY){
158 // NOTE : We assume RICindicationMessage contains payload (not E2SM message)
159 // Send payload to plugin
161 remaining_buffer = _buffer_size;
162 mdclog_write(MDCLOG_DEBUG, "Processing E2AP Indication message of size %lu\n", indication_data.indication_msg_size);
164 res = (*_ref_protector)(indication_data.indication_msg, indication_data.indication_msg_size, scratch_buffer, &buf_size);
166 state = PLUGIN_ERROR;
170 // Do we respond ? Depends on RIC indication type ...
171 // if RICindicationType == report, then no response
172 // else if RICindicationType == insert, generate control ...
173 if (report_mode_only|| indication_data.indication_type == E2N_RICindicationType::E2N_RICindicationType_report){
174 mdclog_write(MDCLOG_DEBUG, "Indication type is report. Not generating control\n");
178 // we assume for now that just like in E2AP indication, response message is directly put in E2AP control
179 control_data.control_msg = &scratch_buffer[current_index];
180 control_data.control_msg_size = buf_size; // re-use for size
181 current_index += buf_size ;
182 remaining_buffer = _buffer_size - current_index;
183 mdclog_write(MDCLOG_DEBUG, "Encoded X2AP response of size %lu bytes. Remaining buffer = %lu bytes\n", buf_size, remaining_buffer);
185 if (current_index >= _buffer_size){
186 state = BUFFER_ERROR;
187 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
192 // Encode the control header
193 // Control header is same as indication header ( except interface direction ?)
194 header_helper.interface_direction = 0;
195 res = e2sm_control_processor.encode_control_header(&scratch_buffer[current_index], &remaining_buffer, header_helper);
198 control_data.control_header = &scratch_buffer[current_index];
199 control_data.control_header_size = remaining_buffer;
200 current_index += remaining_buffer ;
201 remaining_buffer = _buffer_size - current_index;
205 state = E2SM_CONTROL_HEADER_ERROR;
206 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control header. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
210 if (current_index >= _buffer_size){
211 state = BUFFER_ERROR;
212 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
216 // Generate control message
218 control_data.req_id = indication_data.req_id;
219 control_data.req_seq_no = indication_data.req_seq_no;
220 control_data.func_id = indication_data.func_id;
221 control_data.control_ack = 2; // no ack required
222 res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data);
227 message->mtype = RIC_CONTROL_REQ;
228 mdclog_write(MDCLOG_DEBUG, "E2AP CONTROL MESSAGE :: Successfully generated E2AP Control Message\n");
231 state = E2AP_CONTROL_ERROR;
232 mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2AP control . Reason = %s\n", __FILE__, __LINE__, control_request_processor.get_error().c_str());
236 // Record id for tracking .... (tbd)
238 ASN_STRUCT_FREE(asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, e2sm_header);
239 ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
244 case (RIC_SUB_DEL_RESP):
245 case (RIC_SUB_FAILURE):
246 case ( RIC_SUB_DEL_FAILURE ):
247 if (_ref_sub_handler != NULL){
248 mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
249 _ref_sub_handler->Response(message->mtype, message->payload, message->len);
252 state = MISSING_HANDLER_ERROR;
253 mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
259 case DC_ADM_INT_CONTROL:
261 if(_ref_policy_handler != NULL){
262 // Need to apply config. Since config may need to be
263 // applied across all threads, we do a callback to the parent thread.
264 // wait for config to be applied and then send response
265 _ref_policy_handler(DC_ADM_INT_CONTROL, (const char *) message->payload, message->len, response, true);
266 std::memcpy( (char *) message->payload, response.c_str(), response.length());
267 message->len = response.length();
268 message->mtype = DC_ADM_INT_CONTROL_ACK;
272 state = MISSING_HANDLER_ERROR;
273 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
278 case DC_ADM_GET_POLICY:
280 if(_ref_policy_handler != NULL){
281 _ref_policy_handler(DC_ADM_GET_POLICY, (const char *) message->payload, message->len, response, false);
282 std::memcpy((char *)message->payload, response.c_str(), response.length());
283 message->len = response.length();
284 message->mtype = DC_ADM_GET_POLICY_ACK;
288 state = MISSING_HANDLER_ERROR;
289 mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
295 mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
308 unsigned long const message_processor::get_messages (void){
309 return _num_messages;