ISSUE ID:- (RICAPP-176).
[ric-app/bouncer.git] / Bouncer / src / xapp.cc
1 /*
2 # ==================================================================================
3 # Copyright (c) 2020 HCL Technologies Limited.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
17 */
18
19 #include "xapp.hpp"
20 #include <nlohmann/json.hpp>
21 #include <iostream>
22 #include<string>
23 #include <cpprest/http_client.h>
24 #include <cpprest/filestream.h>
25 #include <cpprest/uri.h>
26 #include <cpprest/json.h>
27 using namespace utility;
28 using namespace web;
29 using namespace web::http;
30 using namespace web::http::client;
31 using namespace concurrency::streams;
32 using jsonn = nlohmann::json;
33 #define BUFFER_SIZE 1024
34 extern std::vector<std::string>SubscriptionIds;
35  Xapp::Xapp(XappSettings &config, XappRmr &rmr){
36
37           rmr_ref = &rmr;
38           config_ref = &config;
39           xapp_mutex = NULL;
40           subhandler_ref = NULL;
41           return;
42   }
43
44 Xapp::~Xapp(void){
45
46         //Joining the threads
47         int threadcnt = xapp_rcv_thread.size();
48                 for(int i=0; i<threadcnt; i++){
49                         if(xapp_rcv_thread[i].joinable())
50                                 xapp_rcv_thread[i].join();
51         }
52         xapp_rcv_thread.clear();
53
54         if(xapp_mutex!=NULL){
55                 xapp_mutex->~mutex();
56                 delete xapp_mutex;
57         }
58 };
59
60 //Stop the xapp. Note- To be run only from unit test scripts.
61 void Xapp::stop(void){
62   // Get the mutex lock
63         std::lock_guard<std::mutex> guard(*xapp_mutex);
64         rmr_ref->set_listen(false);
65         rmr_ref->~XappRmr();
66
67         //Detaching the threads....not sure if this is the right way to stop the receiver threads.
68         //Hence function should be called only in Unit Tests
69         int threadcnt = xapp_rcv_thread.size();
70         for(int i=0; i<threadcnt; i++){
71                 xapp_rcv_thread[i].detach();
72         }
73         sleep(10);
74 }
75
76 void Xapp::startup(SubscriptionHandler &sub_ref) {
77
78         subhandler_ref = &sub_ref;
79         set_rnib_gnblist();
80
81         sleep(70);
82
83         //send subscriptions.
84         startup_subscribe_requests();
85
86         //read A1 policies
87         //startup_get_policies();
88         return;
89 }
90 void Xapp::Run(){
91         rmr_ref->set_listen(true);
92         if(xapp_mutex == NULL){
93                 xapp_mutex = new std::mutex();
94         }
95         std::lock_guard<std::mutex> guard(*xapp_mutex);
96
97         for(int j=0; j < _callbacks.size(); j++){
98                 std::thread th_recv([&](){ rmr_ref->xapp_rmr_receive(std::move(_callbacks[j]), rmr_ref);});
99                 xapp_rcv_thread.push_back(std::move(th_recv));
100         }
101
102         return;
103 }
104
105 //Starting a seperate single receiver
106 void Xapp::start_xapp_receiver(XappMsgHandler& mp_handler){
107         //start a receiver thread. Can be multiple receiver threads for more than 1 listening port.
108         rmr_ref->set_listen(true);
109         if(xapp_mutex == NULL){
110                 xapp_mutex = new std::mutex();
111         }
112
113         mdclog_write(MDCLOG_INFO,"Receiver Thread file= %s, line=%d",__FILE__,__LINE__);
114         std::lock_guard<std::mutex> guard(*xapp_mutex);
115         std::thread th_recv([&](){ rmr_ref->xapp_rmr_receive(std::move(mp_handler), rmr_ref);});
116         xapp_rcv_thread.push_back(std::move(th_recv));
117         return;
118 }
119
120 void Xapp::shutdown(){
121         
122         sleep(70);
123         //send subscriptions delete.
124         shutdown_subscribe_deletes();
125         return;
126 }
127
128 void Xapp::shutdown_subscribe_deletes(void )
129 {
130
131         bool res;
132         size_t data_size = ASN_BUFF_MAX_SIZE;
133         unsigned char   data[data_size];
134         //unsigned char meid[RMR_MAX_MEID];
135         char meid[RMR_MAX_MEID];
136         std::string xapp_id = config_ref->operator [](XappSettings::SettingName::XAPP_ID);
137
138         mdclog_write(MDCLOG_INFO,"Preparing to send subscription Delete in file= %s, line=%d",__FILE__,__LINE__);
139
140         auto gnblist = get_rnib_gnblist();
141
142         int sz = gnblist.size();
143          mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
144
145         if(sz <= 0)
146                 mdclog_write(MDCLOG_INFO,"Subscriptions Delete cannot be sent as GNBList in RNIB is NULL");
147
148         for(int i = 0; i<sz; i++)
149         {
150                 sleep(15);
151                 //give the message to subscription handler, along with the transmitter.
152                 strcpy((char*)meid,gnblist[i].c_str());
153                 mdclog_write(MDCLOG_INFO,"sending %d subscription delete request out of : %d",i+1, sz);
154                 mdclog_write(MDCLOG_INFO,"sending subscription delete to ,meid = %s", meid);
155                 
156                 if (SubscriptionIds.size()>0)
157                 {
158                 auto delJson = pplx::create_task([i,meid]() {
159                 utility::string_t port = U("8088");
160                 utility::string_t address = U("http://service-ricplt-submgr-http.ricplt.svc.cluster.local:");
161                 address.append(port);
162                 address.append(U("/ric/v1/subscriptions/"));
163                 address.append( utility::string_t(SubscriptionIds.back()));
164                                 SubscriptionIds.pop_back();
165                 uri_builder uri(address);
166                 auto addr = uri.to_uri().to_string();
167                 http_client client(addr);
168                 ucout << utility::string_t(U("making requests at: ")) << addr <<std::endl;
169                 return client.request(methods::DEL);
170
171                  
172                         })
173
174                         // Get the response.
175                                 .then([](http_response response) {
176                                 // Check the status code.
177                                 if (response.status_code() != 204) {
178                                         throw std::runtime_error("Returned " + std::to_string(response.status_code()));
179                                 }
180
181                                 // Convert the response body to JSON object.
182                                         std::wcout << "Deleted: " << std::boolalpha << (response.status_code() == 204) << std::endl;
183                                         });
184
185                                 // serailize the user details.
186                       
187
188                                         try {
189                                                 delJson.wait();
190                                         }
191                                         catch (const std::exception& e) {
192                                                 printf("Error exception:%s\n", e.what());
193                                         }
194                                                                                 
195                 }
196                 
197                 else{
198                  mdclog_write(MDCLOG_ERR,"Subscription delete cannot send in file= %s, line=%d for MEID %s as no valid subIDS",__FILE__,__LINE__, meid);
199                 }
200
201                 /*
202
203
204                 subscription_helper  din;
205                 subscription_helper  dout;
206
207                 subscription_delete sub_del;
208                 subscription_delete sub_recv;
209
210  
211                 unsigned char buf[BUFFER_SIZE];
212                 size_t buf_size = BUFFER_SIZE;
213                 bool res;
214
215
216                 //Random Data  for request
217                 int request_id = 1;
218                 int function_id = 1;
219
220                 din.set_request(request_id);
221                 din.set_function_id(function_id);
222
223                 res = sub_del.encode_e2ap_subscription(&buf[0], &buf_size, din);
224
225                 mdclog_write(MDCLOG_INFO,"Sending subscription delete  in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
226
227                 xapp_rmr_header rmr_header; 
228                 rmr_header.message_type = RIC_SUB_DEL_REQ;
229                 rmr_header.payload_length = buf_size; //data_size
230
231                 strcpy((char*)rmr_header.meid,gnblist[i].c_str());
232                 auto transmitter = std::bind(&XappRmr::xapp_rmr_send,rmr_ref, &rmr_header, (void*)buf); //(void*)data)
233                 if (subhandler_ref)
234                 {
235                         mdclog_write(MDCLOG_INFO,"subhandler_ref is valid pointer");
236                 }
237                 else
238                 {
239                         mdclog_write(MDCLOG_INFO,"subhandler_ref is invalid pointer");
240                 }
241                 int result = subhandler_ref->manage_subscription_delete_request(gnblist[i], transmitter);
242
243                 if(result==SUBSCR_SUCCESS)
244                 {
245
246                         mdclog_write(MDCLOG_INFO,"Subscription Delete SUCCESSFUL in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
247                 }
248                 else 
249                 {
250                         mdclog_write(MDCLOG_ERR,"Subscription Delete FAILED in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
251                 }
252                 */
253         }
254 }
255
256 void Xapp::startup_subscribe_requests(void ){
257    bool res;
258    size_t data_size = ASN_BUFF_MAX_SIZE;
259    unsigned char        data[data_size];
260    char meid[RMR_MAX_MEID];
261    std::string xapp_id = config_ref->operator [](XappSettings::SettingName::XAPP_ID);
262    //int a =std::stoi(xapp_id);
263    mdclog_write(MDCLOG_INFO,"Preparing to send subscription in file= %s, line=%d",__FILE__,__LINE__);
264
265    auto gnblist = get_rnib_gnblist();
266
267    int sz = gnblist.size();
268         mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
269    if(sz <= 0)
270            mdclog_write(MDCLOG_INFO,"Subscriptions cannot be sent as GNBList in RNIB is NULL");
271
272    for(int i = 0; i<sz; i++)
273    {
274          sleep(15);
275          strcpy((char*)meid,gnblist[i].c_str());
276         mdclog_write(MDCLOG_INFO,"sending %d subscription request out of : %d",i+1, sz);
277
278          //mdclog_write(MDCLOG_INFO,"GNBList,gnblist[i] = %s and ith val = %d", gnblist[i], i);
279          mdclog_write(MDCLOG_INFO,"sending subscription to ,meid = %s", meid);
280
281 auto postJson = pplx::create_task([meid,xapp_id]() {
282
283
284                 jsonn jsonObject;
285                  jsonObject =
286     {
287
288
289
290         {"SubscriptionId",""},
291         {"ClientEndpoint",{{"Host","service-ricxapp-bouncer-xapp-http.ricxapp"},{"HTTPPort",8080},{"RMRPort",4560}}},
292         {"Meid",meid},
293         {"RANFunctionID",0},
294         {"SubscriptionDetails",
295                 {
296                         {
297                             {"XappEventInstanceId",12345},{"EventTriggers",{0}},
298                             {"ActionToBeSetupList",
299                                     {
300                                         {
301                                                 {"ActionID",1},{"ActionType","report"},{"ActionDefinition",{0}},{"SubsequentAction",{{"SubsequentActionType","continue"},{"TimeToWait","zero"}}}
302                                         }
303                                     }
304                             }
305                         }
306                 }
307         }
308
309     };
310                         std::cout <<jsonObject.dump(4) << "\n";
311                         utility::stringstream_t s;
312                         s << jsonObject.dump().c_str();
313                         web::json::value ret = json::value::parse(s);
314                        // std::wcout << ret.serialize().c_str() << std::endl;
315                 utility::string_t port = U("8088");
316                  utility::string_t address = U("http://service-ricplt-submgr-http.ricplt.svc.cluster.local:");
317                   address.append(port);
318                   address.append(U("/ric/v1/subscriptions"));
319                 uri_builder uri(address);
320                 auto addr = uri.to_uri().to_string();
321                 http_client client(addr);
322                 //std::cout<<uri::validate(addr)<<" validation \n";
323                 ucout << utility::string_t(U("making requests at: ")) << addr << "\n";
324                 return client.request(methods::POST,U("/"),ret.serialize(),U("application/json"));
325                         })
326
327                         // Get the response.
328                                 .then([](http_response response) {
329                                 // Check the status code.
330                                 if (response.status_code() != 201) {
331                                         throw std::runtime_error("Returned " + std::to_string(response.status_code()));
332                                 }
333
334                                 // Convert the response body to JSON object.
335                                 return response.extract_json();
336                                         })
337
338                                 // serailize the user details.
339                                                 .then([](json::value jsonObject) {
340                                                 std::cout<<"\nRecieved REST subscription response\n";
341                                                 std::wcout << jsonObject.serialize().c_str() << "\n";
342                                                 std::string tmp;
343                                                 tmp=jsonObject[U("SubscriptionId")].as_string();
344                                                 SubscriptionIds.push_back(tmp);
345
346                                                         });
347
348                                         try {
349                                                 postJson.wait();
350                                         }
351                                         catch (const std::exception& e) {
352                                                 printf("Error exception:%s\n", e.what());
353                                         }
354
355         
356         /*
357          //give the message to subscription handler, along with the transmitter.
358          strcpy((char*)meid,gnblist[i].c_str());
359
360          mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
361         mdclog_write(MDCLOG_INFO,"sending %d subscription request out of : %d",i+1, sz);
362          subscription_helper  din;
363          subscription_helper  dout;
364
365          subscription_request sub_req;
366          subscription_request sub_recv;
367
368          unsigned char buf[BUFFER_SIZE];
369          size_t buf_size = BUFFER_SIZE;
370          bool res;
371
372
373          //Random Data  for request
374          int request_id = 1;
375          int function_id = 0;
376          std::string event_def = "01";
377
378          din.set_request(request_id);
379          din.set_function_id(function_id);
380          din.set_event_def(event_def.c_str(), event_def.length());
381
382          std::string act_def = "01";
383
384          din.add_action(1,0,(void*)act_def.c_str(), act_def.length(), 0);
385
386          res = sub_req.encode_e2ap_subscription(&buf[0], &buf_size, din);
387
388          //mdclog_write(MDCLOG_INFO,"GNBList = %s and ith val = %d", gnblist[i], i);
389
390          mdclog_write(MDCLOG_INFO,"Sending subscription in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
391          
392          xapp_rmr_header rmr_header;
393          rmr_header.message_type = RIC_SUB_REQ;
394          rmr_header.payload_length = buf_size; //data_size
395
396          strcpy((char*)rmr_header.meid,gnblist[i].c_str());
397
398          auto transmitter = std::bind(&XappRmr::xapp_rmr_send,rmr_ref, &rmr_header, (void*)buf); //(void*)data);
399          
400          int result = subhandler_ref->manage_subscription_request(gnblist[i], transmitter);
401          
402          if(result==SUBSCR_SUCCESS){
403
404               mdclog_write(MDCLOG_INFO,"Subscription SUCCESSFUL in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
405           }
406           else {
407                  mdclog_write(MDCLOG_ERR,"Subscription FAILED in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
408               }
409             */  
410         }
411    std::cout<<"\n SubscriptionIds vector size= "<<SubscriptionIds.size()<<"\n"; 
412 }
413
414 void Xapp::startup_get_policies(void){
415
416     int policy_id = BOUNCER_POLICY_ID;
417
418     std::string policy_query = "{\"policy_type_id\":" + std::to_string(policy_id) + "}";
419     unsigned char * message = (unsigned char *)calloc(policy_query.length(), sizeof(unsigned char));
420     memcpy(message, policy_query.c_str(),  policy_query.length());
421     xapp_rmr_header header;
422     header.payload_length = policy_query.length();
423     header.message_type = A1_POLICY_QUERY;
424     mdclog_write(MDCLOG_INFO, "Sending request for policy id %d\n", policy_id);
425     rmr_ref->xapp_rmr_send(&header, (void *)message);
426     free(message);
427
428 }
429
430 void Xapp::set_rnib_gnblist(void) {
431
432            openSdl();
433
434            void *result = getListGnbIds();
435            if(strlen((char*)result) < 1){
436                     mdclog_write(MDCLOG_ERR, "ERROR: no data from getListGnbIds\n");
437                 return;
438             }
439
440             mdclog_write(MDCLOG_INFO, "GNB List in R-NIB %s\n", (char*)result);
441
442
443             Document doc;
444             ParseResult parseJson = doc.Parse<kParseStopWhenDoneFlag>((char*)result);
445             if (!parseJson) {
446                 std::cerr << "JSON parse error: %s (%u)", GetParseErrorFunc(parseJson.Code());
447                 return;
448             }
449
450             if(!doc.HasMember("gnb_list")){
451                 mdclog_write(MDCLOG_INFO, "JSON Has No GNB List Object");
452                 return;
453             }
454             assert(doc.HasMember("gnb_list"));
455
456             const Value& gnblist = doc["gnb_list"];
457             if (gnblist.IsNull())
458               return;
459
460             if(!gnblist.IsArray()){
461                 mdclog_write(MDCLOG_INFO, "GNB List is not an array");
462                 return;
463             }
464
465
466                 assert(gnblist.IsArray());
467             for (SizeType i = 0; i < gnblist.Size(); i++) // Uses SizeType instead of size_t
468             {
469                 assert(gnblist[i].IsObject());
470                 const Value& gnbobj = gnblist[i];
471                 assert(gnbobj.HasMember("inventory_name"));
472                 assert(gnbobj["inventory_name"].IsString());
473                 std::string name = gnbobj["inventory_name"].GetString();
474                 rnib_gnblist.push_back(name);
475
476             }
477             closeSdl();
478             return;
479
480 }
481