Added RIC subsequent Action and time to Wait in subscription request
[ric-app/admin.git] / src / message_processor_class.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20
21 #include <message_processor_class.hpp>
22
23
24 int verbose_flag = 0;
25
26 message_processor::message_processor(int mode, bool report_mode, size_t buffer_length):  _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
27   processing_level = mode;
28   report_mode_only = report_mode;
29   _buffer_size = buffer_length;
30   scratch_buffer = 0;
31   scratch_buffer = (unsigned char *)calloc(_buffer_size, sizeof(unsigned char));
32   assert(scratch_buffer != 0);
33
34 };
35
36
37 void message_processor::register_subscription_handler(subscription_handler * sub){
38   _ref_sub_handler = sub;
39 }
40
41
42 void message_processor::register_protector(protector * p){
43   _ref_protector = p;
44 }
45
46 void message_processor::register_policy_handler(void (*f1)(int, const char *, int, std::string &, bool)){
47   _ref_policy_handler = f1;
48 }
49
50 message_processor::~message_processor(void){
51   free(scratch_buffer);
52 }
53
54 // main message processing 
55 bool message_processor::operator()(rmr_mbuf_t *message){
56
57   bool res;
58   bool send_msg = false;
59   asn_dec_rval_t rval;
60   size_t buf_size = _buffer_size;
61   size_t mlen;
62   _num_messages ++;
63   std::string response;
64
65   //FILE *pfile;
66   //std::string filename = "/opt/out/e2ap_" + std::to_string(_num_messages) + ".per";
67
68   state = NO_ERROR;
69   mdclog_write(MDCLOG_DEBUG, "Received RMR message of type %d and size %d\n", message->mtype, message->len);
70   
71   // Sanity check on RMR. Max size ?
72   if (message->len > MAX_RMR_RECV_SIZE){
73     state = RMR_ERROR;
74     mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE);
75     return false;
76   }
77   
78   // start measurement 
79   auto start = std::chrono::high_resolution_clock::now();
80   
81   // main message processing code
82   switch(message->mtype){
83     
84   case RIC_INDICATION:
85     
86     if (unlikely(_ref_protector == NULL)){
87       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: No plugin registered to handle ric indication message\n", __FILE__, __LINE__);
88       state = MISSING_HANDLER_ERROR;
89       break;
90     }
91     
92     //pfile = fopen(filename.c_str(), "wb");
93     //fwrite(message->payload, sizeof(char), message->len, pfile);
94     //fclose(pfile);
95
96     e2ap_recv_pdu = 0;
97     e2sm_header = 0;
98     
99     rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2AP_PDU, (void**)&(e2ap_recv_pdu), message->payload, message->len);
100     
101       
102     if(likely(rval.code == RC_OK)){
103       num_indications ++;
104       res = indication_processor.get_fields(e2ap_recv_pdu->choice.initiatingMessage, indication_data);
105       if (unlikely(!res)){
106         state = E2AP_INDICATION_ERROR;
107         num_err_indications ++;
108         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__);
109         goto finished;
110       }
111       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
112       //xer_fprint(stdout, &asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
113       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
114     }
115     else{
116       num_err_indications ++;
117       state = E2AP_INDICATION_ERROR;
118       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2AP PDU\n", __FILE__, __LINE__);
119       goto finished;
120     }
121     
122     mdclog_write(MDCLOG_DEBUG, "E2AP INDICATION :: Successfully received E2AP Indication  with id = %ld, sequence no = %ld, Number of indications = %lu, Number of erroneous indications = %lu\n", indication_data.req_id, indication_data.req_seq_no, num_indications, num_err_indications);
123
124     
125     // do we progress ahead ?
126     if(processing_level == E2AP_PROC_ONLY){
127       goto finished;
128     }
129     
130     //Decode the SM header
131     mdclog_write(MDCLOG_DEBUG, "Decoding e2sm header of size %lu\n", indication_data.indication_header_size);
132     rval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, (void**)&(e2sm_header), indication_data.indication_header, indication_data.indication_header_size);
133     
134     if (likely(rval.code == RC_OK)){
135       res = e2sm_indication_processor.get_header_fields(e2sm_header, header_helper);
136       if (unlikely(!res)){
137         state = E2SM_INDICATION_HEADER_ERROR;
138         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from E2SM HEADER\n", __FILE__, __LINE__);
139         goto finished;
140       }
141       
142       // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
143       // xer_fprint(stdout, &asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
144       // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;    
145     }
146     else{
147       state = E2SM_INDICATION_HEADER_ERROR;
148       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2SM Header.", __FILE__, __LINE__);
149       goto finished;
150     }
151     mdclog_write(MDCLOG_DEBUG, "E2SM INDICATION HEADER :: Successfully decoded E2SM Indication Header of size %lu\n", indication_data.indication_header_size);
152     
153           
154     // do we progress ahead ?
155     if(processing_level == E2SM_PROC_ONLY){
156       goto finished;
157     }
158     
159
160     // NOTE : We assume RICindicationMessage contains payload (not E2SM message)
161     // Send payload to plugin
162     current_index = 0;
163     remaining_buffer = _buffer_size;
164     mdclog_write(MDCLOG_DEBUG, "Processing E2AP Indication message of size %lu\n", indication_data.indication_msg_size);
165     
166     res = (*_ref_protector)(indication_data.indication_msg, indication_data.indication_msg_size, scratch_buffer, &buf_size);
167     if(unlikely(!res)){
168       state = PLUGIN_ERROR;
169       goto finished;
170     }
171                             
172     // Do we respond ? Depends on RIC indication type ...
173     // if RICindicationType == report, then no response
174     // else if RICindicationType == insert, generate control ...
175     if (report_mode_only|| indication_data.indication_type == E2N_RICindicationType::E2N_RICindicationType_report){
176       mdclog_write(MDCLOG_DEBUG, "Indication type is report. Not generating control\n");
177       goto finished;
178     }
179
180     // we assume for now that just like in E2AP indication, response message is directly put in E2AP control 
181     control_data.control_msg = &scratch_buffer[current_index];
182     control_data.control_msg_size = buf_size; // re-use for size
183     current_index += buf_size ;
184     remaining_buffer = _buffer_size - current_index;
185     mdclog_write(MDCLOG_DEBUG, "Encoded X2AP response of size %lu bytes. Remaining buffer = %lu bytes\n", buf_size, remaining_buffer);
186     
187     if (current_index >= _buffer_size){
188       state = BUFFER_ERROR;
189       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
190       goto finished;
191     }
192
193       
194     // Encode the control header
195     // Control header is same as indication header ( except interface direction ?)
196     header_helper.interface_direction = 0;
197     res = e2sm_control_processor.encode_control_header(&scratch_buffer[current_index], &remaining_buffer, header_helper);
198     
199     if (likely(res)){
200       control_data.control_header = &scratch_buffer[current_index];
201       control_data.control_header_size = remaining_buffer;      
202       current_index += remaining_buffer ;
203       remaining_buffer = _buffer_size - current_index;
204       
205     }
206     else{
207       state = E2SM_CONTROL_HEADER_ERROR;
208       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control header. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
209       goto finished;
210     }
211     
212     if (current_index >= _buffer_size){
213       state = BUFFER_ERROR;
214       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %lu too small to encode further objects \n", __FILE__, __LINE__, _buffer_size);
215       goto finished;
216     }
217     
218     // Generate control message
219     mlen = _buffer_size;
220     control_data.req_id = indication_data.req_id;
221     control_data.req_seq_no = indication_data.req_seq_no;
222     control_data.func_id = indication_data.func_id;
223     control_data.control_ack = 2; // no ack required       
224     res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data);
225     
226     if(likely(res)){
227       send_msg = true;
228       message->len = mlen;
229       message->mtype = RIC_CONTROL_REQ;
230       mdclog_write(MDCLOG_DEBUG,  "E2AP CONTROL MESSAGE :: Successfully generated E2AP Control Message\n");
231     }
232     else{
233       state = E2AP_CONTROL_ERROR;
234       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2AP control . Reason = %s\n", __FILE__, __LINE__, control_request_processor.get_error().c_str());
235       goto finished;
236     }
237     
238     // Record id for tracking .... (tbd)
239   finished:
240     ASN_STRUCT_FREE(asn_DEF_E2N_E2SM_gNB_X2_indicationHeader, e2sm_header);
241     ASN_STRUCT_FREE(asn_DEF_E2N_E2AP_PDU, e2ap_recv_pdu);
242     
243     break;
244     
245   case (RIC_SUB_RESP):
246   case (RIC_SUB_DEL_RESP):
247   case (RIC_SUB_FAILURE):
248   case ( RIC_SUB_DEL_FAILURE ):
249     if (_ref_sub_handler != NULL){
250       // extract meid ..
251       unsigned char meid[32];
252       rmr_get_meid(message, meid);
253       mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
254       _ref_sub_handler->Response(message->mtype, message->payload, message->len, (const char *)meid);
255     }
256     else{
257       state = MISSING_HANDLER_ERROR;
258       mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
259     }
260     
261     break;
262     
263     
264   case A1_POLICY_REQ:
265     {
266       if(_ref_policy_handler != NULL){
267         // Need to apply config. Since config may need to be
268         // applied across all threads, we do a callback to the parent thread.
269         // wait for config to be applied and then send response
270         _ref_policy_handler(A1_POLICY_REQ, (const char *) message->payload, message->len, response, true);
271         std::memcpy( (char *) message->payload, response.c_str(),  response.length());
272         message->len = response.length();
273         message->mtype = A1_POLICY_RESP;
274         send_msg = true;
275       }
276       else{
277         state = MISSING_HANDLER_ERROR;
278         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Policy handler not assigned in message processor !", __FILE__, __LINE__);
279       }
280     }
281     break;
282
283     
284   default:
285     mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
286
287   };
288
289   if(send_msg){
290     return true;
291   }
292   else{
293     return false;
294   }
295 };
296
297    
298 unsigned long const message_processor::get_messages (void){
299     return _num_messages;
300 };
301