From b85024cd183a527cd8d61353637850cb6d30cf36 Mon Sep 17 00:00:00 2001 From: sjana Date: Tue, 24 Mar 2020 13:57:25 -0400 Subject: [PATCH] Incorporating RMR Health check code Issue-ID: RICAPP-77 Signed-off-by: sjana Change-Id: I49ac33498309e971b684024fb1edde3e4f3de1f9 --- src/Makefile | 17 ++++--- src/hw_xapp_main.cc | 62 +++++++++++++------------ src/routes.txt | 0 src/run_xapp.sh | 18 -------- src/xapp-asn/e2sm/e2sm_helpers.hpp | 17 ------- src/xapp-mgmt/msgs_proc.cc | 15 +++--- src/xapp-mgmt/msgs_proc.hpp | 2 +- src/xapp-mgmt/xapp_handler.hpp | 21 +++++++++ src/xapp-utils/xapp_rmr.cc | 82 ++++++++++++++++++++++++--------- src/xapp-utils/xapp_rmr.hpp | 94 ++++++++++++++++++++++++++++---------- src/xapp-utils/xapp_sdl.cc | 54 ++++++++++++++++------ src/xapp-utils/xapp_sdl.hpp | 16 +++---- src/xapp.cc | 71 +++++++++++++++------------- src/xapp.hpp | 41 +++++++++++++---- 14 files changed, 317 insertions(+), 193 deletions(-) mode change 100644 => 100755 src/routes.txt create mode 100644 src/xapp-mgmt/xapp_handler.hpp diff --git a/src/Makefile b/src/Makefile index 700b0d5..aebc506 100755 --- a/src/Makefile +++ b/src/Makefile @@ -1,5 +1,5 @@ -CXX:= g++ --std=c++14 -O2 -CC:= gcc -O2 +CXX:= g++ --std=c++14 -O2 -L/usr/local/lib +CC:= gcc -O2 -L/usr/local/lib SRC:=./ HWSRC:=./ @@ -14,9 +14,11 @@ E2SMSRC:=./xapp-asn/e2sm CLOGFLAGS:= `pkg-config mdclog --cflags` LOG_LIBS:= `pkg-config mdclog --libs` CURL_LIBS:= `pkg-config libcurl --libs` - +RNIB_LIBS:= -pthread /usr/local/lib/rnibreader.a ######## Keep include dirs separate so we have transparency -BASEFLAGS= -Wall -std=c++14 $(CLOGFLAGS) + + +BASEFLAGS= -Wall -std=c++14 $(CLOGFLAGS) C_BASEFLAGS= -Wall $(CLOGFLAGS) -DASN_DISABLE_OER_SUPPORT XAPPFLAGS= -I./ @@ -29,7 +31,8 @@ E2APFLAGS = -I$(E2APSRC) E2SMFLAGS = -I$(E2SMSRC) ########libs -LIBS= -lsdl -lrmr_nng -lnng -lpthread -lm $(LOG_LIBS) $(CURL_LIBS) + +LIBS= -lsdl -lrmr_nng -lnng -lpthread -lm $(LOG_LIBS) $(CURL_LIBS) $(RNIB_LIBS) COV_FLAGS= -fprofile-arcs -ftest-coverage ####### @@ -67,10 +70,10 @@ OBJ= $(HWXAPP_OBJ) $(UTIL_OBJ) $(MSG_OBJ) $(ASN1C_MODULES) $(E2AP_OBJ) $(E2SM_O print-% : ; @echo $* = $($*) hw_xapp_main: $(OBJ) - $(CXX) -o $@ $(OBJ) $(LIBS) $(CPPFLAGS) $(CLOGFLAGS) + $(CXX) -o $@ $(OBJ) $(LIBS) $(RNIBFLAGS) $(CPPFLAGS) $(CLOGFLAGS) install: hw_xapp_main install -D hw_xapp_main /usr/local/bin/hw_xapp_main clean: - -rm *.o $(ASNSRC)/*.o $(E2APSRC)/*.o $(UTILSRC)/*.o $(E2SMSRC)/*.o $(MSGSRC)/*.o $(SRC)/*.o hw_xapp_main + -rm *.o $(ASNSRC)/*.o $(E2APSRC)/*.o $(UTILSRC)/*.o $(E2SMSRC)/*.o $(MSGSRC)/*.o $(SRC)/*.o hw_xapp_main diff --git a/src/hw_xapp_main.cc b/src/hw_xapp_main.cc index 3c45e6a..624d88f 100644 --- a/src/hw_xapp_main.cc +++ b/src/hw_xapp_main.cc @@ -22,8 +22,6 @@ */ #include "xapp.hpp" -#include "subscription_request.hpp" -#include "xapp_sdl.hpp" void signalHandler( int signum ) { cout << "Interrupt signal (" << signum << ") received.\n"; @@ -32,6 +30,15 @@ void signalHandler( int signum ) { int main(int argc, char *argv[]){ + // Get the thread id + std::thread::id my_id = std::this_thread::get_id(); + std::stringstream thread_id; + std::stringstream ss; + + thread_id << my_id; + + mdclog_write(MDCLOG_INFO, "Starting thread %s", thread_id.str().c_str()); + //get configuration XappSettings config; //change the priority depending upon application requirement @@ -39,52 +46,49 @@ int main(int argc, char *argv[]){ config.loadEnvVarSettings(); config.loadCmdlineSettings(argc, argv); + //Register signal handler to stop + signal(SIGINT, signalHandler); + signal(SIGTERM, signalHandler); + //getting the listening port and xapp name info std::string port = config[XappSettings::SettingName::HW_PORTS]; std::string name = config[XappSettings::SettingName::XAPP_NAME]; - //initialize rmr std::unique_ptr rmr; - rmr = std::make_unique(name,port); + rmr = std::make_unique(port); rmr->xapp_rmr_init(); - //Register signal handler to stop - signal(SIGINT, signalHandler); - signal(SIGTERM, signalHandler); - - //Test SDL. - XappSDL sdl = XappSDL("hw-xapp"); - - //Initiate the Xapp functionality - std::unique_ptr hw_xapp = std::make_unique(std::ref(config), std::ref(*rmr),std::ref(sdl)); + std::unique_ptr hw_xapp = std::make_unique(std::ref(config),std::ref(*rmr)); - //define the startup mode. - hw_xapp->startup(); + //register MsgHandler plugin for a received rmr_buffer + std::unique_ptr mp_handler = std::make_unique(); + hw_xapp->register_handler(std::bind(&XappMsgHandler::operator (),mp_handler.get(),std::placeholders::_1,std::placeholders::_2)); - //Register Callback Handlers - //Register E2 Msg Handlers - Subscription/Indication. - //Register A1 Msg Handlers. - //Register Callback Handlers + rmr->set_listen(true); + hw_xapp->start_xapp_receiver(std::ref(*mp_handler)); + sleep(5); - //start the receiver thread listening at HW_PORT - //currently only one receiver thread. In total how many receiver threads depend on the xapp developer. - //Register all the handlers required and start the receiver - //register_msgproc(RIC_SUB_RESP, sub_handler); - //register_msgproc(RIC_SUB_DEL_RESP, sub_handler); - //register_msgproc(RIC_SUB_FAILURE, sub_handler); + //Delete all subscriptions if any based on Xapp Mode. + //xapp->shutdown(); + xapp_rmr_header hdr; + hdr.message_type = RIC_HEALTH_CHECK_REQ; - hw_xapp->start_xapp_receiver(); - sleep(5); + char *strMsg = "HelloWorld: RMR Health Check\0"; + clock_gettime(CLOCK_REALTIME, &(hdr.ts)); + hdr.payload_length = strlen(strMsg); - //Delete all subscriptions if any based on Xapp Mode. - //xapp->shutdown(); + bool res = rmr->xapp_rmr_send(&hdr,(void*)strMsg); + if (!res){ + std::cout << "Xapp RMR Send Failure"; + } + usleep(10); while(1){ sleep(1); } diff --git a/src/routes.txt b/src/routes.txt old mode 100644 new mode 100755 diff --git a/src/run_xapp.sh b/src/run_xapp.sh index 1eff6b1..460e59a 100755 --- a/src/run_xapp.sh +++ b/src/run_xapp.sh @@ -1,23 +1,5 @@ #! /bin/bash -/* -================================================================================== - Copyright (c) 2018-2019 AT&T Intellectual Property. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -================================================================================== - */ -export RMR_SEED_RT="routes.txt" export RMR_RTG_SVC="9999" export XAPP_NAME="HELLOWORLD_XAPP" export HW_PORTS="4560" diff --git a/src/xapp-asn/e2sm/e2sm_helpers.hpp b/src/xapp-asn/e2sm/e2sm_helpers.hpp index ae7a481..cee7b53 100644 --- a/src/xapp-asn/e2sm/e2sm_helpers.hpp +++ b/src/xapp-asn/e2sm/e2sm_helpers.hpp @@ -1,21 +1,4 @@ -/* -================================================================================== - Copyright (c) 2018-2019 AT&T Intellectual Property. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -================================================================================== - */ #ifndef E2SM_HELPER_ #define E2SM_HELPER_ diff --git a/src/xapp-mgmt/msgs_proc.cc b/src/xapp-mgmt/msgs_proc.cc index d20ef76..3b5223d 100644 --- a/src/xapp-mgmt/msgs_proc.cc +++ b/src/xapp-mgmt/msgs_proc.cc @@ -232,11 +232,11 @@ bool XappMsgHandler::decode_subscription_delete_response_failure(unsigned char* } //For processing received messages. -rmr_mbuf_t * XappMsgHandler::operator()(rmr_mbuf_t *message){ +void XappMsgHandler::operator()(rmr_mbuf_t *message, bool *resend){ if (message->len > MAX_RMR_RECV_SIZE){ mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE); - return message; + return; } switch(message->mtype){ @@ -244,38 +244,37 @@ rmr_mbuf_t * XappMsgHandler::operator()(rmr_mbuf_t *message){ case (RIC_HEALTH_CHECK_REQ): message->mtype = RIC_HEALTH_CHECK_RESP; // if we're here we are running and all is ok message->sub_id = -1; - break; + strncpy( (char*)message->payload, "HELLOWORLD OK\n", rmr_payload_size( message) ); + *resend = true; + break; case (RIC_SUB_RESP): //Received Subscription Response Message decode_subscription_response(message->payload,message->len); - message = NULL; break; case (RIC_SUB_DEL_RESP): decode_subscription_delete_response(message->payload,message->len); - message = NULL; break; case (RIC_SUB_FAILURE): decode_subscription_response_failure(message->payload, message->len); - message = NULL; break; case (RIC_SUB_DEL_FAILURE): decode_subscription_delete_response_failure(message->payload,message->len); - message = NULL; break; // case A1_POLICY_REQ: // break; default: + *resend = false; mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype); } - return message; + return; }; diff --git a/src/xapp-mgmt/msgs_proc.hpp b/src/xapp-mgmt/msgs_proc.hpp index ec15933..ed9941f 100644 --- a/src/xapp-mgmt/msgs_proc.hpp +++ b/src/xapp-mgmt/msgs_proc.hpp @@ -51,7 +51,7 @@ private: size_t* message_length; public: - rmr_mbuf_t * operator() (rmr_mbuf_t *); + void operator() (rmr_mbuf_t *, bool*); bool encode_subscription_request(unsigned char*, size_t* ); bool encode_subscription_delete_request(unsigned char*, size_t* ); diff --git a/src/xapp-mgmt/xapp_handler.hpp b/src/xapp-mgmt/xapp_handler.hpp new file mode 100644 index 0000000..5b547b7 --- /dev/null +++ b/src/xapp-mgmt/xapp_handler.hpp @@ -0,0 +1,21 @@ +/* + * xapp_handler.hpp + * + * Created on: Mar 16, 2020 + * Author: Shraboni Jana + */ + +#ifndef SRC_XAPP_MGMT_XAPP_HANDLER_HPP_ +#define SRC_XAPP_MGMT_XAPP_HANDLER_HPP_ + +class XappHandler{ + XappHandler *xhandler; +public: + virtual ~XappHandler(){delete xhandler;}; + virtual void register_handler(XappHandler *xhandler) = 0; + virtual XappHandler* get_handler() = 0; +}; + + + +#endif /* SRC_XAPP_MGMT_XAPP_HANDLER_HPP_ */ diff --git a/src/xapp-utils/xapp_rmr.cc b/src/xapp-utils/xapp_rmr.cc index f757667..c822e55 100755 --- a/src/xapp-utils/xapp_rmr.cc +++ b/src/xapp-utils/xapp_rmr.cc @@ -20,15 +20,15 @@ #include "xapp_rmr.hpp" -XappRmr::XappRmr(std::string xname, std::string port, int rmrattempts){ +XappRmr::XappRmr(std::string port, int rmrattempts){ _proto_port = port; - _xapp_name = xname; _nattempts = rmrattempts; _xapp_rmr_ctx = NULL; _xapp_received_buff = NULL; _xapp_send_buff =NULL; _rmr_is_ready = false; + _listen = false; }; @@ -65,57 +65,95 @@ void XappRmr::xapp_rmr_init(){ return; } - +//RMR Returning to the sender. bool XappRmr::xapp_rmr_rts() { - _xapp_send_buff = rmr_realloc_payload( _xapp_send_buff, 128, false, false ); // ensure payload is large enough - strncpy( (char*)_xapp_send_buff->payload, "OK\n", rmr_payload_size( _xapp_send_buff) ); - rmr_rts_msg(_xapp_rmr_ctx, _xapp_send_buff ); - _xapp_send_buff = NULL; + mdclog_write(MDCLOG_INFO,"RMR Return to sender, file= %s, line=%d",__FILE__,__LINE__); + if ( _xapp_rmr_ctx == NULL){ + mdclog_write(MDCLOG_ERR,"Error Initializing RMR, file= %s, line=%d",__FILE__,__LINE__); + } + while( ! rmr_ready(_xapp_rmr_ctx) ) { + mdclog_write(MDCLOG_INFO,">>> waiting for RMR, file= %s, line=%d",__FILE__,__LINE__); + sleep(1); + } + rmr_rts_msg(_xapp_rmr_ctx, _xapp_received_buff ); + sleep(1); return true; } //RMR Send with payload and header. bool XappRmr::xapp_rmr_send(xapp_rmr_header *hdr, void *payload){ + int rmr_attempts = _nattempts; + if( _xapp_send_buff == NULL ) { _xapp_send_buff = rmr_alloc_msg(_xapp_rmr_ctx, RMR_DEF_SIZE); } - - _xapp_send_buff->mtype = hdr->message_type; - memcpy(_xapp_send_buff->payload, payload, hdr->payload_length); - _xapp_send_buff->len = hdr->payload_length; + if(!_rmr_is_ready) { mdclog_write(MDCLOG_ERR,"RMR Context is Not Ready in SENDER, file= %s, line=%d",__FILE__,__LINE__); return false; } - if( _xapp_send_buff == NULL ) { - return false; - } - - while(_nattempts > 0){ + while(rmr_attempts > 0){ _xapp_send_buff = rmr_send_msg(_xapp_rmr_ctx,_xapp_send_buff); if(!_xapp_send_buff) { - mdclog_write(MDCLOG_ERR,"Error In Sending Message , file= %s, line=%d",__FILE__,__LINE__); - _nattempts--; + mdclog_write(MDCLOG_ERR,"Error In Sending Message , file= %s, line=%d, attempt=%d",__FILE__,__LINE__,rmr_attempts); + rmr_attempts--; } else if (_xapp_send_buff->state == RMR_OK){ - mdclog_write(MDCLOG_INFO,"The okay message is %d, file= %s, line=%d", RMR_OK, __FILE__,__LINE__); - _nattempts = 0; + mdclog_write(MDCLOG_INFO,"Message Sent: RMR State = RMR_OK"); + rmr_attempts = 0; + _xapp_send_buff = NULL; return true; } else { - mdclog_write(MDCLOG_INFO,"Need to retry RMR MSG NUM %d, file= %s, line=%d",_xapp_send_buff->state, __FILE__,__LINE__); - _nattempts--; + mdclog_write(MDCLOG_INFO,"Need to retry RMR: state=%d, attempt=%d, file=%s, line=%d",_xapp_send_buff->state, rmr_attempts,__FILE__,__LINE__); + rmr_attempts--; } sleep(1); } return false; } +//---------------------------------------- +// Some get/set methods +//--------------------------------------- +bool XappRmr::get_listen(void){ + return _listen; +} + + +void XappRmr::set_listen(bool listen){ + _listen = listen; +} + +int XappRmr::get_is_ready(void){ + return _rmr_is_ready; +} + +bool XappRmr::get_isRunning(void){ + return _listen; +} + + +void * XappRmr::get_rmr_context(void){ + return _xapp_rmr_ctx; +} + + +void init_logger(const char *AppName, mdclog_severity_t log_level) +{ + mdclog_attr_t *attr; + mdclog_attr_init(&attr); + mdclog_attr_set_ident(attr, AppName); + mdclog_init(attr); + mdclog_level_set(log_level); + mdclog_attr_destroy(attr); +} + diff --git a/src/xapp-utils/xapp_rmr.hpp b/src/xapp-utils/xapp_rmr.hpp index 2ba6fb1..30aea8c 100755 --- a/src/xapp-utils/xapp_rmr.hpp +++ b/src/xapp-utils/xapp_rmr.hpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -58,10 +59,10 @@ typedef struct{ class XappRmr{ private: - std::string _xapp_name; std::string _proto_port; int _nattempts; bool _rmr_is_ready; + bool _listen; void* _xapp_rmr_ctx; rmr_mbuf_t* _xapp_send_buff; // send buffer rmr_mbuf_t* _xapp_received_buff; // received buffer @@ -69,7 +70,7 @@ private: public: - XappRmr(std::string, std::string, int rmrattempts=10); + XappRmr(std::string, int rmrattempts=10); ~XappRmr(void); void xapp_rmr_init(void); @@ -78,40 +79,83 @@ public: bool xapp_rmr_send(xapp_rmr_header*, void*); bool xapp_rmr_rts(); + void set_listen(bool); + bool get_listen(void); + int get_is_ready(void); + bool get_isRunning(void); + void* get_rmr_context(void); + }; -//RMR receive -template -void XappRmr::xapp_rmr_receive(MessageProcessor&& msgproc, XappRmr *parent){ - char* listen_port; +// main workhorse thread which does the listen->process->respond loop +template +void XappRmr::xapp_rmr_receive(MsgHandler&& msgproc, XappRmr *parent){ - if( (listen_port = getenv( "RMR_RCV_PORT" )) == NULL ) { - mdclog_write(MDCLOG_ERR,"No Listening port assigned, file= %s, line=%d",__FILE__,__LINE__); - } + bool* resend = new bool(false); + // Get the thread id + std::thread::id my_id = std::this_thread::get_id(); + std::stringstream thread_id; + std::stringstream ss; - if(!_rmr_is_ready){ + thread_id << my_id; + + // Get the rmr context from parent (all threads and parent use same rmr context. rmr context is expected to be thread safe) + if(!parent->get_is_ready()){ mdclog_write( MDCLOG_ERR, "RMR Shows Not Ready in RECEIVER, file= %s, line=%d ",__FILE__,__LINE__); return; - } + } + void *rmr_context = parent->get_rmr_context(); + assert(rmr_context != NULL); + + // Get buffer specific to this thread + this->_xapp_received_buff = NULL; + this->_xapp_received_buff = rmr_alloc_msg(rmr_context, RMR_DEF_SIZE); + assert(this->_xapp_received_buff != NULL); + + mdclog_write(MDCLOG_INFO, "Starting thread %s", thread_id.str().c_str()); + + while(parent->get_listen()) { + mdclog_write(MDCLOG_INFO, "Listening at Thread: %s", thread_id.str().c_str()); - while( 1 ) { - parent->_xapp_received_buff = rmr_rcv_msg( parent->_xapp_rmr_ctx, parent->_xapp_received_buff ); // block until one arrives - if( parent->_xapp_received_buff->mtype < 0 || parent->_xapp_received_buff->state != RMR_OK ) { - mdclog_write(MDCLOG_ERR, "bad msg: state=%d errno=%d, file= %s, line=%d", parent->_xapp_received_buff->state, errno, __FILE__,__LINE__ ); - return; - } else { - std::cout << "The Message Received is:" << (char*)parent->_xapp_received_buff->payload <mtype <_xapp_received_buff = rmr_rcv_msg( rmr_context, this->_xapp_received_buff ); + + if( this->_xapp_received_buff->mtype < 0 || this->_xapp_received_buff->state != RMR_OK ) { + mdclog_write(MDCLOG_ERR, "bad msg: state=%d errno=%d, file= %s, line=%d", this->_xapp_received_buff->state, errno, __FILE__,__LINE__ ); + return; + } + else + { + mdclog_write(MDCLOG_INFO,"RMR Received Message of Type: %d",this->_xapp_received_buff->mtype); + mdclog_write(MDCLOG_INFO,"RMR Received Message: %s",(char*)this->_xapp_received_buff->payload); + + //in case message handler returns true, need to resend the message. + msgproc(this->_xapp_received_buff, resend); + if(*resend){ + rmr_rts_msg(rmr_context, this->_xapp_received_buff ); + sleep(1); + *resend = false; } + continue; } - return; + + } + // Clean up + try{ + delete resend; + rmr_free_msg(this->_xapp_received_buff); + } + catch(std::runtime_error &e){ + std::string identifier = __FILE__ + std::string(", Line: ") + std::to_string(__LINE__) ; + std::string error_string = identifier = " Error freeing RMR message "; + mdclog_write(MDCLOG_ERR, error_string.c_str(), ""); + } + + return; } + + + #endif /* XAPP_RMR_XAPP_RMR_H_ */ diff --git a/src/xapp-utils/xapp_sdl.cc b/src/xapp-utils/xapp_sdl.cc index b098273..d74e584 100644 --- a/src/xapp-utils/xapp_sdl.cc +++ b/src/xapp-utils/xapp_sdl.cc @@ -24,24 +24,48 @@ * Author: Shraboni Jana */ #include "xapp_sdl.hpp" -/*need to work on the SDL FLow. -As per Matti +/*need to work on the SDL FLow. Currently data hardcoded. An xApp can use the SDL for two things: - persisting state for itself (in case it fails and recovers) -- making information available for other xApps. The xApp would typically write using SDL directly. The consumer of the data could also use SDL directly or use an access library like in the case of the R-NIB. +- making information available for other xApps. The xApp would typically write using SDL directly. +- The consumer of the data could also use SDL directly or use an access library like in the case of the R-NIB. */ -void XappSDL::insert_data(){ - //connecting to the Redis and generating a random key for namespace "hwxapp" - - DataMap dmap; - char key[4]={'a','b','c'}; - std::cout << "KEY: "<< key << std::endl; - Key k = key; - Data d; - uint8_t num = 101; - d.push_back(num); - dmap.insert({k,d}); - sdl->set(ns, dmap); +bool XappSDL::set_data(shareddatalayer::SyncStorage *sdl){ + try{ + //connecting to the Redis and generating a random key for namespace "hwxapp" + mdclog_write(MDCLOG_INFO, "IN SDL Set Data", __FILE__, __LINE__); + DataMap dmap; + char key[4]="abc"; + std::cout << "KEY: "<< key << std::endl; + Key k = key; + Data d; + uint8_t num = 101; + d.push_back(num); + dmap.insert({k,d}); + Namespace ns(sdl_namespace); + sdl->set(ns, dmap); + } + catch(...){ + mdclog_write(MDCLOG_ERR, "SDL Error in Set Data for Namespace=%s",sdl_namespace); + return false; + } + return true; } +void XappSDL::get_data(shareddatalayer::SyncStorage *sdl){ + Namespace ns(sdl_namespace); + DataMap dmap; + std::string prefix=""; + Keys K = sdl->findKeys(ns, prefix); // just the prefix + DataMap Dk = sdl->get(ns, K); + for(auto si=K.begin();si!=K.end();++si){ + std::vector val_v = Dk[(*si)]; // 4 lines to unpack a string + char val[val_v.size()+1]; // from Data + int i; + for(i=0;i #include #include - +#include using namespace std; using Namespace = std::string; @@ -46,16 +46,12 @@ using Keys = std::set; class XappSDL{ private: - std::unique_ptr sdl; - Namespace ns; -public: + std::string sdl_namespace; - XappSDL(std::string s) { - Namespace temp(s); - ns = temp; - sdl = (shareddatalayer::SyncStorage::create()); - }; - void insert_data(); +public: + XappSDL(std::string ns) { sdl_namespace=ns; } + void get_data(shareddatalayer::SyncStorage *); + bool set_data(shareddatalayer::SyncStorage *); }; #endif /* SRC_XAPP_UTILS_XAPP_SDL_HPP_ */ diff --git a/src/xapp.cc b/src/xapp.cc index ca737e6..4101699 100644 --- a/src/xapp.cc +++ b/src/xapp.cc @@ -32,16 +32,6 @@ Xapp::Xapp(XappSettings &config, XappRmr &rmr){ return; } - -Xapp::Xapp(XappSettings &config, XappRmr &rmr, XappSDL &sdl){ - rmr_ref = &rmr; - config_ref = &config; - sdl_ref = &sdl; - //sdl_ref.insert_data(); - xapp_mutex = NULL; - - return; -} Xapp::~Xapp(void){ //Joining the threads @@ -50,25 +40,36 @@ Xapp::~Xapp(void){ if(xapp_rcv_thread[i].joinable()) xapp_rcv_thread[i].join(); } + delete xapp_mutex; + }; + +void Xapp::init() { + + //get rnib information + get_rnib_gnblist(); + + +} void Xapp::startup() { //send subscriptions and read A1 policies. startup_subscribe_requests(); - startup_get_policies(); + //startup_get_policies(); return; } -void Xapp::start_xapp_receiver(){ +void Xapp::start_xapp_receiver(XappMsgHandler& mp_handler){ //start a receiver thread. Can be multiple receiver threads for more than 1 listening port. xapp_mutex = new std::mutex(); - std::vector> message_procs; mdclog_write(MDCLOG_INFO,"Receiver Thread file= %s, line=%d",__FILE__,__LINE__); - std::unique_ptr mp_handler = std::make_unique(); + //std::unique_ptr mp_handler = std::make_unique(); + //auto mp_handler = _callbacks[0]; std::lock_guard guard(*xapp_mutex); - std::thread th_recv([&](){ rmr_ref->xapp_rmr_receive(std::move(*mp_handler.get()), rmr_ref);}); + std::thread th_recv([&](){ rmr_ref->xapp_rmr_receive(std::move(mp_handler), rmr_ref);}); + xapp_rcv_thread.push_back(std::move(th_recv)); @@ -88,11 +89,7 @@ void Xapp::startup_subscribe_requests(void ){ size_t data_size = ASN_BUFF_MAX_SIZE; unsigned char data[data_size]; - std::vector gNodeBs; - gNodeBs.push_back("GNB1001"); //this line should come from RNIB - - - for(auto &it: gNodeBs){ + for(auto &it: rnib_gnblist){ int attempt = 0; XappMsgHandler msg; @@ -129,25 +126,37 @@ void Xapp::startup_get_policies(void){ free(message); } -void Xapp::sdl_data(void) { - sdl_ref->insert_data(); -} -/*void Xapp::rnib_data(void) { - printf("Using rnibreader lib from C:\n"); - open(); +void Xapp::set_rnib_gnblist(void) { + + openSdl(); void *result = getListGnbIds(); if(result == NULL){ - - printf("ERROR: no data from getListGnbIds\n"); + mdclog_write(MDCLOG_ERR, "ERROR: no data from getListGnbIds\n"); return; } - printf("getListGnbIds response: %s\n", (char *)result); - close(); + + mdclog_write(MDCLOG_INFO, "GNB List in R-NIB %s\n", (char*)result); + + Document doc; + doc.Parse((char*)result); + assert(doc.HasMember("gnb_list")); + const Value& gnblist = doc["gnb_list"]; + assert(gnblist.IsArray()); + for (SizeType i = 0; i < gnblist.Size(); i++) // Uses SizeType instead of size_t + { + assert(gnblist[i].IsObject()); + const Value& gnbobj = gnblist[i]; + assert(gnbobj.HasMember("inventory_name")); + assert(gnbobj["inventory_name"].IsString()); + rnib_gnblist.push_back(gnbobj["inventory_name"].GetString()); + + } + closeSdl(); free(result); return; -}*/ +} diff --git a/src/xapp.hpp b/src/xapp.hpp index bcfff5b..680db1b 100644 --- a/src/xapp.hpp +++ b/src/xapp.hpp @@ -32,46 +32,67 @@ #include #include #include +#include +#include #include "xapp_rmr.hpp" #include "xapp_sdl.hpp" #include "rapidjson/writer.h" - +#include "rapidjson/document.h" #include "msgs_proc.hpp" #include "subs_mgmt.hpp" #include "xapp_config.hpp" -//#include "rnib/rnibreader.h" - - +extern "C" { +#include "rnib/rnibreader.h" +} using namespace std; using namespace std::placeholders; +using namespace rapidjson; + +using callback_type = std::function< void(rmr_mbuf_t*,bool*) > ; class Xapp{ public: Xapp(XappSettings &, XappRmr &); - Xapp(XappSettings &, XappRmr &, XappSDL &); ~Xapp(void); void startup(); void shutdown(void); - - void start_xapp_receiver(); + void init(void); + void start_xapp_receiver(XappMsgHandler &); + void sdl_data(void); Xapp(Xapp const &)=delete; Xapp& operator=(Xapp const &) = delete; + template + void register_handler(FunctionObject fn){ + _callbacks.emplace_back(fn); + } + + void callback_handler(){ + + } + + void set_rnib_gnblist(void); + //getters/setters. + std::vector get_rnib_gnblist(){ return rnib_gnblist; } + private: void startup_subscribe_requests(void ); void shutdown_subscribe_deletes(void); void startup_get_policies(void ); - void sdl_data(void); - void rnib_data(void); + XappRmr * rmr_ref; XappSettings * config_ref; - XappSDL *sdl_ref = NULL; + std::mutex *xapp_mutex; std::vector xapp_rcv_thread; + std::vector rnib_gnblist; + + std::vector _callbacks; + }; -- 2.16.6