/*
==================================================================================
- Copyright (c) 2018-2019 AT&T Intellectual Property.
+ Copyright (c) 2019-2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
/*
* xapp.cc
*
- * Created on: Mar, 2020
+ * Mar, 2020 (Shraboni Jana)
*/
#include "xapp.hpp"
-Xapp::Xapp(XappSettings &config, XappRmr &rmr){
- rmr_ref = &rmr;
- config_ref = &config;
- xapp_mutex = NULL;
- return;
-}
+ Xapp::Xapp(XappSettings &config, XappRmr &rmr){
+
+ rmr_ref = &rmr;
+ config_ref = &config;
+ xapp_mutex = NULL;
+ subhandler_ref = NULL;
+ return;
+ }
+
+Xapp::Xapp(XappSettings &config, XappRmr &rmr, SubscriptionHandler &sub_ref){
+ rmr_ref = &rmr;
+ config_ref = &config;
+ xapp_mutex = NULL;
+ subhandler_ref = &sub_ref;
+ set_rnib_gnblist();
+
+ return;
+ };
Xapp::~Xapp(void){
}
};
-//stop the xapp.
+//Stop the xapp. Note- To be run only from unit test scripts.
void Xapp::stop(void){
// Get the mutex lock
std::lock_guard<std::mutex> guard(*xapp_mutex);
rmr_ref->set_listen(false);
rmr_ref->~XappRmr();
+ //Detaching the threads....not sure if this is the right way to stop the receiver threads.
+ //Hence function should be called only in Unit Tests
+ int threadcnt = xapp_rcv_thread.size();
+ for(int i=0; i<threadcnt; i++){
+ xapp_rcv_thread[i].detach();
+ }
+ sleep(10);
}
-void Xapp::init() {
-
- //get rnib information
- get_rnib_gnblist();
-
-
-}
void Xapp::startup() {
- //send subscriptions and read A1 policies.
+ //send subscriptions.
startup_subscribe_requests();
- //startup_get_policies();
+
+ //read A1 policies
+ startup_get_policies();
return;
}
+void Xapp::Run(){
+ rmr_ref->set_listen(true);
+ if(xapp_mutex == NULL){
+ xapp_mutex = new std::mutex();
+ }
+ std::lock_guard<std::mutex> guard(*xapp_mutex);
+ for(int j=0; j < _callbacks.size(); j++){
+ std::thread th_recv([&](){ rmr_ref->xapp_rmr_receive(std::move(_callbacks[j]), rmr_ref);});
+ xapp_rcv_thread.push_back(std::move(th_recv));
+ }
+
+ return;
+}
+//Starting a seperate single receiver
void Xapp::start_xapp_receiver(XappMsgHandler& mp_handler){
//start a receiver thread. Can be multiple receiver threads for more than 1 listening port.
rmr_ref->set_listen(true);
- xapp_mutex = new std::mutex();
+ if(xapp_mutex == NULL){
+ xapp_mutex = new std::mutex();
+ }
mdclog_write(MDCLOG_INFO,"Receiver Thread file= %s, line=%d",__FILE__,__LINE__);
- //std::unique_ptr<XappMsgHandler> mp_handler = std::make_unique<XappMsgHandler>();
- //auto mp_handler = _callbacks[0];
std::lock_guard<std::mutex> guard(*xapp_mutex);
std::thread th_recv([&](){ rmr_ref->xapp_rmr_receive(std::move(mp_handler), rmr_ref);});
-
xapp_rcv_thread.push_back(std::move(th_recv));
-
-
return;
void Xapp::startup_subscribe_requests(void ){
+
+ bool res;
size_t data_size = ASN_BUFF_MAX_SIZE;
unsigned char data[data_size];
+ unsigned char meid[RMR_MAX_MEID];
std::string xapp_id = config_ref->operator [](XappSettings::SettingName::XAPP_ID);
- for(auto &it: rnib_gnblist){
- int attempt = 0;
- XappMsgHandler msg = XappMsgHandler(xapp_id);
- //bool res_encode = msg.encode_subscription_request(data, &data_size);
- //if(!res_encode) exit(0);
- char *strMsg = "HelloWorld\0";
- strncpy((char *)data,strMsg,strlen(strMsg));
- data_size = sizeof(data);
+ mdclog_write(MDCLOG_INFO,"Sending subscription in file= %s, line=%d",__FILE__,__LINE__);
+
+ auto gnblist = get_rnib_gnblist();
+ int sz = gnblist.size();
+
+ for(int i = 0; i<sz; i++){
+
+ //give the message to subscription handler, along with the transmitter.
+ strcpy((char*)meid,gnblist[i].c_str());
+
+ char *strMsg = "Subscription Request from HelloWorld XApp\0";
+ strncpy((char *)data,strMsg,strlen(strMsg));
+ data_size = strlen(strMsg);
xapp_rmr_header rmr_header;
- rmr_header.message_type = RIC_SUB_RESP;
+ rmr_header.message_type = RIC_SUB_REQ;
rmr_header.payload_length = data_size;
- while(1){
+ strcpy((char*)rmr_header.meid,gnblist[i].c_str());
- auto transmitter = std::bind(&XappRmr::xapp_rmr_send,rmr_ref, &rmr_header, (void*)data);
- transmitter(); //this will go to subscription manager.
- //rmr_ref->xapp_rmr_call(&rmr_header,(char*)strMsg);
+ mdclog_write(MDCLOG_INFO,"Sending subscription in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
+ auto transmitter = std::bind(&XappRmr::xapp_rmr_send,rmr_ref, &rmr_header, (void*)data);
+
+ int res = subhandler_ref->manage_subscription_request(meid, transmitter);
+ if(res){
+ mdclog_write(MDCLOG_INFO,"Subscription SUCCESSFUL in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
- break;
}
}
+
}
void Xapp::startup_get_policies(void){
void Xapp::set_rnib_gnblist(void) {
openSdl();
+
void *result = getListGnbIds();
- if(result == NULL){
+ if(strlen((char*)result) < 1){
mdclog_write(MDCLOG_ERR, "ERROR: no data from getListGnbIds\n");
return;
}
mdclog_write(MDCLOG_INFO, "GNB List in R-NIB %s\n", (char*)result);
+
Document doc;
- doc.Parse((char*)result);
- assert(doc.HasMember("gnb_list"));
+ ParseResult parseJson = doc.Parse((char*)result);
+ if (!parseJson) {
+ std::cerr << "JSON parse error: %s (%u)", GetParseErrorFunc(parseJson.Code());
+ return;
+ }
+ if(!doc.HasMember("gnb_list")){
+ mdclog_write(MDCLOG_INFO, "JSON Has No GNB List Object");
+ return;
+ }
+ assert(doc.HasMember("gnb_list"));
const Value& gnblist = doc["gnb_list"];
- assert(gnblist.IsArray());
+ if (gnblist.IsNull())
+ return;
+
+ if(!gnblist.IsArray()){
+ mdclog_write(MDCLOG_INFO, "GNB List is not an array");
+ return;
+ }
+
+ assert(gnblist.IsArray());
for (SizeType i = 0; i < gnblist.Size(); i++) // Uses SizeType instead of size_t
{
assert(gnblist[i].IsObject());
const Value& gnbobj = gnblist[i];
assert(gnbobj.HasMember("inventory_name"));
assert(gnbobj["inventory_name"].IsString());
- rnib_gnblist.push_back(gnbobj["inventory_name"].GetString());
+ std::string name = gnbobj["inventory_name"].GetString();
+ rnib_gnblist.push_back(name);
}
closeSdl();
-
- //delete result;
return;
}