RUN apt-get install -y libhiredis-dev
#RUN apt-get install -y valgrind
+RUN mkdir /usr/local/include/nlohmann
+RUN git clone https://github.com/azadkuh/nlohmann_json_release.git
+RUN cp nlohmann_json_release/json.hpp /usr/local/include/nlohmann
+
RUN git clone https://gerrit.o-ran-sc.org/r/ric-plt/dbaas
RUN cd dbaas/redismodule && \
./autogen.sh && \
cd ${STAGE_DIR} && \
rm -rf rapidjson
+WORKDIR ${STAGE_DIR}
+## Install CPPRESTSDK
+
+RUN apt-get install -y libcpprest-dev
+RUN apt-get install -y g++ git libboost-atomic-dev libboost-thread-dev libboost-system-dev libboost-date-time-dev libboost-regex-dev libboost-filesystem-dev libboost-random-dev libboost-chrono-dev libboost-serialization-dev libwebsocketpp-dev openssl libssl-dev ninja-build zlib1g-dev
+RUN git clone https://github.com/Microsoft/cpprestsdk.git casablanca && \
+ cd casablanca && \
+ mkdir build && \
+ cd build && \
+ cmake -G Ninja .. -DCMAKE_BUILD_TYPE=Release -DBUILD_TESTS=OFF -DBUILD_SAMPLES=OFF -DCMAKE_INSTALL_PREFIX=/usr/local .. && \
+ ninja && \
+ ninja install && \
+ cd ${STAGE_DIR}
+ #rm -rf casablanca
##-----------------------------------
# Now install the program
#------------------------------------
RUN dpkg -i /tmp/*.deb
RUN apt-get update && \
apt-get install -y libcurl3 python3 && \
- apt-get install -y libboost-all-dev cpputest libhiredis-dev valgrind && \
+ apt-get install -y libboost-all-dev cpputest libcpprest-dev libhiredis-dev valgrind && \
apt-get clean
COPY --from=ricbuild /etc/xapp/* /etc/xapp/
COPY --from=ricbuild /usr/local/bin/b_xapp_main /usr/local/bin/b_xapp_main
+COPY --from=ricbuild /usr/local/lib/libcpprest.so* /usr/local/bin/
+COPY --from=ricbuild /usr/local/lib/libcpprest.so* /usr/local/lib/
COPY --from=ricbuild /usr/local/include/rnib/*.h /usr/local/include/rnib/
COPY --from=ricbuild /usr/local/include/rnib/rnibreader.a /usr/local/include/rnib/
],
"messaging": {
"ports": [
+ {
+ "name": "http",
+ "container": "bouncer-xapp",
+ "port": 8080,
+ "description": "http service"
+ },
{
"name": "rmr-data",
"container": "bouncer-xapp",
"port": 4560,
-
- "rxMessages": ["RIC_SUB_RESP", "RIC_INDICATION"],
- "txMessages": ["RIC_SUB_REQ"],
+
+ "rxMessages": ["RIC_SUB_RESP", "RIC_INDICATION","RIC_SUB_DEL_RESP"],
+ "txMessages": ["RIC_SUB_REQ","RIC_SUB_DEL_REQ"],
"policies": [1],
"description": "rmr receive data port for Bouncer xApp"
},
"protPort": "tcp:4560",
"maxSize": 2072,
"numWorkers": 1,
- "txMessages": ["RIC_SUB_REQ"],
- "rxMessages": ["RIC_SUB_RESP", "RIC_INDICATION"],
- "policies": [1]
+ "txMessages": ["RIC_SUB_REQ","RIC_SUB_DEL_REQ"],
+ "rxMessages": ["RIC_SUB_RESP", "RIC_INDICATION","RIC_SUB_DEL_RESP"],
+ "policies": [1]
+ },
+ "http":{
+ "protPort": "tcp:8080"
+
}
}
########libs
-LIBS= -lsdl -lrmr_si -lpthread -lm $(LOG_LIBS) $(CURL_LIBS) $(RNIB_LIBS)
+LIBS= -lsdl -lrmr_si -lpthread -lm -lboost_system -lcrypto -lssl -lcpprest $(LOG_LIBS) $(CURL_LIBS) $(RNIB_LIBS)
COV_FLAGS= -fprofile-arcs -ftest-coverage
#######
*/
#include "xapp.hpp"
+#include <cpprest/http_listener.h>
+#include <cpprest/json.h>
+#include <cpprest/uri.h>
+using namespace web;
+using namespace web::http;
+using namespace web::http::experimental::listener;
+using namespace utility;
+std::vector<std::string>SubscriptionIds;
+#define TRACE(msg) wcout << msg
+
+
+void display_json(
+ json::value const & jvalue)
+{
+ cout<<"\ndisplaying REST Notification\n";
+ wcout << jvalue.serialize().c_str() << endl;
+}
+
+
+void handle_request(http_request request)
+{
+auto answer = json::value::object();
+cout<<"\nPrinting POST request content\n";
+cout<<request.to_string()<<"\n";
+ request
+ .extract_json()
+ .then([&answer](pplx::task<json::value> task) {
+ try
+ {
+ answer = task.get();
+ display_json(answer);
+ }
+ catch (http_exception const & e)
+ {
+ cout<<"\ninside catch block";
+ wcout << e.what() << endl;
+ }
+
+ })
+ .wait();
+
+ request.reply(status_codes::OK, answer);
+}
+
+void handle_post(http_request request)
+{
+ TRACE("\nhandle POST\n");
+
+ handle_request(request);
+}
+
+void handle_put(http_request request)
+{
+ TRACE("\nhandle PUT\n");
+
+ handle_request(request);
+}
+
+void start_server()
+{
+
+ utility::string_t port = U("8080");
+ utility::string_t address = U("http://0.0.0.0:");
+ address.append(port);
+ address.append(U("/ric/v1/subscriptions/response"));
+ uri_builder uri(address);
+
+ auto addr = uri.to_uri().to_string();
+ http_listener listener(addr);
+ //http_listener listener("http://localhost:8080/ric");
+ cout<<"validated uri = "<<uri::validate(addr)<<"\n";
+ ucout << utility::string_t(U("Listening for REST Notification at: ")) << addr << std::endl;
+ listener.support(methods::POST,[](http_request request) { handle_post(request);});
+ listener.support(methods::PUT,[](http_request request){ handle_put(request);});
+ try
+ {
+ listener
+ .open()
+ .then([&listener]() { })
+ .wait();
+
+ while (true);
+ }
+ catch (exception const & e)
+ {
+ wcout << e.what() << endl;
+ }
+
+}
void signalHandler( int signum ) {
cout << "Interrupt signal (" << signum << ") received.\n";
mdclog_write(MDCLOG_INFO, "Created Bouncer Xapp Instance");
//Startup E2 subscription
+ std::thread t1(std::ref(start_server));
+ t1.detach();
b_xapp->startup(sub_handler);
sleep(10);
b_xapp->start_xapp_receiver(std::ref(*mp_handler));
- sleep(1);
-
+ sleep(20);//waiting for some time before sending delete.
+ b_xapp->shutdown();//will start the sending delete procedure.
while(1){
sleep(1);
}
}
-bool subscription_delete::set_fields( subscription_helper &helper){
- unsigned int ie_index;
-
- ie_index = 0;
- RICsubscriptionDeleteRequest_IEs_t *ies_ricreq = &IE_array[ie_index];
- ies_ricreq->criticality = Criticality_reject;
- ies_ricreq->id = ProtocolIE_ID_id_RICrequestID;
- ies_ricreq->value.present = RICsubscriptionDeleteRequest_IEs__value_PR_RICrequestID;
- RICrequestID_t *ricrequest_ie = &ies_ricreq->value.choice.RICrequestID;
- ricrequest_ie->ricRequestorID = helper.get_request_id();
- //ricrequest_ie->ricRequestSequenceNumber = helper.get_req_seq();
+bool subscription_delete::set_fields( subscription_helper &helper)
+{
+ static long update_instance=0;//static variable to update ricInstaceID for sending delete req
+ unsigned int ie_index;
+ ie_index = 0;
+ RICsubscriptionDeleteRequest_IEs_t *ies_ricreq = &IE_array[ie_index];
+ ies_ricreq->criticality = Criticality_reject;
+ ies_ricreq->id = ProtocolIE_ID_id_RICrequestID;
+ ies_ricreq->value.present = RICsubscriptionDeleteRequest_IEs__value_PR_RICrequestID;
+ RICrequestID_t *ricrequest_ie = &ies_ricreq->value.choice.RICrequestID;
+ ricrequest_ie->ricRequestorID = helper.get_request_id();
+ update_instance++;//incrementing ricInstanceID by one, each time the bouncer send delete req
+ ricrequest_ie->ricInstanceID = update_instance;
+ mdclog_write(MDCLOG_INFO,"instance id for subsdelreq = %d", update_instance);
+ //ricrequest_ie->ricRequestSequenceNumber = helper.get_req_seq();
- ie_index = 1;
- RICsubscriptionDeleteRequest_IEs_t *ies_ranfunc = &IE_array[ie_index];
- ies_ranfunc->criticality = Criticality_reject;
- ies_ranfunc->id = ProtocolIE_ID_id_RANfunctionID;
- ies_ranfunc->value.present = RICsubscriptionDeleteRequest_IEs__value_PR_RANfunctionID;
- RANfunctionID_t *ranfunction_ie = &ies_ranfunc->value.choice.RANfunctionID;
- *ranfunction_ie = helper.get_function_id();
-
+ ie_index = 1;
+ RICsubscriptionDeleteRequest_IEs_t *ies_ranfunc = &IE_array[ie_index];
+ ies_ranfunc->criticality = Criticality_reject;
+ ies_ranfunc->id = ProtocolIE_ID_id_RANfunctionID;
+ ies_ranfunc->value.present = RICsubscriptionDeleteRequest_IEs__value_PR_RANfunctionID;
+ RANfunctionID_t *ranfunction_ie = &ies_ranfunc->value.choice.RANfunctionID;
+ *ranfunction_ie = helper.get_function_id();
+ mdclog_write(MDCLOG_INFO,"ran function id for subsdelreq = %d", helper.get_function_id());
+ //*ranfunction_ie =1;
- return true;
+ return true;
};
ies_ricreq->value.present = RICsubscriptionRequest_IEs__value_PR_RICrequestID;
RICrequestID_t *ricrequest_ie = &ies_ricreq->value.choice.RICrequestID;
ricrequest_ie->ricRequestorID = helper.get_request_id();
+ mdclog_write(MDCLOG_INFO,"instance id for subsreq = %d", ricrequest_ie->ricInstanceID);
//ricrequest_ie->ricRequestSequenceNumber = helper.get_req_seq();
result = ASN_SEQUENCE_ADD(&(ric_subscription->protocolIEs), &IE_array[ie_index]);
assert(result == 0);
//For processing received messages.XappMsgHandler should mention if resend is required or not.
-void XappMsgHandler::operator()(rmr_mbuf_t *message, bool *resend){
+void XappMsgHandler::operator()(rmr_mbuf_t *message, bool *resend)
+{
- if (message->len > MAX_RMR_RECV_SIZE){
+ if (message->len > MAX_RMR_RECV_SIZE)
+ {
mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE);
return;
}
- //a1_policy_helper helper;
+ //a1_policy_helper helper;
bool res=false;
E2AP_PDU_t* e2pdu = (E2AP_PDU_t*)calloc(1, sizeof(E2AP_PDU));
int num = 0;
- switch(message->mtype){
+ switch(message->mtype)
+ {
//need to fix the health check.
case (RIC_HEALTH_CHECK_REQ):
- message->mtype = RIC_HEALTH_CHECK_RESP; // if we're here we are running and all is ok
- message->sub_id = -1;
- strncpy( (char*)message->payload, "Bouncer OK\n", rmr_payload_size( message) );
- *resend = true;
- break;
+ message->mtype = RIC_HEALTH_CHECK_RESP; // if we're here we are running and all is ok
+ message->sub_id = -1;
+ strncpy( (char*)message->payload, "Bouncer OK\n", rmr_payload_size( message) );
+ *resend = true;
+ break;
case (RIC_SUB_RESP):
mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
- unsigned char *me_id;
- if( (me_id = (unsigned char *) malloc( sizeof( unsigned char ) * RMR_MAX_MEID )) == NULL ) {
- mdclog_write(MDCLOG_ERR, "Error : %s, %d : malloc failed for me_id", __FILE__, __LINE__);
- me_id = rmr_get_meid(message, NULL);
- } else {
- rmr_get_meid(message, me_id);
- }
- if(me_id == NULL){
- mdclog_write(MDCLOG_ERR, " Error :: %s, %d : rmr_get_meid failed me_id is NULL", __FILE__, __LINE__);
- break;
- }
- mdclog_write(MDCLOG_INFO,"RMR Received MEID: %s",me_id);
- if(_ref_sub_handler !=NULL){
- _ref_sub_handler->manage_subscription_response(message->mtype, reinterpret_cast< char const* >(me_id));
- } else {
- mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
- }
- *resend = false;
- if (me_id != NULL) {
- mdclog_write(MDCLOG_INFO, "Free RMR Received MEID memory: %s(0x%x)", me_id, me_id);
- free(me_id);
- }
+ unsigned char *me_id;
+ if( (me_id = (unsigned char *) malloc( sizeof( unsigned char ) * RMR_MAX_MEID )) == NULL )
+ {
+ mdclog_write(MDCLOG_ERR, "Error : %s, %d : malloc failed for me_id", __FILE__, __LINE__);
+ me_id = rmr_get_meid(message, NULL);
+ }
+ else
+ {
+ rmr_get_meid(message, me_id);
+ }
+ if(me_id == NULL)
+ {
+ mdclog_write(MDCLOG_ERR, " Error :: %s, %d : rmr_get_meid failed me_id is NULL", __FILE__, __LINE__);
+ break;
+ }
+ mdclog_write(MDCLOG_INFO,"RMR Received MEID: %s",me_id);
+ if(_ref_sub_handler !=NULL)
+ {
+ _ref_sub_handler->manage_subscription_response(message->mtype, reinterpret_cast< char const* >(me_id));
+ }
+ else
+ {
+ mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
+ }
+ *resend = false;
+ if (me_id != NULL)
+ {
+ mdclog_write(MDCLOG_INFO, "Free RMR Received MEID memory: %s(0x%x)", me_id, me_id);
+ free(me_id);
+ }
+ break;
+
+ case (RIC_SUB_DEL_RESP):
+ mdclog_write(MDCLOG_INFO, "Received subscription delete message of type = %d", message->mtype);
+ //unsigned char *me_id;
+ if( (me_id = (unsigned char *) malloc( sizeof( unsigned char ) * RMR_MAX_MEID )) == NULL )
+ {
+ mdclog_write(MDCLOG_ERR, "Error : %s, %d : malloc failed for me_id", __FILE__, __LINE__);
+ me_id = rmr_get_meid(message, NULL);
+ }
+ else
+
+ {
+ rmr_get_meid(message, me_id);
+ }
+ if(me_id == NULL)
+ {
+ mdclog_write(MDCLOG_ERR, " Error :: %s, %d : rmr_get_meid failed me_id is NULL", __FILE__, __LINE__);
break;
+ }
+ mdclog_write(MDCLOG_INFO,"RMR Received MEID: %s",me_id);
+ if(_ref_sub_handler !=NULL)
+ {
+ _ref_sub_handler->manage_subscription_response(message->mtype, reinterpret_cast< char const* >(me_id));
+ }
+ else
+ {
+ mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
+ }
+ *resend = false;
+ if (me_id != NULL)
+ {
+ mdclog_write(MDCLOG_INFO, "Free RMR Received MEID memory: %s(0x%x)", me_id, me_id);
+ free(me_id);
+ }
+ break;
case (RIC_INDICATION):
if(message->mtype == 12050)
- {
- mdclog_write(MDCLOG_INFO, "Decoding indication for msg = %d", message->mtype);
-
- ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, e2pdu);
- asn_transfer_syntax syntax;
- syntax = ATS_ALIGNED_BASIC_PER;
-
- mdclog_write(MDCLOG_INFO, "Data_size = %d", message->len);
-
- auto rval = asn_decode(nullptr, syntax, &asn_DEF_E2AP_PDU, (void**)&e2pdu, message->payload, message->len);
-
- if(rval.code == RC_OK)
- {
- mdclog_write(MDCLOG_INFO, "rval.code = %d ", rval.code);
- }
- else{
- mdclog_write(MDCLOG_ERR, " rval.code = %d ", rval.code);
- break;
- }
-
- asn_fprint(stdout, &asn_DEF_E2AP_PDU, e2pdu);
- mdclog_write(MDCLOG_INFO, "Received indication message of type = %d", message->mtype);
- num++;
- message->mtype = RIC_CONTROL_REQ; // if we're here we are running and all is ok
- message->sub_id = -1;
- strncpy((char*)message->payload, "Bouncer Control OK\n", rmr_payload_size(message));
- *resend = true;
- ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2pdu);
+ {
+ mdclog_write(MDCLOG_INFO, "Decoding indication for msg = %d", message->mtype);
+
+ ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, e2pdu);
+ asn_transfer_syntax syntax;
+ syntax = ATS_ALIGNED_BASIC_PER;
+
+ mdclog_write(MDCLOG_INFO, "Data_size = %d", message->len);
+
+ auto rval = asn_decode(nullptr, syntax, &asn_DEF_E2AP_PDU, (void**)&e2pdu, message->payload, message->len);
+
+ if(rval.code == RC_OK)
+ {
+ mdclog_write(MDCLOG_INFO, "rval.code = %d ", rval.code);
+ }
+ else
+ {
+ mdclog_write(MDCLOG_ERR, " rval.code = %d ", rval.code);
+ break;
+ }
+
+ asn_fprint(stdout, &asn_DEF_E2AP_PDU, e2pdu);
+ mdclog_write(MDCLOG_INFO, "Received indication message of type = %d", message->mtype);
+ num++;
+ message->mtype = RIC_CONTROL_REQ; // if we're here we are running and all is ok
+ message->sub_id = -1;
+ strncpy((char*)message->payload, "Bouncer Control OK\n", rmr_payload_size(message));
+ *resend = true;
+ ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2pdu);
}
- mdclog_write(MDCLOG_INFO, "Number of Indications Received = %d", num);
- break;
+ mdclog_write(MDCLOG_INFO, "Number of Indications Received = %d", num);
+ break;
- /*case A1_POLICY_REQ:
+ /*case A1_POLICY_REQ:
- mdclog_write(MDCLOG_INFO, "In Message Handler: Received A1_POLICY_REQ.");
+ mdclog_write(MDCLOG_INFO, "In Message Handler: Received A1_POLICY_REQ.");
helper.handler_id = xapp_id;
res = a1_policy_handler((char*)message->payload, &message->len, helper);
- if(res){
+ if(res)
+ {
message->mtype = A1_POLICY_RESP; // if we're here we are running and all is ok
message->sub_id = -1;
*resend = true;
}
break;*/
- default:
+ default:
{
mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
*resend = false;
// Handles subscription responses
-void SubscriptionHandler::manage_subscription_response(int message_type, transaction_identifier id){
+void SubscriptionHandler::manage_subscription_response(int message_type, transaction_identifier id)
+{
// Make This Thread sleep for 1 Second
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
- {
- std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
- mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for meid %s WAS: %d",id.c_str(),this->get_request_status(id));
-
- //from the message type we can know if its a success/failure etc.
- if(message_type==RIC_SUB_RESP)
- this->set_request_status(id, request_success);
-
- if(message_type==RIC_SUB_FAILURE)
- this->set_request_status(id,request_failed);
-
- mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for meid %s IS: %d",id.c_str(),this->get_request_status(id));
-
-
- //this->print_subscription_status();
- }
- //_cv.get()->notify_all();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ {
+ std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
+ mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for meid %s WAS: %d",id.c_str(),this->get_request_status(id));
+
+ //from the message type we can know if its a success/failure etc.
+ if(message_type==RIC_SUB_RESP)
+ this->set_request_status(id, request_success);
+
+ if(message_type==RIC_SUB_DEL_RESP)
+ this->set_request_status(id, request_success);
+
+ if(message_type==RIC_SUB_FAILURE)
+ this->set_request_status(id,request_failed);
+
+ if(message_type==RIC_SUB_DEL_FAILURE)
+ this->set_request_status(id,request_failed);
+
+ mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for meid %s IS: %d",id.c_str(),this->get_request_status(id));
+
+
+ //this->print_subscription_status();
+ }
+ //_cv.get()->notify_all();
}
#define SUBSCR_ERR_FAIL -3
#define SUBSCR_ERR_UNKNOWN -4
#define SUBSCR_ERR_DUPLICATE -5
-
+#define SUBSCR_ERR_NOT_FOUND -6
using namespace std;
class TransmitterBase
// put entry in request table
{
std::lock_guard<std::mutex> lock(*(_data_lock.get()));
-
res = add_request_entry(rmr_trans_id, request_pending);
if(! res){
// acquire lock ...
std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
-
// Send the message
bool flg = tx();
-
if (!flg){
// clear state
delete_request_entry(rmr_trans_id);
return res;
};
+template<typename AppTransmitter>
+int SubscriptionHandler:: manage_subscription_delete_request(transaction_identifier rmr_trans_id, AppTransmitter && tx)
+{
+ int res;
+ // delete entry in request table
+ {
+ std::lock_guard<std::mutex> lock(*(_data_lock.get()));
+ res = delete_request_entry(rmr_trans_id);
+ mdclog_write(MDCLOG_INFO,"res=%d",res);
+ if(! res)
+ {
+ mdclog_write(MDCLOG_ERR, "%s : Error deleting new subscription request %s from queue because request with key doesn't present", __FILE__, __LINE__);
+
+ return SUBSCR_ERR_NOT_FOUND;
+ }
+
+ }
+
+
+ // acquire lock ...
+ std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
+ // Send the message
+ bool flg = tx();
+
+ if (!flg)
+ {
+ // add state
+ res = add_request_entry(rmr_trans_id, request_pending);
+ mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription delete request %s", __FILE__, __LINE__, rmr_trans_id.c_str());
+ return SUBSCR_ERR_TX;
+ }
+ else
+ {
+ mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription delete request for trans_id %s", __FILE__, __LINE__, rmr_trans_id.c_str());
+
+ }
+
+ // record time stamp ..
+ auto start = std::chrono::system_clock::now();
+ std::chrono::milliseconds t_out(_time_out);
+
+ //the wait functionality has been removed.
+
+
+ _local_lock.unlock();
+ // std::cout <<"Returning res = " << res << " for request = " << rmr_trans_id << std::endl;
+ return res;
+};
#endif
*/
#include "xapp.hpp"
+#include <nlohmann/json.hpp>
+#include <iostream>
+#include<string>
+#include <cpprest/http_client.h>
+#include <cpprest/filestream.h>
+#include <cpprest/uri.h>
+#include <cpprest/json.h>
+using namespace utility;
+using namespace web;
+using namespace web::http;
+using namespace web::http::client;
+using namespace concurrency::streams;
+using jsonn = nlohmann::json;
#define BUFFER_SIZE 1024
-
+extern std::vector<std::string>SubscriptionIds;
Xapp::Xapp(XappSettings &config, XappRmr &rmr){
rmr_ref = &rmr;
}
void Xapp::shutdown(){
- return;
+
+ sleep(70);
+ //send subscriptions delete.
+ shutdown_subscribe_deletes();
+ return;
}
+void Xapp::shutdown_subscribe_deletes(void )
+{
+
+ bool res;
+ size_t data_size = ASN_BUFF_MAX_SIZE;
+ unsigned char data[data_size];
+ //unsigned char meid[RMR_MAX_MEID];
+ char meid[RMR_MAX_MEID];
+ std::string xapp_id = config_ref->operator [](XappSettings::SettingName::XAPP_ID);
+
+ mdclog_write(MDCLOG_INFO,"Preparing to send subscription Delete in file= %s, line=%d",__FILE__,__LINE__);
+
+ auto gnblist = get_rnib_gnblist();
+
+ int sz = gnblist.size();
+ mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
+
+ if(sz <= 0)
+ mdclog_write(MDCLOG_INFO,"Subscriptions Delete cannot be sent as GNBList in RNIB is NULL");
+
+ for(int i = 0; i<sz; i++)
+ {
+ sleep(15);
+ //give the message to subscription handler, along with the transmitter.
+ strcpy((char*)meid,gnblist[i].c_str());
+ mdclog_write(MDCLOG_INFO,"sending %d subscription delete request out of : %d",i+1, sz);
+ mdclog_write(MDCLOG_INFO,"sending subscription delete to ,meid = %s", meid);
+
+ if (SubscriptionIds.size()>0)
+ {
+ auto delJson = pplx::create_task([i,meid]() {
+ utility::string_t port = U("8088");
+ utility::string_t address = U("http://service-ricplt-submgr-http.ricplt.svc.cluster.local:");
+ address.append(port);
+ address.append(U("/ric/v1/subscriptions/"));
+ address.append( utility::string_t(SubscriptionIds.back()));
+ SubscriptionIds.pop_back();
+ uri_builder uri(address);
+ auto addr = uri.to_uri().to_string();
+ http_client client(addr);
+ ucout << utility::string_t(U("making requests at: ")) << addr <<std::endl;
+ return client.request(methods::DEL);
+
+
+ })
+
+ // Get the response.
+ .then([](http_response response) {
+ // Check the status code.
+ if (response.status_code() != 204) {
+ throw std::runtime_error("Returned " + std::to_string(response.status_code()));
+ }
+
+ // Convert the response body to JSON object.
+ std::wcout << "Deleted: " << std::boolalpha << (response.status_code() == 204) << std::endl;
+ });
+
+ // serailize the user details.
+
+
+ try {
+ delJson.wait();
+ }
+ catch (const std::exception& e) {
+ printf("Error exception:%s\n", e.what());
+ }
+
+ }
+
+ else{
+ mdclog_write(MDCLOG_ERR,"Subscription delete cannot send in file= %s, line=%d for MEID %s as no valid subIDS",__FILE__,__LINE__, meid);
+ }
+
+ /*
+
+
+ subscription_helper din;
+ subscription_helper dout;
+
+ subscription_delete sub_del;
+ subscription_delete sub_recv;
+
+
+ unsigned char buf[BUFFER_SIZE];
+ size_t buf_size = BUFFER_SIZE;
+ bool res;
+
+
+ //Random Data for request
+ int request_id = 1;
+ int function_id = 1;
+
+ din.set_request(request_id);
+ din.set_function_id(function_id);
+
+ res = sub_del.encode_e2ap_subscription(&buf[0], &buf_size, din);
+
+ mdclog_write(MDCLOG_INFO,"Sending subscription delete in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
+
+ xapp_rmr_header rmr_header;
+ rmr_header.message_type = RIC_SUB_DEL_REQ;
+ rmr_header.payload_length = buf_size; //data_size
+
+ strcpy((char*)rmr_header.meid,gnblist[i].c_str());
+ auto transmitter = std::bind(&XappRmr::xapp_rmr_send,rmr_ref, &rmr_header, (void*)buf); //(void*)data)
+ if (subhandler_ref)
+ {
+ mdclog_write(MDCLOG_INFO,"subhandler_ref is valid pointer");
+ }
+ else
+ {
+ mdclog_write(MDCLOG_INFO,"subhandler_ref is invalid pointer");
+ }
+ int result = subhandler_ref->manage_subscription_delete_request(gnblist[i], transmitter);
+
+ if(result==SUBSCR_SUCCESS)
+ {
+
+ mdclog_write(MDCLOG_INFO,"Subscription Delete SUCCESSFUL in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
+ }
+ else
+ {
+ mdclog_write(MDCLOG_ERR,"Subscription Delete FAILED in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
+ }
+ */
+ }
+}
void Xapp::startup_subscribe_requests(void ){
-
bool res;
size_t data_size = ASN_BUFF_MAX_SIZE;
unsigned char data[data_size];
- unsigned char meid[RMR_MAX_MEID];
+ char meid[RMR_MAX_MEID];
std::string xapp_id = config_ref->operator [](XappSettings::SettingName::XAPP_ID);
-
+ //int a =std::stoi(xapp_id);
mdclog_write(MDCLOG_INFO,"Preparing to send subscription in file= %s, line=%d",__FILE__,__LINE__);
auto gnblist = get_rnib_gnblist();
int sz = gnblist.size();
-
+ mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
if(sz <= 0)
mdclog_write(MDCLOG_INFO,"Subscriptions cannot be sent as GNBList in RNIB is NULL");
for(int i = 0; i<sz; i++)
{
- sleep(15);
+ sleep(15);
+ strcpy((char*)meid,gnblist[i].c_str());
+ mdclog_write(MDCLOG_INFO,"sending %d subscription request out of : %d",i+1, sz);
+
+ //mdclog_write(MDCLOG_INFO,"GNBList,gnblist[i] = %s and ith val = %d", gnblist[i], i);
+ mdclog_write(MDCLOG_INFO,"sending subscription to ,meid = %s", meid);
+
+auto postJson = pplx::create_task([meid,xapp_id]() {
+
+
+ jsonn jsonObject;
+ jsonObject =
+ {
+
+
+
+ {"SubscriptionId",""},
+ {"ClientEndpoint",{{"Host","service-ricxapp-bouncer-xapp-http.ricxapp"},{"HTTPPort",8080},{"RMRPort",4560}}},
+ {"Meid",meid},
+ {"RANFunctionID",0},
+ {"SubscriptionDetails",
+ {
+ {
+ {"XappEventInstanceId",12345},{"EventTriggers",{0}},
+ {"ActionToBeSetupList",
+ {
+ {
+ {"ActionID",1},{"ActionType","report"},{"ActionDefinition",{0}},{"SubsequentAction",{{"SubsequentActionType","continue"},{"TimeToWait","zero"}}}
+ }
+ }
+ }
+ }
+ }
+ }
+
+ };
+ std::cout <<jsonObject.dump(4) << "\n";
+ utility::stringstream_t s;
+ s << jsonObject.dump().c_str();
+ web::json::value ret = json::value::parse(s);
+ // std::wcout << ret.serialize().c_str() << std::endl;
+ utility::string_t port = U("8088");
+ utility::string_t address = U("http://service-ricplt-submgr-http.ricplt.svc.cluster.local:");
+ address.append(port);
+ address.append(U("/ric/v1/subscriptions"));
+ uri_builder uri(address);
+ auto addr = uri.to_uri().to_string();
+ http_client client(addr);
+ //std::cout<<uri::validate(addr)<<" validation \n";
+ ucout << utility::string_t(U("making requests at: ")) << addr << "\n";
+ return client.request(methods::POST,U("/"),ret.serialize(),U("application/json"));
+ })
+
+ // Get the response.
+ .then([](http_response response) {
+ // Check the status code.
+ if (response.status_code() != 201) {
+ throw std::runtime_error("Returned " + std::to_string(response.status_code()));
+ }
+
+ // Convert the response body to JSON object.
+ return response.extract_json();
+ })
+
+ // serailize the user details.
+ .then([](json::value jsonObject) {
+ std::cout<<"\nRecieved REST subscription response\n";
+ std::wcout << jsonObject.serialize().c_str() << "\n";
+ std::string tmp;
+ tmp=jsonObject[U("SubscriptionId")].as_string();
+ SubscriptionIds.push_back(tmp);
+
+ });
+
+ try {
+ postJson.wait();
+ }
+ catch (const std::exception& e) {
+ printf("Error exception:%s\n", e.what());
+ }
+
+
+ /*
//give the message to subscription handler, along with the transmitter.
strcpy((char*)meid,gnblist[i].c_str());
mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
-
+ mdclog_write(MDCLOG_INFO,"sending %d subscription request out of : %d",i+1, sz);
subscription_helper din;
subscription_helper dout;
res = sub_req.encode_e2ap_subscription(&buf[0], &buf_size, din);
- //mdclog_write(MDCLOG_INFO,"GNBList = %s and ith val = %d", gnblist[i], i);
+ //mdclog_write(MDCLOG_INFO,"GNBList = %s and ith val = %d", gnblist[i], i);
mdclog_write(MDCLOG_INFO,"Sending subscription in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
}
else {
mdclog_write(MDCLOG_ERR,"Subscription FAILED in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
- }
- }
+ }
+ */
+ }
+ std::cout<<"\n SubscriptionIds vector size= "<<SubscriptionIds.size()<<"\n";
}
void Xapp::startup_get_policies(void){
#include "rapidjson/writer.h"
#include "rapidjson/document.h"
#include "rapidjson/error/error.h"
-
+#include<thread>
#include "msgs_proc.hpp"
#include "subs_mgmt.hpp"
#include "xapp_config.hpp"