2 ==================================================================================
4 Copyright (c) 2019-2020 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
22 * Mar, 2020 (Shraboni Jana)
// Constructor taking the settings object and the RMR object by reference.
// Only the statement that clears subhandler_ref is visible in this excerpt: the embedded
// numbering jumps from 27 to 32, so the statements in between are not shown.
27 Xapp::Xapp(XappSettings &config, XappRmr &rmr){
// This overload receives no subscription handler, so the pointer is set to NULL.
32 subhandler_ref = NULL;
// Constructor taking the settings object, the RMR object and a subscription handler, all by reference.
// Only the statement that stores the handler is visible in this excerpt: the embedded
// numbering jumps from 36 to 40, so the statements in between are not shown.
36 Xapp::Xapp(XappSettings &config, XappRmr &rmr, SubscriptionHandler &sub_ref){
// Keeps the address of the caller's handler; the object itself is not copied.
40 subhandler_ref = &sub_ref;
// Joins every receiver thread that is still joinable, then empties the thread list.
// NOTE(review): the enclosing function header is not visible in this excerpt
// (presumably the destructor — confirm).
49 int threadcnt = xapp_rcv_thread.size();
50 for(int i=0; i<threadcnt; i++){
// join() is only attempted on threads that report joinable().
51 if(xapp_rcv_thread[i].joinable())
52 xapp_rcv_thread[i].join();
// Removes the thread objects from the list once the loop above has run.
54 xapp_rcv_thread.clear();
62 //Stop the xapp. Note- To be run only from unit test scripts.
// Turns RMR listening off and detaches the receiver threads.
// The lock_guard holds xapp_mutex until the function returns.
// NOTE(review): xapp_mutex is dereferenced without a visible NULL check, while other code in
// this file allocates it lazily when receivers are started — confirm stop() is never called first.
63 void Xapp::stop(void){
65 std::lock_guard<std::mutex> guard(*xapp_mutex);
66 rmr_ref->set_listen(false);
69 //Detaching the threads....not sure if this is the right way to stop the receiver threads.
70 //Hence function should be called only in Unit Tests
71 int threadcnt = xapp_rcv_thread.size();
72 for(int i=0; i<threadcnt; i++){
// std::thread::detach() throws std::system_error on a thread that is not joinable (for
// example one already detached by an earlier stop()), so it is guarded the same way the
// join loop in this file guards join().
73 if(xapp_rcv_thread[i].joinable()) xapp_rcv_thread[i].detach();
// Startup sequence: first issues the subscription requests, then requests the policies.
78 void Xapp::startup() {
80 startup_subscribe_requests();
83 startup_get_policies();
// Enables RMR listening and starts one receiver thread per entry in _callbacks.
// NOTE(review): the embedded numbering jumps from 83 to 87, so these lines may belong to a
// separate function whose header is not visible in this excerpt — confirm.
87 rmr_ref->set_listen(true);
// The mutex is created on first use.
// NOTE(review): this check-then-allocate is itself unsynchronized — confirm it is only
// reached from a single thread.
88 if(xapp_mutex == NULL){
89 xapp_mutex = new std::mutex();
91 std::lock_guard<std::mutex> guard(*xapp_mutex);
// NOTE(review): j is a signed int compared against _callbacks.size().
93 for(int j=0; j < _callbacks.size(); j++){
// j is captured by value: the new thread may start running only after the loop has advanced
// or finished, so reading the loop counter through a reference could select the wrong
// callback or touch a variable that no longer exists.
94 std::thread th_recv([&, j](){ rmr_ref->xapp_rmr_receive(std::move(_callbacks[j]), rmr_ref);});
// std::thread is move-only, so the thread object is moved into the list.
95 xapp_rcv_thread.push_back(std::move(th_recv));
100 //Starting a separate single receiver
// Enables RMR listening and starts one additional receiver thread that runs
// xapp_rmr_receive with mp_handler.
// NOTE(review): the lambda captures by reference and moves from mp_handler inside the new
// thread, so the caller's handler object must stay alive until the thread has consumed it — confirm.
101 void Xapp::start_xapp_receiver(XappMsgHandler& mp_handler){
102 //start a receiver thread. Can be multiple receiver threads for more than 1 listening port.
103 rmr_ref->set_listen(true);
// The mutex is created on first use.
104 if(xapp_mutex == NULL){
105 xapp_mutex = new std::mutex();
108 mdclog_write(MDCLOG_INFO,"Receiver Thread file= %s, line=%d",__FILE__,__LINE__);
// The lock is taken before the thread is created and appended to the thread list.
109 std::lock_guard<std::mutex> guard(*xapp_mutex);
110 std::thread th_recv([&](){ rmr_ref->xapp_rmr_receive(std::move(mp_handler), rmr_ref);});
// std::thread is move-only, so the thread object is moved into the list.
111 xapp_rcv_thread.push_back(std::move(th_recv));
// Shutdown entry point; its body is not visible in this excerpt.
117 void Xapp::shutdown(){
// Sends one subscription request per gNB name returned by get_rnib_gnblist(). Each request is
// handed to the subscription handler together with a callable that performs the RMR send.
124 void Xapp::startup_subscribe_requests(void ){
127 size_t data_size = ASN_BUFF_MAX_SIZE;
// NOTE(review): data_size is a non-const variable, so this is a variable-length array, which
// is a compiler extension rather than standard C++.
128 unsigned char data[data_size];
129 unsigned char meid[RMR_MAX_MEID];
// NOTE(review): xapp_id is not used in the lines visible in this excerpt.
130 std::string xapp_id = config_ref->operator [](XappSettings::SettingName::XAPP_ID);
132 mdclog_write(MDCLOG_INFO,"Sending subscription in file= %s, line=%d",__FILE__,__LINE__);
134 auto gnblist = get_rnib_gnblist();
135 int sz = gnblist.size();
137 for(int i = 0; i<sz; i++){
139 //give the message to subscription handler, along with the transmitter.
// Bounded copy: a gNB name of RMR_MAX_MEID characters or more would overrun meid with
// strcpy, so the copy is limited to the buffer size and explicitly NUL-terminated.
140 strncpy((char*)meid,gnblist[i].c_str(),sizeof(meid) - 1); meid[sizeof(meid) - 1] = '\0';
// A string literal may only be bound to a pointer-to-const in C++11 and later.
142 const char *strMsg = "Subscription Request from HelloWorld XApp\0";
// Copies the text without a terminating NUL; the length travels in payload_length below.
143 strncpy((char *)data,strMsg,strlen(strMsg));
144 data_size = strlen(strMsg);
146 xapp_rmr_header rmr_header;
147 rmr_header.message_type = RIC_SUB_REQ;
148 rmr_header.payload_length = data_size;
// NOTE(review): unbounded copy into rmr_header.meid; the size of that field is not visible
// in this excerpt — confirm it can hold every gNB name.
149 strcpy((char*)rmr_header.meid,gnblist[i].c_str());
151 mdclog_write(MDCLOG_INFO,"Sending subscription in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
// The bound callable holds pointers to the local header and data buffer, so it must be
// invoked before they go out of scope.
152 auto transmitter = std::bind(&XappRmr::xapp_rmr_send,rmr_ref, &rmr_header, (void*)data);
// NOTE(review): subhandler_ref is set to NULL by the two-argument constructor and no NULL
// check is visible before this call — confirm.
154 int res = subhandler_ref->manage_subscription_request(meid, transmitter);
156 mdclog_write(MDCLOG_INFO,"Subscription SUCCESSFUL in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
// Builds a JSON policy query for HELLOWORLD_POLICY_ID and sends it over RMR as an
// A1_POLICY_QUERY message.
163 void Xapp::startup_get_policies(void){
165 int policy_id = HELLOWORLD_POLICY_ID;
// The query text has the form {"policy_id":<id>}.
167 std::string policy_query = "{\"policy_id\":" + std::to_string(policy_id) + "}";
// The payload is copied into a calloc'ed buffer of exactly policy_query.length() bytes, so
// the buffer carries no terminating NUL.
// NOTE(review): the calloc result is passed to memcpy without a NULL check.
168 unsigned char * message = (unsigned char *)calloc(policy_query.length(), sizeof(unsigned char));
169 memcpy(message, policy_query.c_str(), policy_query.length());
170 xapp_rmr_header header;
171 header.payload_length = policy_query.length();
172 header.message_type = A1_POLICY_QUERY;
173 mdclog_write(MDCLOG_INFO, "Sending request for policy id %d\n", policy_id);
// NOTE(review): no release of message is visible in this excerpt — confirm it is freed
// after the send.
174 rmr_ref->xapp_rmr_send(&header, (void *)message);
// Fills rnib_gnblist with the "inventory_name" of every entry in the "gnb_list" array of
// the JSON text returned by getListGnbIds().
// NOTE(review): the embedded numbering has gaps, so some statements of this function
// (including the declaration of doc) are not visible in this excerpt.
179 void Xapp::set_rnib_gnblist(void) {
183 void *result = getListGnbIds();
// strlen() on a null pointer is undefined, so a NULL result is treated like an empty one.
184 if(result == NULL || strlen((char*)result) < 1){
185 mdclog_write(MDCLOG_ERR, "ERROR: no data from getListGnbIds\n");
189 mdclog_write(MDCLOG_INFO, "GNB List in R-NIB %s\n", (char*)result);
193 ParseResult parseJson = doc.Parse((char*)result);
// operator<< does not interpret printf-style placeholders, and an operand placed after a
// comma is evaluated and discarded, so the numeric error code is streamed explicitly.
// NOTE(review): a human-readable message would need the library's error-text function
// (presumably RapidJSON GetParseError_En) — confirm it is available before using it.
195 std::cerr << "JSON parse error: code " << static_cast<int>(parseJson.Code()) << "\n";
199 if(!doc.HasMember("gnb_list")){
200 mdclog_write(MDCLOG_INFO, "JSON Has No GNB List Object");
203 assert(doc.HasMember("gnb_list"));
205 const Value& gnblist = doc["gnb_list"];
206 if (gnblist.IsNull())
209 if(!gnblist.IsArray()){
210 mdclog_write(MDCLOG_INFO, "GNB List is not an array");
215 assert(gnblist.IsArray());
216 for (SizeType i = 0; i < gnblist.Size(); i++) // Uses SizeType instead of size_t
// NOTE(review): the asserts below validate externally supplied JSON and compile out when
// NDEBUG is defined — confirm malformed entries cannot reach this loop in release builds.
218 assert(gnblist[i].IsObject());
219 const Value& gnbobj = gnblist[i];
220 assert(gnbobj.HasMember("inventory_name"));
221 assert(gnbobj["inventory_name"].IsString());
222 std::string name = gnbobj["inventory_name"].GetString();
223 rnib_gnblist.push_back(name);