2 # ==================================================================================
3 # Copyright (c) 2020 HCL Technologies Limited.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
20 #include <nlohmann/json.hpp>
23 #include <cpprest/http_client.h>
24 #include <cpprest/filestream.h>
25 #include <cpprest/uri.h>
26 #include <cpprest/json.h>
27 using namespace utility;
29 using namespace web::http;
30 using namespace web::http::client;
31 using namespace concurrency::streams;
32 using jsonn = nlohmann::json;
// Size in bytes of the local `buf` arrays that receive encoded E2AP subscription
// request/delete payloads in this file.
33 #define BUFFER_SIZE 1024
// Subscription ids, declared here and defined elsewhere. In this file, ids taken from
// REST subscription responses are appended, and the last id is popped when a REST
// delete is issued.
34 extern std::vector<std::string>SubscriptionIds;
// Constructs the xApp from its settings object and RMR wrapper.
// NOTE(review): only the subhandler_ref reset is visible here; how `config` and `rmr`
// are stored is not shown — confirm against the full file.
35 Xapp::Xapp(XappSettings &config, XappRmr &rmr){
// No subscription handler is attached until startup() supplies one.
40 subhandler_ref = NULL;
// Joins every receiver thread that is still joinable, then empties the thread list.
// NOTE(review): the enclosing function header is not visible here (presumably the
// destructor) — confirm.
47 int threadcnt = xapp_rcv_thread.size();
48 for(int i=0; i<threadcnt; i++){
// joinable() skips threads that can no longer be joined, e.g. ones detached by stop().
49 if(xapp_rcv_thread[i].joinable())
50 xapp_rcv_thread[i].join();
52 xapp_rcv_thread.clear();
60 //Stop the xapp. Note: to be run only from unit test scripts.
// Takes the xapp mutex, turns RMR listening off and detaches every receiver thread.
61 void Xapp::stop(void){
// NOTE(review): xapp_mutex is dereferenced unconditionally, while the visible code only
// allocates it lazily when receivers are started — confirm stop() cannot run before that.
63 std::lock_guard<std::mutex> guard(*xapp_mutex);
64 rmr_ref->set_listen(false);
67 //Detaching the threads... not sure if this is the right way to stop the receiver threads.
68 //Hence this function should be called only in unit tests.
69 int threadcnt = xapp_rcv_thread.size();
70 for(int i=0; i<threadcnt; i++){
// A detached thread can no longer be joined; the join loop elsewhere in this file
// checks joinable() for that reason.
71 xapp_rcv_thread[i].detach();
// Records the subscription handler and sends the startup subscription requests.
// sub_ref: stored by address in subhandler_ref, so it must outlive the later calls
// made through subhandler_ref.
76 void Xapp::startup(SubscriptionHandler &sub_ref) {
78 subhandler_ref = &sub_ref;
84 startup_subscribe_requests();
// Fetching policies at startup is currently disabled.
87 //startup_get_policies();
// Enables RMR listening and spawns one receiver thread per entry in _callbacks.
// NOTE(review): the enclosing function header is not visible here — confirm.
91 rmr_ref->set_listen(true);
// The mutex is allocated on first use.
92 if(xapp_mutex == NULL){
93 xapp_mutex = new std::mutex();
95 std::lock_guard<std::mutex> guard(*xapp_mutex);
// NOTE(review): `j` is a signed int compared against _callbacks.size().
97 for(int j=0; j < _callbacks.size(); j++){
// NOTE(review): the lambda captures `j` by reference ([&]) but runs on another thread,
// so it may read `j` after the loop has advanced or ended. Capturing `j` by value
// would avoid that.
98 std::thread th_recv([&](){ rmr_ref->xapp_rmr_receive(std::move(_callbacks[j]), rmr_ref);});
// Keep the thread object so it can be joined or detached later.
99 xapp_rcv_thread.push_back(std::move(th_recv));
105 //Starting a separate single receiver
// Enables RMR listening and starts one receiver thread that passes mp_handler to
// xapp_rmr_receive.
// mp_handler: message handler; it is moved from inside the new thread.
// NOTE(review): the lambda captures the mp_handler reference ([&]) and the thread is not
// joined here, so the caller's handler must stay alive until the thread has consumed
// it — confirm.
106 void Xapp::start_xapp_receiver(XappMsgHandler& mp_handler){
107 //start a receiver thread. Can be multiple receiver threads for more than 1 listening port.
108 rmr_ref->set_listen(true);
// The mutex is allocated on first use.
109 if(xapp_mutex == NULL){
110 xapp_mutex = new std::mutex();
113 mdclog_write(MDCLOG_INFO,"Receiver Thread file= %s, line=%d",__FILE__,__LINE__);
114 std::lock_guard<std::mutex> guard(*xapp_mutex);
115 std::thread th_recv([&](){ rmr_ref->xapp_rmr_receive(std::move(mp_handler), rmr_ref);});
// Keep the thread object so it can be joined or detached later.
116 xapp_rcv_thread.push_back(std::move(th_recv));
// Shuts the xApp down; the visible work is sending the subscription deletes.
120 void Xapp::shutdown(){
123 //send subscriptions delete.
124 shutdown_subscribe_deletes();
// Sends subscription deletes for every gNB returned by get_rnib_gnblist().
// Per gNB, the visible code (1) issues an HTTP DELETE to the subscription manager for
// the last id in SubscriptionIds, and (2) encodes an E2AP subscription delete and hands
// it, with an RMR transmitter, to the subscription handler.
// NOTE(review): several lines (braces, try/else, some declarations) are not visible
// here, so the exact branch structure should be confirmed against the full file.
128 void Xapp::shutdown_subscribe_deletes(void )
// NOTE(review): `data` is not referenced in the visible lines (the transmitter uses
// `buf`); a runtime-sized array is also a compiler extension in C++.
132 size_t data_size = ASN_BUFF_MAX_SIZE;
133 unsigned char data[data_size];
134 //unsigned char meid[RMR_MAX_MEID];
135 char meid[RMR_MAX_MEID];
136 std::string xapp_id = config_ref->operator [](XappSettings::SettingName::XAPP_ID);
138 mdclog_write(MDCLOG_INFO,"Preparing to send subscription Delete in file= %s, line=%d",__FILE__,__LINE__);
140 auto gnblist = get_rnib_gnblist();
142 int sz = gnblist.size();
143 mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
// Presumably logged only when the list is empty; the guarding condition is not visible.
146 mdclog_write(MDCLOG_INFO,"Subscriptions Delete cannot be sent as GNBList in RNIB is NULL");
148 for(int i = 0; i<sz; i++)
151 //give the message to subscription handler, along with the transmitter.
// NOTE(review): unbounded copy — a gNB name of RMR_MAX_MEID bytes or more overflows meid.
152 strcpy((char*)meid,gnblist[i].c_str());
153 mdclog_write(MDCLOG_INFO,"sending %d subscription delete request out of : %d",i+1, sz);
154 mdclog_write(MDCLOG_INFO,"sending subscription delete to ,meid = %s", meid);
// REST delete path: only attempted when at least one subscription id is stored.
156 if (SubscriptionIds.size()>0)
// The task receives its own copies of `i` and `meid` (captured by value).
158 auto delJson = pplx::create_task([i,meid]() {
// Build <submgr base URL>:8088/ric/v1/subscriptions/<last stored id>.
159 utility::string_t port = U("8088");
160 utility::string_t address = U("http://service-ricplt-submgr-http.ricplt.svc.cluster.local:");
161 address.append(port);
162 address.append(U("/ric/v1/subscriptions/"));
// The most recently stored id is used and then removed from the shared vector.
// NOTE(review): this runs inside a pplx task while the calling thread also reads
// SubscriptionIds; whether the task is waited on before the next use is not
// visible — confirm there is no unsynchronized access.
163 address.append( utility::string_t(SubscriptionIds.back()));
164 SubscriptionIds.pop_back();
165 uri_builder uri(address);
166 auto addr = uri.to_uri().to_string();
167 http_client client(addr);
168 ucout << utility::string_t(U("making requests at: ")) << addr <<std::endl;
169 return client.request(methods::DEL);
175 .then([](http_response response) {
176 // Check the status code.
// Any status other than 204 is turned into an exception.
177 if (response.status_code() != 204) {
178 throw std::runtime_error("Returned " + std::to_string(response.status_code()));
181 // Report the deletion; the response body is not parsed here.
182 std::wcout << "Deleted: " << std::boolalpha << (response.status_code() == 204) << std::endl;
185 // serialize the user details.
// Exceptions are only printed, not propagated (the matching try is not visible here).
191 catch (const std::exception& e) {
192 printf("Error exception:%s\n", e.what());
// Per its message, logged when there are no stored subscription ids (the else branch
// itself is not visible here).
198 mdclog_write(MDCLOG_ERR,"Subscription delete cannot send in file= %s, line=%d for MEID %s as no valid subIDS",__FILE__,__LINE__, meid);
// RMR/E2AP delete path.
// NOTE(review): `dout` and `sub_recv` are not referenced in the visible lines.
204 subscription_helper din;
205 subscription_helper dout;
207 subscription_delete sub_del;
208 subscription_delete sub_recv;
// Output buffer for the encoded message; buf_size is passed by address to the encoder.
211 unsigned char buf[BUFFER_SIZE];
212 size_t buf_size = BUFFER_SIZE;
216 //Random Data for request
// request_id and function_id are declared on lines not visible here.
220 din.set_request(request_id);
221 din.set_function_id(function_id);
// NOTE(review): no check of `res` is visible before the message is sent — confirm.
223 res = sub_del.encode_e2ap_subscription(&buf[0], &buf_size, din);
225 mdclog_write(MDCLOG_INFO,"Sending subscription delete in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
227 xapp_rmr_header rmr_header;
228 rmr_header.message_type = RIC_SUB_DEL_REQ;
// Presumably the encoder wrote the encoded length back into buf_size — confirm.
229 rmr_header.payload_length = buf_size; //data_size
// NOTE(review): unbounded copy into rmr_header.meid, same concern as for meid above.
231 strcpy((char*)rmr_header.meid,gnblist[i].c_str());
// The bound transmitter holds pointers to the locals rmr_header and buf, so it must
// only be invoked while they are still in scope.
232 auto transmitter = std::bind(&XappRmr::xapp_rmr_send,rmr_ref, &rmr_header, (void*)buf); //(void*)data)
// NOTE(review): the condition choosing between the next two logs is not visible;
// confirm the call through subhandler_ref below is guarded against a null pointer.
235 mdclog_write(MDCLOG_INFO,"subhandler_ref is valid pointer");
239 mdclog_write(MDCLOG_INFO,"subhandler_ref is invalid pointer");
241 int result = subhandler_ref->manage_subscription_delete_request(gnblist[i], transmitter);
// Only the outcome is logged; the visible code takes no further action on failure.
243 if(result==SUBSCR_SUCCESS)
246 mdclog_write(MDCLOG_INFO,"Subscription Delete SUCCESSFUL in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
250 mdclog_write(MDCLOG_ERR,"Subscription Delete FAILED in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
// Sends subscription requests for every gNB returned by get_rnib_gnblist().
// Per gNB, the visible code (1) POSTs a JSON subscription to the subscription manager
// and stores the returned "SubscriptionId" in SubscriptionIds, and (2) encodes an E2AP
// subscription request and hands it, with an RMR transmitter, to the subscription
// handler.
// NOTE(review): several lines (braces, try, parts of the JSON literal, some
// declarations) are not visible here, so the exact structure should be confirmed
// against the full file.
256 void Xapp::startup_subscribe_requests(void ){
// NOTE(review): `data` is not referenced in the visible lines (the transmitter uses
// `buf`); a runtime-sized array is also a compiler extension in C++.
258 size_t data_size = ASN_BUFF_MAX_SIZE;
259 unsigned char data[data_size];
260 char meid[RMR_MAX_MEID];
261 std::string xapp_id = config_ref->operator [](XappSettings::SettingName::XAPP_ID);
262 //int a =std::stoi(xapp_id);
263 mdclog_write(MDCLOG_INFO,"Preparing to send subscription in file= %s, line=%d",__FILE__,__LINE__);
265 auto gnblist = get_rnib_gnblist();
267 int sz = gnblist.size();
268 mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
// Presumably logged only when the list is empty; the guarding condition is not visible.
270 mdclog_write(MDCLOG_INFO,"Subscriptions cannot be sent as GNBList in RNIB is NULL");
272 for(int i = 0; i<sz; i++)
// NOTE(review): unbounded copy — a gNB name of RMR_MAX_MEID bytes or more overflows meid.
275 strcpy((char*)meid,gnblist[i].c_str());
276 mdclog_write(MDCLOG_INFO,"sending %d subscription request out of : %d",i+1, sz);
278 //mdclog_write(MDCLOG_INFO,"GNBList,gnblist[i] = %s and ith val = %d", gnblist[i], i);
279 mdclog_write(MDCLOG_INFO,"sending subscription to ,meid = %s", meid);
// REST path: the task receives its own copies of meid and xapp_id (captured by value)
// and builds the subscription JSON body; only part of the literal is visible here.
281 auto postJson = pplx::create_task([meid,xapp_id]() {
290 {"SubscriptionId",""},
291 {"ClientEndpoint",{{"Host","service-ricxapp-bouncer-xapp-http.ricxapp"},{"HTTPPort",8080},{"RMRPort",4560}}},
294 {"SubscriptionDetails",
297 {"XappEventInstanceId",12345},{"EventTriggers",{0}},
298 {"ActionToBeSetupList",
301 {"ActionID",1},{"ActionType","report"},{"ActionDefinition",{0}},{"SubsequentAction",{{"SubsequentActionType","continue"},{"TimeToWait","zero"}}}
310 std::cout <<jsonObject.dump(4) << "\n";
// Convert the nlohmann JSON into a cpprest json::value by dumping it to text and
// parsing that text again.
311 utility::stringstream_t s;
312 s << jsonObject.dump().c_str();
313 web::json::value ret = json::value::parse(s);
314 // std::wcout << ret.serialize().c_str() << std::endl;
// Build <submgr base URL>:8088/ric/v1/subscriptions.
315 utility::string_t port = U("8088");
316 utility::string_t address = U("http://service-ricplt-submgr-http.ricplt.svc.cluster.local:");
317 address.append(port);
318 address.append(U("/ric/v1/subscriptions"));
319 uri_builder uri(address);
320 auto addr = uri.to_uri().to_string();
321 http_client client(addr);
322 //std::cout<<uri::validate(addr)<<" validation \n";
323 ucout << utility::string_t(U("making requests at: ")) << addr << "\n";
// NOTE(review): the client base URI already ends in /ric/v1/subscriptions and the
// request path is "/" — confirm the resulting URL is the one the server expects.
324 return client.request(methods::POST,U("/"),ret.serialize(),U("application/json"));
328 .then([](http_response response) {
329 // Check the status code.
// Any status other than 201 is turned into an exception.
330 if (response.status_code() != 201) {
331 throw std::runtime_error("Returned " + std::to_string(response.status_code()));
334 // Convert the response body to JSON object.
335 return response.extract_json();
338 // Print the response and remember the returned subscription id.
339 .then([](json::value jsonObject) {
340 std::cout<<"\nRecieved REST subscription response\n";
341 std::wcout << jsonObject.serialize().c_str() << "\n";
// `tmp` is declared on a line not visible here.
// NOTE(review): SubscriptionIds is appended to from a task continuation; confirm it
// is not accessed concurrently from another thread.
343 tmp=jsonObject[U("SubscriptionId")].as_string();
344 SubscriptionIds.push_back(tmp);
// Exceptions are only printed, not propagated (the matching try is not visible here).
351 catch (const std::exception& e) {
352 printf("Error exception:%s\n", e.what());
// RMR/E2AP path.
357 //give the message to subscription handler, along with the transmitter.
// Repeats the meid copy done at the top of the loop body (same unbounded strcpy).
358 strcpy((char*)meid,gnblist[i].c_str());
360 mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
361 mdclog_write(MDCLOG_INFO,"sending %d subscription request out of : %d",i+1, sz);
// NOTE(review): `dout` and `sub_recv` are not referenced in the visible lines.
362 subscription_helper din;
363 subscription_helper dout;
365 subscription_request sub_req;
366 subscription_request sub_recv;
// Output buffer for the encoded message; buf_size is passed by address to the encoder.
368 unsigned char buf[BUFFER_SIZE];
369 size_t buf_size = BUFFER_SIZE;
373 //Random Data for request
// Fixed two-character event definition used for every request.
376 std::string event_def = "01";
// request_id and function_id are declared on lines not visible here.
378 din.set_request(request_id);
379 din.set_function_id(function_id);
380 din.set_event_def(event_def.c_str(), event_def.length());
// Fixed two-character action definition used for every request.
382 std::string act_def = "01";
384 din.add_action(1,0,(void*)act_def.c_str(), act_def.length(), 0);
// NOTE(review): no check of `res` is visible before the message is sent — confirm.
386 res = sub_req.encode_e2ap_subscription(&buf[0], &buf_size, din);
388 //mdclog_write(MDCLOG_INFO,"GNBList = %s and ith val = %d", gnblist[i], i);
390 mdclog_write(MDCLOG_INFO,"Sending subscription in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
392 xapp_rmr_header rmr_header;
393 rmr_header.message_type = RIC_SUB_REQ;
// Presumably the encoder wrote the encoded length back into buf_size — confirm.
394 rmr_header.payload_length = buf_size; //data_size
// NOTE(review): unbounded copy into rmr_header.meid, same concern as for meid above.
396 strcpy((char*)rmr_header.meid,gnblist[i].c_str());
// The bound transmitter holds pointers to the locals rmr_header and buf, so it must
// only be invoked while they are still in scope.
398 auto transmitter = std::bind(&XappRmr::xapp_rmr_send,rmr_ref, &rmr_header, (void*)buf); //(void*)data);
// NOTE(review): subhandler_ref is dereferenced with no null check visible here.
400 int result = subhandler_ref->manage_subscription_request(gnblist[i], transmitter);
// Only the outcome is logged; the visible code takes no further action on failure.
402 if(result==SUBSCR_SUCCESS){
404 mdclog_write(MDCLOG_INFO,"Subscription SUCCESSFUL in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
407 mdclog_write(MDCLOG_ERR,"Subscription FAILED in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
// Print how many subscription ids have been stored so far.
411 std::cout<<"\n SubscriptionIds vector size= "<<SubscriptionIds.size()<<"\n";
// Sends an A1_POLICY_QUERY message over RMR asking for policy type BOUNCER_POLICY_ID.
414 void Xapp::startup_get_policies(void){
416 int policy_id = BOUNCER_POLICY_ID;
// The query payload is the JSON text {"policy_type_id":<id>}.
418 std::string policy_query = "{\"policy_type_id\":" + std::to_string(policy_id) + "}";
// Copy the query into a heap buffer of exactly length() bytes (no terminating NUL).
// NOTE(review): the calloc result is not checked before memcpy, and no free(message)
// is visible here — confirm the buffer is released.
419 unsigned char * message = (unsigned char *)calloc(policy_query.length(), sizeof(unsigned char));
420 memcpy(message, policy_query.c_str(), policy_query.length());
421 xapp_rmr_header header;
422 header.payload_length = policy_query.length();
423 header.message_type = A1_POLICY_QUERY;
424 mdclog_write(MDCLOG_INFO, "Sending request for policy id %d\n", policy_id);
425 rmr_ref->xapp_rmr_send(&header, (void *)message);
// Fills rnib_gnblist with gNB names: takes the text returned by getListGnbIds(),
// parses it as JSON, and appends the "inventory_name" of each element of the
// "gnb_list" array.
// NOTE(review): some lines (braces, early returns, the declaration of `doc`) are not
// visible here, and the function continues past the end of this listing.
430 void Xapp::set_rnib_gnblist(void) {
434 void *result = getListGnbIds();
// NOTE(review): strlen is called without a NULL check — confirm getListGnbIds never
// returns NULL.
435 if(strlen((char*)result) < 1){
436 mdclog_write(MDCLOG_ERR, "ERROR: no data from getListGnbIds\n");
440 mdclog_write(MDCLOG_INFO, "GNB List in R-NIB %s\n", (char*)result);
// kParseStopWhenDoneFlag: parsing stops after the first complete JSON root, so trailing
// bytes after it are not treated as an error (per the RapidJSON documentation).
444 ParseResult parseJson = doc.Parse<kParseStopWhenDoneFlag>((char*)result);
// NOTE(review): std::cerr does not interpret the printf-style "%s (%u)"; the comma
// makes the GetParseErrorFunc(...) call a separate, discarded expression, so the
// actual error text is never printed.
446 std::cerr << "JSON parse error: %s (%u)", GetParseErrorFunc(parseJson.Code());
450 if(!doc.HasMember("gnb_list")){
451 mdclog_write(MDCLOG_INFO, "JSON Has No GNB List Object");
// NOTE(review): this assert repeats the check above; asserts are compiled out when
// NDEBUG is defined.
454 assert(doc.HasMember("gnb_list"));
456 const Value& gnblist = doc["gnb_list"];
457 if (gnblist.IsNull())
460 if(!gnblist.IsArray()){
461 mdclog_write(MDCLOG_INFO, "GNB List is not an array");
466 assert(gnblist.IsArray());
467 for (SizeType i = 0; i < gnblist.Size(); i++) // Uses SizeType instead of size_t
// NOTE(review): the element shape is validated only by asserts, which disappear when
// NDEBUG is defined — a malformed element would then reach GetString() unchecked.
469 assert(gnblist[i].IsObject());
470 const Value& gnbobj = gnblist[i];
471 assert(gnbobj.HasMember("inventory_name"));
472 assert(gnbobj["inventory_name"].IsString());
473 std::string name = gnbobj["inventory_name"].GetString();
474 rnib_gnblist.push_back(name);