From e187d5001a6bf0129c48dfbdbc7d0ee86585d21d Mon Sep 17 00:00:00 2001 From: Ron Shacham Date: Fri, 8 May 2020 12:14:44 -0400 Subject: [PATCH] TS pred logic Issue-ID: RICAPP-19 Signed-off-by: Ron Shacham Change-Id: Id64cd20bc14533f4c01ea0775dbfd53e4fca1ba0 --- Dockerfile | 12 +- container-tag.yaml | 2 +- examples/Makefile | 33 ---- examples/ts_xapp.cpp | 346 --------------------------------- rmr-version.yaml | 2 +- routes.txt | 4 +- run_xapp.sh | 21 -- src/ts_xapp/ts_xapp.cpp | 503 +++++++++++++++++++++++++++++++----------------- 8 files changed, 344 insertions(+), 579 deletions(-) delete mode 100644 examples/Makefile delete mode 100644 examples/ts_xapp.cpp delete mode 100755 run_xapp.sh diff --git a/Dockerfile b/Dockerfile index 6201076..1f8f596 100644 --- a/Dockerfile +++ b/Dockerfile @@ -40,7 +40,7 @@ ARG SRC=. WORKDIR /playpen # versions we snarf from package cloud -ARG RMR_VER=4.0.2 +ARG RMR_VER=4.0.5 ARG SDL_VER=1.0.4 ARG XFCPP_VER=1.0.0 @@ -65,6 +65,16 @@ RUN wget -nv --content-disposition ${PC_STG_URL}/sdl_${SDL_VER}-1_amd64.deb/down wget -nv --content-disposition ${PC_STG_URL}/sdl-dev_${SDL_VER}-1_amd64.deb/download.deb &&\ dpkg -i sdl-dev_${SDL_VER}-1_amd64.deb sdl_${SDL_VER}-1_amd64.deb +RUN git clone https://github.com/Tencent/rapidjson && \ + cd rapidjson && \ + mkdir build && \ + cd build && \ + cmake -DCMAKE_INSTALL_PREFIX=/usr/local .. && \ + make install && \ + cd ${STAGE_DIR} && \ + rm -rf rapidjson + + # # build and install the application(s) diff --git a/container-tag.yaml b/container-tag.yaml index 252b070..d0cf09e 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -1,3 +1,3 @@ # this is used by CI jobs to apply a tag when it builds the image --- -tag: '1.0.1' +tag: '1.0.2' diff --git a/examples/Makefile b/examples/Makefile deleted file mode 100644 index 08a350b..0000000 --- a/examples/Makefile +++ /dev/null @@ -1,33 +0,0 @@ -# vim: ts=4 sw=4 noet: - -#================================================================================== -# Copyright (c) 2020 Nokia -# Copyright (c) 2020 AT&T Intellectual Property. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -#================================================================================== - -# simple makefile to build the examples. This assumes that the xapp framework -# library has been installed or the LD_LIBRARY_PATH and C_INCLUDE_PATH environent -# variables are set to reference the needed files. - -%.o:: %.cpp %.hpp - g++ -g ${prereq%% *} -c - -% :: %.cpp - g++ $< -g -o $@ -lricxfcpp -lrmr_si -lpthread -lm -lsdl - -all:: ts_xapp - -install:: - cp ts_xapp /usr/local/bin/ diff --git a/examples/ts_xapp.cpp b/examples/ts_xapp.cpp deleted file mode 100644 index 50ed0ab..0000000 --- a/examples/ts_xapp.cpp +++ /dev/null @@ -1,346 +0,0 @@ -// vi: ts=4 sw=4 noet: -/* -================================================================================== - Copyright (c) 2020 Nokia - Copyright (c) 2020 AT&T Intellectual Property. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -================================================================================== -*/ - -/* - Mnemonic: ts_xapp.cpp - Abstract: Traffic Steering xApp; - 1. Receives A1 Policy - 2. Queries SDL to decide which UE to attempt Traffic Steering for - 3. Requests prediction for UE throughput on current and neighbor cells - 4. Receives prediction - 5. Optionally exercises Traffic Steering action over E2 - - Date: 22 April 2020 - Author: Ron Shacham - -*/ - -#include -#include -#include - -#include -#include - -#include -#include -#include -#include -#include - -#include "ricxfcpp/xapp.hpp" - -using Namespace = std::string; -using Key = std::string; -using Data = std::vector; -using DataMap = std::map; -using Keys = std::set; - - -// ---------------------------------------------------------- - -std::unique_ptr xfw; - - -void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) { - - long now; - long total_count; - - int sz; - int i; - - int response_to = 0; // max timeout wating for a response - - int send_mtype = 0; - int rmtype; // received message type - int delay = 1000000; // mu-sec delay; default 1s - - std::unique_ptr msg; - Msg_component send_payload; // special type of unique pointer to the payload - - fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len); - fprintf(stderr, "payload is %s\n", payload.get()); - - //fprintf( stderr, "callback 1 got a message type = %d len = %d\n", mtype, len ); - mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls - mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" ); - - mtype = 0; - - fprintf(stderr, "cb 1\n"); - - msg = xfw->Alloc_msg( 2048 ); - - sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough - if( sz < 2048 ) { - fprintf( stderr, " fail: message returned did not have enough size: %d [%d]\n", sz, i ); - exit( 1 ); - } - - fprintf(stderr, "cb 2"); - - send_payload = msg->Get_payload(); // direct access to payload - snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\" : [\"222\", \"333\", \"444\"]}" ); - - fprintf(stderr, "cb 3"); - - // payload updated in place, nothing to copy from, so payload parm is nil - if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() )+1, NULL )) { - fprintf( stderr, " send failed: %d\n", i ); - } - - fprintf(stderr, "cb 4"); - - /* - msg = xfw->Receive( response_to ); - if( msg != NULL ) { - rmtype = msg->Get_mtype(); - send_payload = msg->Get_payload(); - fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() ); - } - */ - -} - - -void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) { - - long now; - long total_count; - - int sz; - int i; - - int response_to = 0; // max timeout wating for a response - - int send_mtype = 0; - int rmtype; // received message type - int delay = 1000000; // mu-sec delay; default 1s - - std::unique_ptr msg; - Msg_component send_payload; // special type of unique pointer to the payload - - fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len); - fprintf(stderr, "payload is %s\n", payload.get()); - - mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls - mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" ); - - mtype = 0; - - fprintf(stderr, "cb 1\n"); - -} - - - -extern int main( int argc, char** argv ) { - - std::unique_ptr msg; - Msg_component payload; // special type of unique pointer to the payload - - int nthreads = 1; - - int response_to = 0; // max timeout wating for a response - - int delay = 1000000; // mu-sec delay; default 1s - - char* port = (char *) "4560"; - - int ai; - - ai = 1; - while( ai < argc ) { // very simple flag processing (no bounds/error checking) - if( argv[ai][0] != '-' ) { - break; - } - - switch( argv[ai][1] ) { // we only support -x so -xy must be -x -y - case 'd': // delay between messages (mu-sec) - delay = atoi( argv[ai+1] ); - ai++; - break; - - case 'p': - port = argv[ai+1]; - ai++; - break; - - case 't': // timeout in seconds; we need to convert to ms for rmr calls - response_to = atoi( argv[ai+1] ) * 1000; - ai++; - break; - } - ai++; - } - - fprintf( stderr, " response timeout set to: %d\n", response_to ); - fprintf( stderr, " listening on port: %s\n", port ); - - xfw = std::unique_ptr( new Xapp( port, true ) ); // new xAPP thing; wait for a route table - - fprintf(stderr, "code1\n"); - - xfw->Add_msg_cb( 20010, policy_callback, NULL ); - xfw->Add_msg_cb( 30002, prediction_callback, NULL ); - - fprintf(stderr, "code2\n"); - - std::string sdl_namespace_u = "TS-UE-metrics"; - std::string sdl_namespace_c = "TS-cell-metrics"; - - fprintf(stderr, "code5\n"); - - std::unique_ptr sdl(shareddatalayer::SyncStorage::create()); - - Namespace nsu(sdl_namespace_u); - Namespace nsc(sdl_namespace_c); - - /* - - fprintf(stderr, "before sdl set\n"); - - try{ - //connecting to the Redis and generating a random key for namespace "hwxapp" - fprintf(stderr, "IN SDL Set Data"); - // std::string data_string = "{\"rsrp\" : -110}"; - - - std::string data_string = "{\"CellID\": \"310-680-200-555001\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 2000000, \"PDCPBytesUL\": 1200000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 50 }"; - - DataMap dmap; - // char key[4]="abc"; - char key[] = "310-680-200-555001"; - std::cout << "KEY: "<< key << std::endl; - Key k = key; - Data d; - // uint8_t num = 101; - d.assign(data_string.begin(), data_string.end()); - // d.push_back(num); - dmap.insert({k,d}); - - sdl->set(nsc, dmap); - - data_string = "{ \"CellID\": \"310-680-200-555002\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45 }"; - - Data d2; - DataMap dmap2; - char key2[] = "310-680-200-555002"; - std::cout << "KEY: "<< key2 << std::endl; - Key k2 = key2; - d2.assign(data_string.begin(), data_string.end()); - // d.push_back(num); - dmap2.insert({k2,d}); - - sdl->set(nsc, dmap2); - - - - std::string data_string = "{ \"CellID\": \"310-680-200-555003\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45 }"; - - Data d3; - DataMap dmap3; - char key3[] = "310-680-200-555003"; - std::cout << "KEY: "<< key3 << std::endl; - Key k3 = key3; - d3.assign(data_string.begin(), data_string.end()); - // d.push_back(num); - dmap3.insert({k3,d3}); - - sdl->set(nsc, dmap3); - - - - data_string = "{ \"UEID\": 12345, \"ServingCellID\": \"310-680-200-555002\", \"MeasTimestampUEPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPDCPBytes\": 20,\"UEPDCPBytesDL\": 250000,\"UEPDCPBytesUL\": 100000, \"MeasTimestampUEPRBUsage\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPRBUsage\": 20, \"UEPRBUsageDL\": 10, \"UEPRBUsageUL\": 30, \"MeasTimestampRF\": \"2020-03-18 02:23:18.210\",\"MeasPeriodRF\": 40, \"ServingCellRF\": [-115,-16,-5], \"NeighborCellRF\": [ {\"CID\": \"310-680-200-555001\",\"Cell-RF\": [-90,-13,-2.5 ] }, {\"CID\": \"310-680-200-555003\", \"Cell-RF\": [-140,-17,-6 ] } ] }"; - - Data d4; - DataMap dmap4; - char key4[] = "12345"; - std::cout << "KEY: "<< key << std::endl; - d4.assign(data_string.begin(), data_string.end()); - Key k4 = key4; - // d.push_back(num); - dmap4.insert({k4,d4}); - - sdl->set(nsu, dmap4); - - - } - catch(...){ - fprintf(stderr,"SDL Error in Set Data for Namespace"); - return false; - } - - fprintf(stderr, "after sdl set\n"); - - */ - - fprintf(stderr, "before sdl get\n"); - - - std::string prefix2="310"; - Keys K = sdl->findKeys(nsc, prefix2); // just the prefix - DataMap Dk = sdl->get(nsc, K); - - std::cout << "K contains " << K.size() << " elements.\n"; - - fprintf(stderr, "before forloop\n"); - - for(auto si=K.begin();si!=K.end();++si){ - std::vector val_v = Dk[(*si)]; // 4 lines to unpack a string - char val[val_v.size()+1]; // from Data - int i; - for(i=0;ifindKeys(nsu, prefix3); // just the prefix - DataMap Dk2 = sdl->get(nsu, K2); - - std::cout << "K contains " << K2.size() << " elements.\n"; - - fprintf(stderr, "before forloop\n"); - - for(auto si=K2.begin();si!=K2.end();++si){ - std::vector val_v = Dk2[(*si)]; // 4 lines to unpack a string - char val[val_v.size()+1]; // from Data - int i; - for(i=0;iRun( nthreads ); - - fprintf(stderr, "code3\n"); - - msg = xfw->Alloc_msg( 2048 ); - - fprintf(stderr, "code4\n"); - - -} diff --git a/rmr-version.yaml b/rmr-version.yaml index c03848e..ceaf505 100644 --- a/rmr-version.yaml +++ b/rmr-version.yaml @@ -1,3 +1,3 @@ # Communicate to CI which version of RMR to install in the build/vet environment --- -version: 4.0.2 +version: 4.0.5 diff --git a/routes.txt b/routes.txt index f0242cf..5837eb1 100755 --- a/routes.txt +++ b/routes.txt @@ -1,6 +1,4 @@ newrt|start -mse|20010|20008|4560 rte|20011|service-ricplt-a1mediator-rmr:10000 -rte|TS_QUE_PREDICTION|4560 -rte|TS_UE_LIST|service-ricxapp-qpd:4560 +rte|30000|service-ricxapp-qpdriver.ricxapp.svc.cluster.local:4562 newrt|end diff --git a/run_xapp.sh b/run_xapp.sh deleted file mode 100755 index f65a4b7..0000000 --- a/run_xapp.sh +++ /dev/null @@ -1,21 +0,0 @@ -#! /bin/bash - -export RMR_SEED_RT="routes.txt" -export RMR_RTG_SVC="9999" -export XAPP_NAME="HELLOWORLD_XAPP" -export HW_PORTS="4560" -export MSG_MAX_BUFFER="2048" -export THREADS="1" -export VERBOSE="0" -export CONFIG_FILE="config/config-file.json" -export GNODEB="NYC123" -export XAPP_ID="3489-er492k-92389" -export A1_SCHEMA_FILE="schemas/hwxapp-policy.json" -export VES_SCHEMA_FILE="schemas/hwxapp-ves.json" -export VES_COLLECTOR_URL="127.0.0.1:6350" -export VES_MEASUREMENT_INTERVAL="10" -export LOG_LEVEL="MDCLOG_ERR" -export OPERATING_MODE="CONTROL" - - - diff --git a/src/ts_xapp/ts_xapp.cpp b/src/ts_xapp/ts_xapp.cpp index f6e2c37..4cb3ebc 100644 --- a/src/ts_xapp/ts_xapp.cpp +++ b/src/ts_xapp/ts_xapp.cpp @@ -36,6 +36,7 @@ #include #include +#include #include #include @@ -44,9 +45,19 @@ #include #include #include +#include + +#include +#include +#include +#include +#include + #include "ricxfcpp/xapp.hpp" +using namespace rapidjson; +using namespace std; using Namespace = std::string; using Key = std::string; using Data = std::vector; @@ -58,23 +69,195 @@ using Keys = std::set; std::unique_ptr xfw; +std::string sdl_namespace_u = "TS-UE-metrics"; +std::string sdl_namespace_c = "TS-cell-metrics"; + +int rsrp_threshold = 0; + +std::unique_ptr sdl; + +Namespace nsu; +Namespace nsc; + +struct UEData { + string serving_cell; + int serving_cell_rsrp; + +}; + +struct PredictionHandler : public BaseReaderHandler, PredictionHandler> { + unordered_map cell_pred; + std::string ue_id; + bool ue_id_found = false; + string curr_key = ""; + string curr_value = ""; + bool Null() { cout << "Null()" << endl; return true; } + bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; } + bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; } + bool Uint(unsigned u) { cout << "Uint(" << u << ")" << endl; return true; } + bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; } + bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; } + bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; } + bool String(const char* str, SizeType length, bool copy) { + cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl; + if (curr_key.compare("") != 0) { + cout << "Found throughput\n"; + curr_value = str; + cell_pred[curr_key] = curr_value; + curr_key = ""; + curr_value = ""; + } -void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) { + return true; + } + bool StartObject() { cout << "StartObject()" << endl; return true; } + bool Key(const char* str, SizeType length, bool copy) { + cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl; + if (!ue_id_found) { + cout << "Found UE ID\n"; + ue_id = str; + ue_id_found = true; + } else { + curr_key = str; + } + return true; + } + bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; } + bool StartArray() { cout << "StartArray()" << endl; return true; } + bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; } +}; + + +struct UEDataHandler : public BaseReaderHandler, UEDataHandler> { + unordered_map cell_pred; + std::string serving_cell_id; + int serving_cell_rsrp; + int serving_cell_rsrq; + int serving_cell_sinr; + bool in_serving_array = false; + int rf_meas_index = 0; + + string curr_key = ""; + string curr_value = ""; + bool Null() { cout << "Null()" << endl; return true; } + bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; } + bool Int(int i) { + fprintf(stderr, "Int(%d)\n", i); + if (in_serving_array) { + fprintf(stderr, "we are in serving array\n"); + switch(rf_meas_index) { + case 0: + serving_cell_rsrp = i; + break; + case 1: + serving_cell_rsrq = i; + break; + case 2: + serving_cell_sinr = i; + break; + } + rf_meas_index++; + } + return true; + } + bool Uint(unsigned u) { + fprintf(stderr, "Int(%d)\n", u); return true; } + bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; } + bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; } + bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; } + bool String(const char* str, SizeType length, bool copy) { + fprintf(stderr,"String(%s)\n", str); + if (curr_key.compare("ServingCellID") == 0) { + serving_cell_id = str; + } + + return true; + } + bool StartObject() { cout << "StartObject()" << endl; return true; } + bool Key(const char* str, SizeType length, bool copy) { + fprintf(stderr,"Key(%s)\n", str); + curr_key = str; + return true; + } + bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; } + bool StartArray() { + fprintf(stderr,"StartArray()"); + if (curr_key.compare("ServingCellRF") == 0) { + in_serving_array = true; + } + + return true; + } + bool EndArray(SizeType elementCount) { + fprintf(stderr, "EndArray()\n"); + if (curr_key.compare("servingCellRF") == 0) { + in_serving_array = false; + rf_meas_index = 0; + } - long now; - long total_count; + return true; } +}; - int sz; - int i; - int response_to = 0; // max timeout wating for a response +unordered_map get_sdl_ue_data() { - int send_mtype = 0; - int rmtype; // received message type - int delay = 1000000; // mu-sec delay; default 1s + fprintf(stderr, "In get_sdl_ue_data()\n"); + + unordered_map ue_data; + + unordered_map return_ue_data_map; + + std::string prefix3="12"; + Keys K2 = sdl->findKeys(nsu, prefix3); + DataMap Dk2 = sdl->get(nsu, K2); + + string ue_json; + string ue_id; + + for(auto si=K2.begin();si!=K2.end();++si){ + std::vector val_v = Dk2[(*si)]; // 4 lines to unpack a string + char val[val_v.size()+1]; // from Data + int i; + fprintf(stderr, "val size %d\n", val_v.size()); + for(i=0;isecond.c_str()); + reader.Parse(ss,handler); + + string ueID = map_iter->first; + string serving_cell_id = handler.serving_cell_id; + int serv_rsrp = handler.serving_cell_rsrp; + + fprintf(stderr,"UE data for %s\n", ueID.c_str()); + fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str()); + fprintf(stderr,"RSRP for UE %d\n", serv_rsrp); + + return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp}; + + } + + fprintf(stderr, "\n"); + return return_ue_data_map; +} + +void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) { + + int response_to = 0; // max timeout wating for a response + int rmtype; // received message type - std::unique_ptr msg; - Msg_component send_payload; // special type of unique pointer to the payload fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len); fprintf(stderr, "payload is %s\n", payload.get()); @@ -83,8 +266,23 @@ void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_componen mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" ); - mtype = 0; + //Set the threshold value + + +} + +void send_prediction_request(vector ues_to_predict) { + std::unique_ptr msg; + Msg_component payload; // special type of unique pointer to the payload + + int nthreads = 1; + int response_to = 0; // max timeout wating for a response + int mtype = 30000; + int sz; + int i; + Msg_component send_payload; + fprintf(stderr, "cb 1\n"); msg = xfw->Alloc_msg( 2048 ); @@ -96,18 +294,39 @@ void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_componen } fprintf(stderr, "cb 2"); + + string ues_list = "["; + + for (int i = 0; i < ues_to_predict.size(); i++) { + if (i == ues_to_predict.size() - 1) { + ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\""; + } else { + ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ","; + } + } + + string message_body = "{\"UEPredictionSet\": " + ues_list + "}"; + + const char *body = message_body.c_str(); + + // char *body = "{\"UEPredictionSet\": [\"12345\"]}"; send_payload = msg->Get_payload(); // direct access to payload - snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\" : [\"222\", \"333\", \"444\"]}" ); + // snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 ); + // snprintf( (char *) send_payload.get(), 2048, body); + snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}"); - fprintf(stderr, "cb 3"); + fprintf(stderr, "message body %s\n", send_payload.get()); + + fprintf(stderr, "cb 3"); + fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() )); // payload updated in place, nothing to copy from, so payload parm is nil - if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() )+1, NULL )) { + if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) { fprintf( stderr, " send failed: %d\n", msg->Get_state() ); } - fprintf(stderr, "cb 4"); + fprintf(stderr, "cb 4"); /* msg = xfw->Receive( response_to ); @@ -116,10 +335,9 @@ void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_componen send_payload = msg->Get_payload(); fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() ); } - */ - -} + */ +} void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) { @@ -135,9 +353,6 @@ void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_comp int rmtype; // received message type int delay = 1000000; // mu-sec delay; default 1s - std::unique_ptr msg; - Msg_component send_payload; // special type of unique pointer to the payload - fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len); fprintf(stderr, "payload is %s\n", payload.get()); @@ -147,200 +362,142 @@ void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_comp mtype = 0; fprintf(stderr, "cb 1\n"); - -} - + char *incoming_msg = "{\"12345\": {\"222\": \"20000\", \"333\" : \"50000\"} }"; -extern int main( int argc, char** argv ) { - - std::unique_ptr msg; - Msg_component payload; // special type of unique pointer to the payload + PredictionHandler handler; + Reader reader; + StringStream ss(incoming_msg); + reader.Parse(ss,handler); - int nthreads = 1; + std::string pred_ue_id = handler.ue_id; - int response_to = 0; // max timeout wating for a response + cout << "Prediction for " << pred_ue_id << endl; - int delay = 1000000; // mu-sec delay; default 1s + unordered_map throughput_map = handler.cell_pred; - char* port = (char *) "4560"; - int ai; - - ai = 1; - while( ai < argc ) { // very simple flag processing (no bounds/error checking) - if( argv[ai][0] != '-' ) { - break; - } - - switch( argv[ai][1] ) { // we only support -x so -xy must be -x -y - case 'd': // delay between messages (mu-sec) - delay = atoi( argv[ai+1] ); - ai++; - break; - - case 'p': - port = argv[ai+1]; - ai++; - break; - - case 't': // timeout in seconds; we need to convert to ms for rmr calls - response_to = atoi( argv[ai+1] ) * 1000; - ai++; - break; - } - ai++; - } - - fprintf( stderr, " response timeout set to: %d\n", response_to ); - fprintf( stderr, " listening on port: %s\n", port ); - - xfw = std::unique_ptr( new Xapp( port, true ) ); // new xAPP thing; wait for a route table + cout << endl; + + unordered_map sdl_data = get_sdl_ue_data(); - fprintf(stderr, "code1\n"); - - xfw->Add_msg_cb( 20010, policy_callback, NULL ); - xfw->Add_msg_cb( 30002, prediction_callback, NULL ); + //Decision about CONTROL message + //(1) Identify UE Id in Prediction message + //(2) Get UEData struct for this UE Id + //(3) Identify the UE's service cell ID + //(4) Iterate through Prediction message. + // If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request - fprintf(stderr, "code2\n"); + UEData pred_ue_data = sdl_data[pred_ue_id]; + std::string serving_cell_id = pred_ue_data.serving_cell; - std::string sdl_namespace_u = "TS-UE-metrics"; - std::string sdl_namespace_c = "TS-cell-metrics"; + int serving_cell_throughput; + int highest_throughput; + std::string highest_throughput_cell_id; + std::string::size_type str_size; - fprintf(stderr, "code5\n"); - - std::unique_ptr sdl(shareddatalayer::SyncStorage::create()); + cout << "Going through throughtput map:" << endl; - Namespace nsu(sdl_namespace_u); - Namespace nsc(sdl_namespace_c); + for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) { + cout << map_iter->first << " : " << map_iter->second << endl; + std::string curr_cellid = map_iter->first; + cout << "Cell ID is " << curr_cellid; + int curr_throughput = stoi(map_iter->second, &str_size); + cout << "Throughput is " << curr_throughput << endl; - /* + if (curr_cellid.compare(serving_cell_id) == 0) { + serving_cell_throughput = curr_throughput; + highest_throughput = serving_cell_throughput; + } - fprintf(stderr, "before sdl set\n"); - - try{ - //connecting to the Redis and generating a random key for namespace "hwxapp" - fprintf(stderr, "IN SDL Set Data"); - // std::string data_string = "{\"rsrp\" : -110}"; + } + //Iterating again to identify the highest throughput prediction - std::string data_string = "{\"CellID\": \"310-680-200-555001\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 2000000, \"PDCPBytesUL\": 1200000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 50 }"; - - DataMap dmap; - // char key[4]="abc"; - char key[] = "310-680-200-555001"; - std::cout << "KEY: "<< key << std::endl; - Key k = key; - Data d; - // uint8_t num = 101; - d.assign(data_string.begin(), data_string.end()); - // d.push_back(num); - dmap.insert({k,d}); + for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) { + cout << map_iter->first << " : " << map_iter->second << endl; + std::string curr_cellid = map_iter->first; + cout << "Cell ID is " << curr_cellid; + int curr_throughput = stoi(map_iter->second, &str_size); + cout << "Throughput is " << curr_throughput << endl; - sdl->set(nsc, dmap); + if (curr_throughput > serving_cell_throughput) { + highest_throughput = curr_throughput; + highest_throughput_cell_id = curr_cellid; + } + } - data_string = "{ \"CellID\": \"310-680-200-555002\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45 }"; + if (highest_throughput > serving_cell_throughput) { + cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl; + cout << "UE ID: " << pred_ue_id << endl; + cout << "Source cell " << serving_cell_id << endl; + cout << "Target cell " << highest_throughput_cell_id << endl; + } + + +} - Data d2; - DataMap dmap2; - char key2[] = "310-680-200-555002"; - std::cout << "KEY: "<< key2 << std::endl; - Key k2 = key2; - d2.assign(data_string.begin(), data_string.end()); - // d.push_back(num); - dmap2.insert({k2,d}); - sdl->set(nsc, dmap2); +//This function runs a loop that continuously checks SDL for any UE +void run_loop() { + fprintf(stderr, "in run_loop()\n"); - std::string data_string = "{ \"CellID\": \"310-680-200-555003\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45 }"; + unordered_map uemap; - Data d3; - DataMap dmap3; - char key3[] = "310-680-200-555003"; - std::cout << "KEY: "<< key3 << std::endl; - Key k3 = key3; - d3.assign(data_string.begin(), data_string.end()); - // d.push_back(num); - dmap3.insert({k3,d3}); + vector prediction_ues; - sdl->set(nsc, dmap3); + while (1) { + fprintf(stderr, "in while loop\n"); + uemap = get_sdl_ue_data(); - data_string = "{ \"UEID\": 12345, \"ServingCellID\": \"310-680-200-555002\", \"MeasTimestampUEPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPDCPBytes\": 20,\"UEPDCPBytesDL\": 250000,\"UEPDCPBytesUL\": 100000, \"MeasTimestampUEPRBUsage\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPRBUsage\": 20, \"UEPRBUsageDL\": 10, \"UEPRBUsageUL\": 30, \"MeasTimestampRF\": \"2020-03-18 02:23:18.210\",\"MeasPeriodRF\": 40, \"ServingCellRF\": [-115,-16,-5], \"NeighborCellRF\": [ {\"CID\": \"310-680-200-555001\",\"Cell-RF\": [-90,-13,-2.5 ] }, {\"CID\": \"310-680-200-555003\", \"Cell-RF\": [-140,-17,-6 ] } ] }"; - - Data d4; - DataMap dmap4; - char key4[] = "12345"; - std::cout << "KEY: "<< key << std::endl; - d4.assign(data_string.begin(), data_string.end()); - Key k4 = key4; - // d.push_back(num); - dmap4.insert({k4,d4}); + for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) { + string ueid = map_iter->first; + UEData data = map_iter->second; + if (data.serving_cell_rsrp < rsrp_threshold) { + prediction_ues.push_back(ueid); + } + } - sdl->set(nsu, dmap4); + if (prediction_ues.size() > 0) { + send_prediction_request(prediction_ues); + } - - } - catch(...){ - fprintf(stderr,"SDL Error in Set Data for Namespace"); - return false; + sleep(20); } - - fprintf(stderr, "after sdl set\n"); - - */ - - fprintf(stderr, "before sdl get\n"); - - - std::string prefix2="310"; - Keys K = sdl->findKeys(nsc, prefix2); // just the prefix - DataMap Dk = sdl->get(nsc, K); +} - std::cout << "K contains " << K.size() << " elements.\n"; - fprintf(stderr, "before forloop\n"); - - for(auto si=K.begin();si!=K.end();++si){ - std::vector val_v = Dk[(*si)]; // 4 lines to unpack a string - char val[val_v.size()+1]; // from Data - int i; - for(i=0;ifindKeys(nsu, prefix3); // just the prefix - DataMap Dk2 = sdl->get(nsu, K2); + int nthreads = 1; - std::cout << "K contains " << K2.size() << " elements.\n"; + char* port = (char *) "4560"; - fprintf(stderr, "before forloop\n"); + sdl = shareddatalayer::SyncStorage::create(); - for(auto si=K2.begin();si!=K2.end();++si){ - std::vector val_v = Dk2[(*si)]; // 4 lines to unpack a string - char val[val_v.size()+1]; // from Data - int i; - for(i=0;i listening on port: %s\n", port ); + xfw = std::unique_ptr( new Xapp( port, true ) ); // new xAPP thing; wait for a route table + fprintf(stderr, "code1\n"); - fprintf(stderr, "after sdl get\n"); - - xfw->Run( nthreads ); + + xfw->Add_msg_cb( 20010, policy_callback, NULL ); + xfw->Add_msg_cb( 30002, prediction_callback, NULL ); - fprintf(stderr, "code3\n"); + fprintf(stderr, "code2\n"); - msg = xfw->Alloc_msg( 2048 ); + std::thread loop_thread; - fprintf(stderr, "code4\n"); + loop_thread = std::thread(&run_loop); + xfw->Run( nthreads ); } -- 2.16.6