b5b421af396086c101f121db75219f91d319ff2f
[ric-app/admin.git] / src / message_processor_class.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20
21 #include <message_processor_class.hpp>
22
23
24 int verbose_flag = 0;
25
26 message_processor::message_processor(int mode, bool report_mode, size_t buffer_length):  _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
27   processing_level = mode;
28   report_mode_only = report_mode;
29   _buffer_size = buffer_length;
30   scratch_buffer = 0;
31   scratch_buffer = (unsigned char *)calloc(_buffer_size, sizeof(unsigned char));
32   assert(scratch_buffer != 0);
33
34 };
35
36
37 void message_processor::register_subscription_handler(subscription_handler * sub){
38   _ref_sub_handler = sub;
39 }
40
41
42 void message_processor::register_protector(protector * p){
43   _ref_protector = p;
44 }
45
46 void message_processor::register_policy_handler(void (*f1)(int, const char *, int, std::string &, bool)){
47   _ref_policy_handler = f1;
48 }
49
50 message_processor::~message_processor(void){
51   free(scratch_buffer);
52 }
53
54 // main message processing 
55 bool message_processor::operator()(rmr_mbuf_t *message){
56
57   bool res;
58   bool send_msg = false;
59   asn_dec_rval_t rval;
60   size_t buf_size = _buffer_size;
61   size_t mlen;
62   _num_messages ++;
63   std::string response;
64
65
66   state = NO_ERROR;
67   mdclog_write(MDCLOG_DEBUG, "Received RMR message of type %d and size %d\n", message->mtype, message->len);
68   
69   // Sanity check on RMR. Max size ?
70   if (message->len > MAX_RMR_RECV_SIZE){
71     state = RMR_ERROR;
72     mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE);
73     return false;
74   }
75   
76   // routing based on message type 
77   switch(message->mtype){
78     
79   case RIC_INDICATION:
80     
81     if (unlikely(_ref_protector == NULL)){
82       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: No plugin registered to handle ric indication message\n", __FILE__, __LINE__);
83       state = MISSING_HANDLER_ERROR;
84       break;
85     }
86     
87     e2ap_recv_pdu = 0;
88     e2sm_header = 0;
89     
90     rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_recv_pdu), message->payload, message->len);
91     
92       
93     if(likely(rval.code == RC_OK)){
94       num_indications ++;
95       res = indication_processor.get_fields(e2ap_recv_pdu->choice.initiatingMessage, indication_data);
96       if (unlikely(!res)){
97         state = E2AP_INDICATION_ERROR;
98         num_err_indications ++;
99         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__);
100         goto finished;
101       }
102
103       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
104       //xer_fprint(stdout, &asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
105       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
106
107     }
108     else{
109       num_err_indications ++;
110       state = E2AP_INDICATION_ERROR;
111       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2AP PDU\n", __FILE__, __LINE__);
112       goto finished;
113     }
114     
115     mdclog_write(MDCLOG_DEBUG, "E2AP INDICATION :: Successfully received E2AP Indication  with id = %ld, sequence no = %ld, Number of indications = %lu, Number of erroneous indications = %lu\n", indication_data.req_id, indication_data.req_seq_no, num_indications, num_err_indications);
116
117     
118     // do we progress ahead ?
119     if(processing_level == E2AP_PROC_ONLY){
120       goto finished;
121     }
122     
123     //Decode the SM header
124     mdclog_write(MDCLOG_DEBUG, "Decoding e2sm header of size %lu\n", indication_data.indication_header_size);
125     rval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, (void**)&(e2sm_header), indication_data.indication_header, indication_data.indication_header_size);
126     
127     if (likely(rval.code == RC_OK)){
128       res = e2sm_indication_processor.get_header_fields(e2sm_header, header_helper);
129       if (unlikely(!res)){
130         state = E2SM_INDICATION_HEADER_ERROR;
131         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from E2SM HEADER\n", __FILE__, __LINE__);
132         goto finished;
133       }
134       
135       // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
136       // xer_fprint(stdout, &asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
137       // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;    
138     }
139     else{
140       state = E2SM_INDICATION_HEADER_ERROR;
141       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2SM Header.", __FILE__, __LINE__);
142       goto finished;
143     }
144     mdclog_write(MDCLOG_DEBUG, "E2SM INDICATION HEADER :: Successfully decoded E2SM Indication Header of size %lu\n", indication_data.indication_header_size);
145     
146           
147     // do we progress ahead ?
148     if(processing_level == E2SM_PROC_ONLY){
149       goto finished;
150     }
151     
152
153     // NOTE : We assume RICindicationMessage contains payload (not E2SM message)
154     // Send payload to protector plugin
155     current_index = 0;
156     remaining_buffer = _buffer_size;
157     mdclog_write(MDCLOG_DEBUG, "Processing E2AP Indication message of size %lu\n", indication_data.indication_msg_size);
158     
159     res = (*_ref_protector)(indication_data.indication_msg, indication_data.indication_msg_size, scratch_buffer, &buf_size);
160     if(unlikely(!res)){
161       state = PLUGIN_ERROR;
162       goto finished;
163     }
164                             
165     // Do we respond ? Depends on RIC indication type ...
166     // if RICindicationType == report, then no response
167     // else if RICindicationType == insert, generate control ...
168     if (report_mode_only|| indication_data.indication_type == E2N_RICindicationType::E2N_RICindicationType_report){
169       mdclog_write(MDCLOG_DEBUG, "Indication type is report. Not generating control\n");
170       goto finished;
171     }
172
173     // we assume for now that just like in E2AP indication, response message is directly put in E2AP control 
174     control_data.control_msg = &scratch_buffer[current_index];
175     control_data.control_msg_size = buf_size; // re-use for size
176     current_index += buf_size ;
177     remaining_buffer = _buffer_size - current_index;
178     mdclog_write(MDCLOG_DEBUG, "Encoded X2AP response of size %lu bytes. Remaining buffer = %lu bytes\n", buf_size, remaining_buffer);
179     
180     if (current_index >= _buffer_size){
181       state = BUFFER_ERROR;
182       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
183       goto finished;
184     }
185
186       
187     // Encode the control header
188     // Control header is same as indication header ( except interface direction ?)
189     header_helper.interface_direction = 0;
190     res = e2sm_control_processor.encode_control_header(&scratch_buffer[current_index], &remaining_buffer, header_helper);
191     
192     if (likely(res)){
193       control_data.control_header = &scratch_buffer[current_index];
194       control_data.control_header_size = remaining_buffer;      
195       current_index += remaining_buffer ;
196       remaining_buffer = _buffer_size - current_index;
197       
198     }
199     else{
200       state = E2SM_CONTROL_HEADER_ERROR;
201       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control header. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
202       goto finished;
203     }
204     
205     if (current_index >= _buffer_size){
206       state = BUFFER_ERROR;
207       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
208       goto finished;
209     }
210     
211     // Generate control message
212     mlen = _buffer_size;
213     control_data.req_id = indication_data.req_id;
214     control_data.req_seq_no = indication_data.req_seq_no;
215     control_data.func_id = indication_data.func_id;
216     control_data.control_ack = 1; // no ack required
217     control_data.call_process_id = indication_data.call_process_id;
218     control_data.call_process_id_size = indication_data.call_process_id_size;
219     res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data);
220     
221     if(likely(res)){
222       send_msg = true;
223       message->len = mlen;
224       message->mtype = RIC_CONTROL_REQ;
225       mdclog_write(MDCLOG_DEBUG,  "E2AP CONTROL MESSAGE :: Successfully generated E2AP Control Message\n");
226     }
227     else{
228       state = E2AP_CONTROL_ERROR;
229       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2AP control . Reason = %s\n", __FILE__, __LINE__, control_request_processor.get_error().c_str());
230       goto finished;
231     }
232     
233     // Record id for tracking .... (tbd)
234   finished:
235     ASN_STRUCT_FREE(asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, e2sm_header);
236     ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
237     
238     break;
239     
240   case (RIC_SUB_RESP):
241   case (RIC_SUB_DEL_RESP):
242   case (RIC_SUB_FAILURE):
243   case ( RIC_SUB_DEL_FAILURE ):
244     if (_ref_sub_handler != NULL){
245       // extract meid ..
246       unsigned char meid[32];
247       rmr_get_meid(message, meid);
248       mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
249       _ref_sub_handler->Response(message->mtype, message->payload, message->len, (const char *)meid);
250     }
251     else{
252       state = MISSING_HANDLER_ERROR;
253       mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
254     }
255     
256     break;
257     
258     
259   case A1_POLICY_REQ:
260     {
261       if(_ref_policy_handler != NULL){
262         // Need to apply config. Since config may need to be
263         // applied across all threads, we do a callback to the parent thread.
264         // wait for config to be applied and then send response
265         _ref_policy_handler(A1_POLICY_REQ, (const char *) message->payload, message->len, response, true);
266         std::memcpy( (char *) message->payload, response.c_str(),  response.length());
267         message->len = response.length();
268         message->mtype = A1_POLICY_RESP;
269         send_msg = true;
270       }
271       else{
272         state = MISSING_HANDLER_ERROR;
273         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
274       }
275     }
276     break;
277
278     
279   default:
280     mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
281
282   };
283
284   if(send_msg){
285     return true;
286   }
287   else{
288     return false;
289   }
290 };
291
292    
293 unsigned long const message_processor::get_messages (void){
294     return _num_messages;
295 };
296