1601daa87a7ef481bdea1c2aacf5f74cd7f20409
[ric-app/admin.git] / src / message_processor_class.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20
21 #include <message_processor_class.hpp>
22
23
24 int verbose_flag = 0;
25
26 message_processor::message_processor(int mode, bool report_mode, size_t buffer_length, size_t reporting_interval):  _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
27   processing_level = mode;
28   report_mode_only = report_mode;
29   _buffer_size = buffer_length;
30   scratch_buffer = 0;
31   scratch_buffer = (unsigned char *)calloc(_buffer_size, sizeof(unsigned char));
32   assert(scratch_buffer != 0);
33   _reporting_interval = reporting_interval;
34   num_proc_packets = 0;
35   processing_duration = 0;
36   processing_dev = 0;
37   max_proc_duration = 0;
38
39 };
40
41
42 void message_processor::register_subscription_handler(subscription_handler * sub){
43   _ref_sub_handler = sub;
44 }
45
46
47 void message_processor::register_protector(protector * p){
48   _ref_protector = p;
49 }
50
51 void message_processor::register_policy_handler(void (*f1)(int, const char *, int, std::string &, bool)){
52   _ref_policy_handler = f1;
53 }
54
55 message_processor::~message_processor(void){
56   free(scratch_buffer);
57 }
58
59 // main message processing 
60 bool message_processor::operator()(rmr_mbuf_t *message){
61
62   bool res;
63   bool send_msg = false;
64   asn_dec_rval_t rval;
65   size_t buf_size = _buffer_size;
66   size_t mlen;
67   _num_messages ++;
68   std::string response;
69
70   //FILE *pfile;
71   //std::string filename = "/opt/out/e2ap_" + std::to_string(_num_messages) + ".per";
72
73   state = NO_ERROR;
74   mdclog_write(MDCLOG_DEBUG, "Received RMR message of type %d and size %d\n", message->mtype, message->len);
75   
76   // Sanity check on RMR. Max size ?
77   if (message->len > MAX_RMR_RECV_SIZE){
78     state = RMR_ERROR;
79     mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE);
80     return false;
81   }
82   
83   // start measurement 
84   auto start = std::chrono::high_resolution_clock::now();
85   
86   // main message processing code
87   switch(message->mtype){
88     
89   case RIC_INDICATION:
90     
91     if (unlikely(_ref_protector == NULL)){
92       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: No plugin registered to handle ric indication message\n", __FILE__, __LINE__);
93       state = MISSING_HANDLER_ERROR;
94       break;
95     }
96     
97     //pfile = fopen(filename.c_str(), "wb");
98     //fwrite(message->payload, sizeof(char), message->len, pfile);
99     //fclose(pfile);
100
101     e2ap_recv_pdu = 0;
102     e2sm_header = 0;
103     
104     rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_recv_pdu), message->payload, message->len);
105     
106       
107     if(likely(rval.code == RC_OK)){
108       num_indications ++;
109       res = indication_processor.get_fields(e2ap_recv_pdu->choice.initiatingMessage, indication_data);
110       if (unlikely(!res)){
111         state = E2AP_INDICATION_ERROR;
112         num_err_indications ++;
113         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__);
114         goto finished;
115       }
116       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
117       //xer_fprint(stdout, &asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
118       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
119     }
120     else{
121       num_err_indications ++;
122       state = E2AP_INDICATION_ERROR;
123       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2AP PDU\n", __FILE__, __LINE__);
124       goto finished;
125     }
126     
127     mdclog_write(MDCLOG_DEBUG, "E2AP INDICATION :: Successfully received E2AP Indication  with id = %ld, sequence no = %ld, Number of indications = %lu, Number of erroneous indications = %lu\n", indication_data.req_id, indication_data.req_seq_no, num_indications, num_err_indications);
128
129     
130     // do we progress ahead ?
131     if(processing_level == E2AP_PROC_ONLY){
132       goto finished;
133     }
134     
135     //Decode the SM header
136     mdclog_write(MDCLOG_DEBUG, "Decoding e2sm header of size %lu\n", indication_data.indication_header_size);
137     rval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, (void**)&(e2sm_header), indication_data.indication_header, indication_data.indication_header_size);
138     
139     if (likely(rval.code == RC_OK)){
140       res = e2sm_indication_processor.get_header_fields(e2sm_header, header_helper);
141       if (unlikely(!res)){
142         state = E2SM_INDICATION_HEADER_ERROR;
143         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from E2SM HEADER\n", __FILE__, __LINE__);
144         goto finished;
145       }
146       
147       // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
148       // xer_fprint(stdout, &asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
149       // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;    
150     }
151     else{
152       state = E2SM_INDICATION_HEADER_ERROR;
153       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2SM Header.", __FILE__, __LINE__);
154       goto finished;
155     }
156     mdclog_write(MDCLOG_DEBUG, "E2SM INDICATION HEADER :: Successfully decoded E2SM Indication Header of size %lu\n", indication_data.indication_header_size);
157     
158           
159     // do we progress ahead ?
160     if(processing_level == E2SM_PROC_ONLY){
161       goto finished;
162     }
163     
164
165     // NOTE : We assume RICindicationMessage contains payload (not E2SM message)
166     // Send payload to plugin
167     current_index = 0;
168     remaining_buffer = _buffer_size;
169     mdclog_write(MDCLOG_DEBUG, "Processing E2AP Indication message of size %lu\n", indication_data.indication_msg_size);
170     
171     res = (*_ref_protector)(indication_data.indication_msg, indication_data.indication_msg_size, scratch_buffer, &buf_size);
172     if(unlikely(!res)){
173       state = PLUGIN_ERROR;
174       goto finished;
175     }
176                             
177     // Do we respond ? Depends on RIC indication type ...
178     // if RICindicationType == report, then no response
179     // else if RICindicationType == insert, generate control ...
180     if (report_mode_only|| indication_data.indication_type == E2N_RICindicationType::E2N_RICindicationType_report){
181       mdclog_write(MDCLOG_DEBUG, "Indication type is report. Not generating control\n");
182       goto finished;
183     }
184
185     // we assume for now that just like in E2AP indication, response message is directly put in E2AP control 
186     control_data.control_msg = &scratch_buffer[current_index];
187     control_data.control_msg_size = buf_size; // re-use for size
188     current_index += buf_size ;
189     remaining_buffer = _buffer_size - current_index;
190     mdclog_write(MDCLOG_DEBUG, "Encoded X2AP response of size %lu bytes. Remaining buffer = %lu bytes\n", buf_size, remaining_buffer);
191     
192     if (current_index >= _buffer_size){
193       state = BUFFER_ERROR;
194       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
195       goto finished;
196     }
197
198       
199     // Encode the control header
200     // Control header is same as indication header ( except interface direction ?)
201     header_helper.interface_direction = 0;
202     res = e2sm_control_processor.encode_control_header(&scratch_buffer[current_index], &remaining_buffer, header_helper);
203     
204     if (likely(res)){
205       control_data.control_header = &scratch_buffer[current_index];
206       control_data.control_header_size = remaining_buffer;      
207       current_index += remaining_buffer ;
208       remaining_buffer = _buffer_size - current_index;
209       
210     }
211     else{
212       state = E2SM_CONTROL_HEADER_ERROR;
213       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control header. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
214       goto finished;
215     }
216     
217     if (current_index >= _buffer_size){
218       state = BUFFER_ERROR;
219       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
220       goto finished;
221     }
222     
223     // Generate control message
224     mlen = _buffer_size;
225     control_data.req_id = indication_data.req_id;
226     control_data.req_seq_no = indication_data.req_seq_no;
227     control_data.func_id = indication_data.func_id;
228     control_data.control_ack = 2; // no ack required       
229     res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data);
230     
231     if(likely(res)){
232       send_msg = true;
233       message->len = mlen;
234       message->mtype = RIC_CONTROL_REQ;
235       mdclog_write(MDCLOG_DEBUG,  "E2AP CONTROL MESSAGE :: Successfully generated E2AP Control Message\n");
236     }
237     else{
238       state = E2AP_CONTROL_ERROR;
239       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2AP control . Reason = %s\n", __FILE__, __LINE__, control_request_processor.get_error().c_str());
240       goto finished;
241     }
242     
243     // Record id for tracking .... (tbd)
244   finished:
245     ASN_STRUCT_FREE(asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, e2sm_header);
246     ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
247     
248     break;
249     
250   case (RIC_SUB_RESP):
251   case (RIC_SUB_DEL_RESP):
252   case (RIC_SUB_FAILURE):
253   case ( RIC_SUB_DEL_FAILURE ):
254     if (_ref_sub_handler != NULL){
255       mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
256       _ref_sub_handler->Response(message->mtype, message->payload, message->len);
257     }
258     else{
259       state = MISSING_HANDLER_ERROR;
260       mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
261     }
262     
263     break;
264     
265     
266   case DC_ADM_INT_CONTROL:
267     {
268       if(_ref_policy_handler != NULL){
269         // Need to apply config. Since config may need to be
270         // applied across all threads, we do a callback to the parent thread.
271         // wait for config to be applied and then send response
272         _ref_policy_handler(DC_ADM_INT_CONTROL, (const char *) message->payload, message->len, response, true);
273         std::memcpy( (char *) message->payload, response.c_str(),  response.length());
274         message->len = response.length();
275         message->mtype = DC_ADM_INT_CONTROL_ACK;
276         send_msg = true;
277       }
278       else{
279         state = MISSING_HANDLER_ERROR;
280         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
281       }
282     }
283     break;
284
285   case DC_ADM_GET_POLICY:
286     {
287       if(_ref_policy_handler != NULL){
288         _ref_policy_handler(DC_ADM_GET_POLICY,  (const char *) message->payload, message->len, response, false);
289         std::memcpy((char *)message->payload, response.c_str(), response.length());
290         message->len = response.length();
291         message->mtype = DC_ADM_GET_POLICY_ACK;
292         send_msg = true;
293       }
294       else{
295         state = MISSING_HANDLER_ERROR;
296         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
297       }
298     }
299     break;
300     
301   default:
302     mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
303
304   };
305
306   auto end = std::chrono::high_resolution_clock::now();
307   // Ad hoc metric reporting for now ...
308   // every reporting interval, prints to stdou
309   // 1, reporting interval (# of packets)
310   // 2. average  processing time (in micro seconds) across each packet
311   // 3. standard deviation (micro seconds ^2) a
312   // 4. maximum processing time (in micro seconds)
313   
314   if(num_proc_packets == _reporting_interval){
315     auto epoch = std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()).time_since_epoch();
316     double avg_latency =  processing_duration/num_proc_packets;
317     double std_dev_latency = processing_dev/num_proc_packets - avg_latency * avg_latency;
318     std::cout << "Processing Metrics : " << epoch.count() << "," << num_proc_packets << "," << avg_latency << "," << std_dev_latency << "," << max_proc_duration << std::endl;  
319     processing_duration = 0;
320     processing_dev = 0;
321     max_proc_duration = 0;
322     num_proc_packets = 0;
323   }
324
325   double  elapsed = std::chrono::duration<double, std::micro>(end - start).count();
326   if(elapsed > max_proc_duration){
327     max_proc_duration = elapsed;
328   }
329   
330   processing_duration += elapsed;
331   processing_dev += elapsed * elapsed;
332   num_proc_packets ++;
333
334   
335   if(send_msg){
336     return true;
337   }
338   else{
339     return false;
340   }
341 };
342
343    
344 unsigned long const message_processor::get_messages (void){
345     return _num_messages;
346 };
347