bc017ad9bf85790e493f0afbfc02d0579d2535f8
[ric-app/admin.git] / src / message_processor_class.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20
21 #include <message_processor_class.hpp>
22
23
24 int verbose_flag = 0;
25
26 message_processor::message_processor():  _ref_sub_handler(NULL), _ref_protector(NULL), _ref_policy_handler(NULL), _num_messages(0), current_index (0), num_indications(0), num_err_indications(0){
27   scratch_buffer = 0;
28   scratch_buffer = (unsigned char *)calloc(BUFFER_LENGTH, sizeof(unsigned char));
29   assert(scratch_buffer != 0);
30   remaining_buffer = BUFFER_LENGTH;
31 };
32
33
34 void message_processor::register_subscription_handler(SubscriptionHandler * sub){
35   _ref_sub_handler = sub;
36 }
37
38
39 void message_processor::register_protector(protector * p){
40   _ref_protector = p;
41 }
42
43 void message_processor::register_policy_handler(void (*f1)(int, const char *, int, std::string &, bool)){
44   _ref_policy_handler = f1;
45 }
46
47 message_processor::~message_processor(void){
48   free(scratch_buffer);
49 }
50
51 // main message processing 
52 bool message_processor::operator()(rmr_mbuf_t *message){
53
54   bool res;
55   int message_type, procedure_code;
56   bool send_msg = false;
57   asn_dec_rval_t rval;
58   size_t buf_size = BUFFER_LENGTH;
59   size_t mlen;
60   _num_messages ++;
61   std::string response;
62   //FILE *pfile;
63   //std::string filename = "/opt/out/e2ap_" + std::to_string(_num_messages) + ".per";
64
65   // main message processing code
66   switch(message->mtype){
67     
68   case RIC_INDICATION:
69     
70     if (unlikely(_ref_protector == NULL)){
71       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: No plugin registered to handle ric indication message\n", __FILE__, __LINE__);
72       break;
73     }
74     
75     //pfile = fopen(filename.c_str(), "wb");
76     //fwrite(message->payload, sizeof(char), message->len, pfile);
77     //fclose(pfile);
78
79     e2ap_recv_pdu = 0;
80     e2sm_header = 0;
81    
82     rval = asn_decode(0,ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void**)&(e2ap_recv_pdu), message->payload, message->len);
83
84     if(likely(rval.code == RC_OK)){
85       num_indications ++;
86       res = indication_processor.get_fields(e2ap_recv_pdu->choice.initiatingMessage, indication_data);
87       if (unlikely(!res)){
88         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from RICindication message\n", __FILE__, __LINE__);
89         goto finished;
90       }
91       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
92       //xer_fprint(stdout, &asn_DEF_E2AP_PDU, e2ap_recv_pdu);
93       //std::cout <<"+++++++++++++++++++++++ E2AP Indication ++++++++++++++++++++++++" << std::endl;
94     }
95     else{
96       num_err_indications ++;
97       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2AP PDU\n", __FILE__, __LINE__);
98       goto finished;
99     }
100
101     mdclog_write(MDCLOG_INFO, "E2AP INDICATION :: Successfully received E2AP Indication message with id = %d, sequence no = %d, Number of indication messages = %lu, Number of erroneous indications = %lu\n", indication_data.req_id, indication_data.req_seq_no, num_indications, num_err_indications);
102       
103     //Decode the SM header
104     rval = asn_decode(0, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2SM_gNB_X2_indicationHeader, (void**)&(e2sm_header), indication_data.indication_header, indication_data.indication_header_size);
105     if (likely(rval.code == RC_OK)){
106       res = e2sm_indication_processor.get_header_fields(e2sm_header, header_helper);
107       if (unlikely(!res)){
108         mdclog_write(MDCLOG_ERR, "Error :: %s, %d :: Could not get fields from E2SM HEADER\n", __FILE__, __LINE__);
109         goto finished;
110       }
111       
112       // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;
113       // xer_fprint(stdout, &asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
114       // std::cout <<"+++++++++++++++++++++++ E2SM Indication Header ++++++++++++++++++++++++" << std::endl;    
115     }
116     else{
117       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error decoding E2SM Header.", __FILE__, __LINE__);
118       goto finished;
119     }
120     mdclog_write(MDCLOG_DEBUG, "E2SM INDICATION HEADER :: Successfully decoded E2SM Indication Header of size %lu\n", indication_data.indication_header_size);
121
122     
123     // NOTE : We assume RICindicationMessage contains payload (not E2SM message)
124     // Send payload to plugin
125     current_index = 0;
126     remaining_buffer = BUFFER_LENGTH;
127     
128     res = (*_ref_protector)(indication_data.indication_msg, indication_data.indication_msg_size, scratch_buffer, &buf_size);
129     if(unlikely(!res)){
130       goto finished;
131     }
132     
133     // Do we respond ? Depends on RIC indication type ...
134     // if RICindicationType == report, then no response
135     // else if RICindicationType == insert, generate control ...
136     if (report_mode_only|| indication_data.indication_type == RICindicationType::RICindicationType_report){
137       mdclog_write(MDCLOG_INFO, "Indication type is report. Not generating control\n");
138       goto finished;
139     }
140     
141     message_helper.x2ap_pdu = &scratch_buffer[current_index];
142     message_helper.x2ap_pdu_size = buf_size; // re-use for size
143     current_index +=message_helper.x2ap_pdu_size ;
144     remaining_buffer = BUFFER_LENGTH - current_index;
145     
146     if (current_index >= BUFFER_LENGTH){
147       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %d too small to encode further objects \n", __FILE__, __LINE__, BUFFER_LENGTH);
148       goto finished;
149     }
150
151       
152     // Encode the control header
153     // Control header is same as indication header ( except interface direction ?)
154     header_helper.interface_direction = 0;
155     res = e2sm_control_processor.encode_control_header(&scratch_buffer[current_index], &remaining_buffer, header_helper);
156     if (likely(res)){
157       control_data.control_header = &scratch_buffer[current_index];
158       control_data.control_header_size = remaining_buffer;      
159       current_index += remaining_buffer ;
160       remaining_buffer = BUFFER_LENGTH - current_index;
161       
162     }
163     else{
164       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control header. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
165       goto finished;
166     }
167     
168     if (current_index >= BUFFER_LENGTH){
169       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error buffer size %d too small to encode further objects \n", __FILE__, __LINE__, BUFFER_LENGTH);
170       goto finished;
171     }
172     
173     // Encode the control message
174     res = e2sm_control_processor.encode_control_message(&scratch_buffer[current_index], &remaining_buffer, message_helper);
175     if (likely(res)){
176       control_data.control_msg = &scratch_buffer[current_index];
177       control_data.control_msg_size = remaining_buffer;
178       
179       current_index += remaining_buffer ;
180       remaining_buffer = BUFFER_LENGTH - current_index;
181     }
182     else{
183       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control message. Reason = %s\n", __FILE__, __LINE__, e2sm_control_processor.get_error().c_str());
184       goto finished;
185     }
186     
187     // Generate control message
188     mlen = BUFFER_LENGTH;
189     control_data.req_id = indication_data.req_id;
190     control_data.req_seq_no = indication_data.req_seq_no;
191     control_data.func_id = indication_data.func_id;
192     control_data.control_ack = 2; // no ack required 
193       
194     res = control_request_processor.encode_e2ap_control_request(message->payload, &mlen, control_data);
195     
196     if(likely(res)){
197       send_msg = true;
198       message->len = mlen;
199       message->mtype = RIC_CONTROL_REQ;
200       mdclog_write(MDCLOG_INFO, "E2AP CONTROL MESSAGE :: Successfully generated E2AP Control Message\n");
201     }
202     else{
203       mdclog_write(MDCLOG_ERR, "Error :: %s, %d: Error encoding E2SM control . Reason = %s\n", __FILE__, __LINE__, control_request_processor.get_error().c_str());
204       goto finished;
205     }
206     // Record id for tracking .... (tbd)
207   finished:
208     ASN_STRUCT_FREE(asn_DEF_E2SM_gNB_X2_indicationHeader, e2sm_header);
209     ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2ap_recv_pdu);
210     
211     if (send_msg){
212       return true;
213     }
214     
215     break;
216     
217   case (RIC_SUB_RESP):
218   case (RIC_SUB_DEL_RESP):
219   case (RIC_SUB_FAILURE):
220   case ( RIC_SUB_DEL_FAILURE ):
221     if (_ref_sub_handler != NULL){
222       mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
223       _ref_sub_handler->Response(message->mtype, message->payload, message->len);
224     }
225     else{
226       mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
227       return false;
228     }
229     
230     break;
231     
232     
233   case DC_ADM_INT_CONTROL:
234     {
235
236       // Need to apply config. Since config may need to be
237       // applied across all threads, we do a callback to the parent thread.
238       // wait for config to be applied and then send response
239       _ref_policy_handler(DC_ADM_INT_CONTROL, (const char *) message->payload, message->len, response, true);
240       std::memcpy( (char *) message->payload, response.c_str(),  response.length());
241       message->len = response.length();
242       message->mtype = DC_ADM_INT_CONTROL_ACK;
243       return true;
244       
245     }
246     break;
247
248   case DC_ADM_GET_POLICY:
249     {
250       _ref_policy_handler(DC_ADM_GET_POLICY,  (const char *) message->payload, message->len, response, false);
251       std::memcpy((char *)message->payload, response.c_str(), response.length());
252       message->len = response.length();
253       message->mtype = DC_ADM_GET_POLICY_ACK;
254       return true;
255     }
256     break;
257     
258   default:
259     mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
260   };
261   
262   return false;
263 };
264
265    
266 unsigned long const message_processor::get_messages (void){
267     return _num_messages;
268 };
269