a5a96851d070ed64c03e4446f012bdd128911fdd
[ric-app/admin.git] / src / message_processor_class.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20
21 #include <message_processor_class.hpp>
22
23
24 int verbose_flag = 0;
25
26 message_processor::message_processor(int mode, bool report_mode, size_t buffer_length, size_t reporting_interval):  _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
27   processing_level = mode;
28   report_mode_only = report_mode;
29   _buffer_size = buffer_length;
30   scratch_buffer = 0;
31   scratch_buffer = (unsigned char *)calloc(_buffer_size, sizeof(unsigned char));
32   assert(scratch_buffer != 0);
33
34 };
35
36
37 void message_processor::register_subscription_handler(subscription_handler * sub){
38   _ref_sub_handler = sub;
39 }
40
41
42 void message_processor::register_protector(protector * p){
43   _ref_protector = p;
44 }
45
46 void message_processor::register_policy_handler(void (*f1)(int, const char *, int, std::string &, bool)){
47   _ref_policy_handler = f1;
48 }
49
50 message_processor::~message_processor(void){
51   free(scratch_buffer);
52 }
53
54 // main message processing 
55 bool message_processor::operator()(rmr_mbuf_t *message){
56
57   bool res;
58   bool send_msg = false;
59   asn_dec_rval_t rval;
60   size_t buf_size = _buffer_size;
61   size_t mlen;
62   _num_messages ++;
63   std::string response;
64
65   //FILE *pfile;
66   //std::string filename = "/opt/out/e2ap_" + std::to_string(_num_messages) + ".per";
67
68   state = NO_ERROR;
69   mdclog_write(MDCLOG_DEBUG, "Received RMR message of type %d and size %d\n", message->mtype, message->len);
70   
71   // Sanity check on RMR. Max size ?
72   if (message->len > MAX_RMR_RECV_SIZE){
73     state = RMR_ERROR;
74     mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE);
75     return false;
76   }
77   
78   
79   // main message processing code
80   switch(message->mtype){
81     
82   case RIC_INDICATION:
83     
84     if (unlikely(_ref_protector == NULL)){
85       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: No plugin registered to handle ric indication message\n", __FILE__, __LINE__);
86       state = MISSING_HANDLER_ERROR;
87       break;
88     }
89     
90     //pfile = fopen(filename.c_str(), "wb");
91     //fwrite(message->payload, sizeof(char), message->len, pfile);
92     //fclose(pfile);
93
94     e2ap_recv_pdu = 0;
95     e2sm_header = 0;
96     
97     rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_recv_pdu), message->payload, message->len);
98     
99       
100     if(likely(rval.code == RC_OK)){
101       num_indications ++;
102       res = indication_processor.get_fields(e2ap_recv_pdu->choice.initiatingMessage, indication_data);
103       if (unlikely(!res)){
104         state = E2AP_INDICATION_ERROR;
105         num_err_indications ++;
106         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__);
107         goto finished;
108       }
109       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
110       //xer_fprint(stdout, &asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
111       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
112     }
113     else{
114       num_err_indications ++;
115       state = E2AP_INDICATION_ERROR;
116       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2AP PDU\n", __FILE__, __LINE__);
117       goto finished;
118     }
119     
120     mdclog_write(MDCLOG_DEBUG, "E2AP INDICATION :: Successfully received E2AP Indication  with id = %ld, sequence no = %ld, Number of indications = %lu, Number of erroneous indications = %lu\n", indication_data.req_id, indication_data.req_seq_no, num_indications, num_err_indications);
121
122     
123     // do we progress ahead ?
124     if(processing_level == E2AP_PROC_ONLY){
125       goto finished;
126     }
127     
128     //Decode the SM header
129     mdclog_write(MDCLOG_DEBUG, "Decoding e2sm header of size %lu\n", indication_data.indication_header_size);
130     rval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, (void**)&(e2sm_header), indication_data.indication_header, indication_data.indication_header_size);
131     
132     if (likely(rval.code == RC_OK)){
133       res = e2sm_indication_processor.get_header_fields(e2sm_header, header_helper);
134       if (unlikely(!res)){
135         state = E2SM_INDICATION_HEADER_ERROR;
136         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from E2SM HEADER\n", __FILE__, __LINE__);
137         goto finished;
138       }
139       
140       // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
141       // xer_fprint(stdout, &asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
142       // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;    
143     }
144     else{
145       state = E2SM_INDICATION_HEADER_ERROR;
146       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2SM Header.", __FILE__, __LINE__);
147       goto finished;
148     }
149     mdclog_write(MDCLOG_DEBUG, "E2SM INDICATION HEADER :: Successfully decoded E2SM Indication Header of size %lu\n", indication_data.indication_header_size);
150     
151           
152     // do we progress ahead ?
153     if(processing_level == E2SM_PROC_ONLY){
154       goto finished;
155     }
156     
157
158     // NOTE : We assume RICindicationMessage contains payload (not E2SM message)
159     // Send payload to plugin
160     current_index = 0;
161     remaining_buffer = _buffer_size;
162     mdclog_write(MDCLOG_DEBUG, "Processing E2AP Indication message of size %lu\n", indication_data.indication_msg_size);
163     
164     res = (*_ref_protector)(indication_data.indication_msg, indication_data.indication_msg_size, scratch_buffer, &buf_size);
165     if(unlikely(!res)){
166       state = PLUGIN_ERROR;
167       goto finished;
168     }
169                             
170     // Do we respond ? Depends on RIC indication type ...
171     // if RICindicationType == report, then no response
172     // else if RICindicationType == insert, generate control ...
173     if (report_mode_only|| indication_data.indication_type == E2N_RICindicationType::E2N_RICindicationType_report){
174       mdclog_write(MDCLOG_DEBUG, "Indication type is report. Not generating control\n");
175       goto finished;
176     }
177
178     // we assume for now that just like in E2AP indication, response message is directly put in E2AP control 
179     control_data.control_msg = &scratch_buffer[current_index];
180     control_data.control_msg_size = buf_size; // re-use for size
181     current_index += buf_size ;
182     remaining_buffer = _buffer_size - current_index;
183     mdclog_write(MDCLOG_DEBUG, "Encoded X2AP response of size %lu bytes. Remaining buffer = %lu bytes\n", buf_size, remaining_buffer);
184     
185     if (current_index >= _buffer_size){
186       state = BUFFER_ERROR;
187       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
188       goto finished;
189     }
190
191       
192     // Encode the control header
193     // Control header is same as indication header ( except interface direction ?)
194     header_helper.interface_direction = 0;
195     res = e2sm_control_processor.encode_control_header(&scratch_buffer[current_index], &remaining_buffer, header_helper);
196     
197     if (likely(res)){
198       control_data.control_header = &scratch_buffer[current_index];
199       control_data.control_header_size = remaining_buffer;      
200       current_index += remaining_buffer ;
201       remaining_buffer = _buffer_size - current_index;
202       
203     }
204     else{
205       state = E2SM_CONTROL_HEADER_ERROR;
206       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control header. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
207       goto finished;
208     }
209     
210     if (current_index >= _buffer_size){
211       state = BUFFER_ERROR;
212       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
213       goto finished;
214     }
215     
216     // Generate control message
217     mlen = _buffer_size;
218     control_data.req_id = indication_data.req_id;
219     control_data.req_seq_no = indication_data.req_seq_no;
220     control_data.func_id = indication_data.func_id;
221     control_data.control_ack = 2; // no ack required       
222     res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data);
223     
224     if(likely(res)){
225       send_msg = true;
226       message->len = mlen;
227       message->mtype = RIC_CONTROL_REQ;
228       mdclog_write(MDCLOG_DEBUG,  "E2AP CONTROL MESSAGE :: Successfully generated E2AP Control Message\n");
229     }
230     else{
231       state = E2AP_CONTROL_ERROR;
232       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2AP control . Reason = %s\n", __FILE__, __LINE__, control_request_processor.get_error().c_str());
233       goto finished;
234     }
235     
236     // Record id for tracking .... (tbd)
237   finished:
238     ASN_STRUCT_FREE(asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, e2sm_header);
239     ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
240     
241     break;
242     
243   case (RIC_SUB_RESP):
244   case (RIC_SUB_DEL_RESP):
245   case (RIC_SUB_FAILURE):
246   case ( RIC_SUB_DEL_FAILURE ):
247     if (_ref_sub_handler != NULL){
248       mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
249       _ref_sub_handler->Response(message->mtype, message->payload, message->len);
250     }
251     else{
252       state = MISSING_HANDLER_ERROR;
253       mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
254     }
255     
256     break;
257     
258     
259   case DC_ADM_INT_CONTROL:
260     {
261       if(_ref_policy_handler != NULL){
262         // Need to apply config. Since config may need to be
263         // applied across all threads, we do a callback to the parent thread.
264         // wait for config to be applied and then send response
265         _ref_policy_handler(DC_ADM_INT_CONTROL, (const char *) message->payload, message->len, response, true);
266         std::memcpy( (char *) message->payload, response.c_str(),  response.length());
267         message->len = response.length();
268         message->mtype = DC_ADM_INT_CONTROL_ACK;
269         send_msg = true;
270       }
271       else{
272         state = MISSING_HANDLER_ERROR;
273         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
274       }
275     }
276     break;
277
278   case DC_ADM_GET_POLICY:
279     {
280       if(_ref_policy_handler != NULL){
281         _ref_policy_handler(DC_ADM_GET_POLICY,  (const char *) message->payload, message->len, response, false);
282         std::memcpy((char *)message->payload, response.c_str(), response.length());
283         message->len = response.length();
284         message->mtype = DC_ADM_GET_POLICY_ACK;
285         send_msg = true;
286       }
287       else{
288         state = MISSING_HANDLER_ERROR;
289         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
290       }
291     }
292     break;
293     
294   default:
295     mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
296
297   };
298
299   if(send_msg){
300     return true;
301   }
302   else{
303     return false;
304   }
305 };
306
307    
308 unsigned long const message_processor::get_messages (void){
309     return _num_messages;
310 };
311