3 ==================================================================================
4 Copyright (c) 2020 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
22 Abstract: Traffic Steering xApp
24 2. Receives anomaly detection
25 3. Requests prediction for UE throughput on current and neighbor cells
26 4. Receives prediction
27 5. Optionally exercises Traffic Steering action over E2
32 Modified: 21 May 2021 (Alexandre Huff)
33 Update for traffic steering use case in release D.
34 07 Dec 2021 (Alexandre Huff)
35 Update for traffic steering use case in release E.
50 #include <unordered_map>
52 #include <rapidjson/document.h>
53 #include <rapidjson/writer.h>
54 #include <rapidjson/stringbuffer.h>
55 #include <rapidjson/schema.h>
56 #include <rapidjson/reader.h>
57 #include <rapidjson/prettywriter.h>
59 #include <rmr/RIC_message_types.h>
60 #include <ricxfcpp/xapp.hpp>
61 #include <ricxfcpp/config.hpp>
64 FIXME unfortunately this RMR flag has to be disabled
65 due to name resolution conflicts.
66 RC xApp defines the same name for gRPC control messages.
68 #undef RIC_CONTROL_ACK
70 #include <grpc/grpc.h>
71 #include <grpcpp/channel.h>
72 #include <grpcpp/client_context.h>
73 #include <grpcpp/create_channel.h>
74 #include <grpcpp/security/credentials.h>
75 #include "protobuf/rc.grpc.pb.h"
77 #include "utils/restclient.hpp"
80 using namespace rapidjson;
84 using Namespace = std::string;
85 using Key = std::string;
86 using Data = std::vector<uint8_t>;
87 using DataMap = std::map<Key, Data>;
88 using Keys = std::set<Key>;
91 // ----------------------------------------------------------
92 std::unique_ptr<Xapp> xfw;
93 std::unique_ptr<rc::MsgComm::Stub> rc_stub;
95 int downlink_threshold = 0; // A1 policy type 20008 (in percentage)
97 // scoped enum to identify which API is used to send control messages
98 enum class TsControlApi { REST, gRPC };
99 TsControlApi ts_control_api; // api to send control messages
100 string ts_control_ep; // api target endpoint
102 typedef struct nodeb {
110 unordered_map<string, shared_ptr<nodeb_t>> cell_map; // maps each cell to its nodeb
114 int serving_cell_rsrp;
117 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
119 Assuming we receive the following payload from A1 Mediator
120 {"operation": "CREATE", "policy_type_id": 20008, "policy_instance_id": "tsapolicy145", "payload": {"threshold": 5}}
122 unordered_map<string, string> cell_pred;
124 bool ue_id_found = false;
125 string curr_key = "";
126 string curr_value = "";
128 int policy_instance_id;
130 std::string operation;
131 bool found_threshold = false;
133 bool Null() { return true; }
134 bool Bool(bool b) { return true; }
137 if (curr_key.compare("policy_type_id") == 0) {
139 } else if (curr_key.compare("policy_instance_id") == 0) {
140 policy_instance_id = i;
141 } else if (curr_key.compare("threshold") == 0) {
142 found_threshold = true;
148 bool Uint(unsigned u) {
150 if (curr_key.compare("policy_type_id") == 0) {
152 } else if (curr_key.compare("policy_instance_id") == 0) {
153 policy_instance_id = u;
154 } else if (curr_key.compare("threshold") == 0) {
155 found_threshold = true;
161 bool Int64(int64_t i) { return true; }
162 bool Uint64(uint64_t u) { return true; }
163 bool Double(double d) { return true; }
164 bool String(const char* str, SizeType length, bool copy) {
166 if (curr_key.compare("operation") != 0) {
176 bool Key(const char* str, SizeType length, bool copy) {
182 bool EndObject(SizeType memberCount) { return true; }
183 bool StartArray() { return true; }
184 bool EndArray(SizeType elementCount) { return true; }
188 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
189 unordered_map<string, int> cell_pred_down;
190 unordered_map<string, int> cell_pred_up;
192 bool ue_id_found = false;
193 string curr_key = "";
194 string curr_value = "";
195 string serving_cell_id;
196 bool down_val = true;
197 bool Null() { return true; }
198 bool Bool(bool b) { return true; }
199 bool Int(int i) { return true; }
200 bool Uint(unsigned u) {
201 // Currently, we assume the first cell in the prediction message is the serving cell
202 if ( serving_cell_id.empty() ) {
203 serving_cell_id = curr_key;
207 cell_pred_down[curr_key] = u;
210 cell_pred_up[curr_key] = u;
217 bool Int64(int64_t i) { return true; }
218 bool Uint64(uint64_t u) { return true; }
219 bool Double(double d) { return true; }
220 bool String(const char* str, SizeType length, bool copy) {
224 bool StartObject() { return true; }
225 bool Key(const char* str, SizeType length, bool copy) {
235 bool EndObject(SizeType memberCount) { return true; }
236 bool StartArray() { return true; }
237 bool EndArray(SizeType elementCount) { return true; }
240 struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
242 Assuming we receive the following payload from AD
243 [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
245 vector<string> prediction_ues;
246 string curr_key = "";
248 bool Key(const Ch* str, SizeType len, bool copy) {
253 bool String(const Ch* str, SizeType len, bool copy) {
254 // We are only interested in the "ue-id"
255 if ( curr_key.compare( "ue-id") == 0 ) {
256 prediction_ues.push_back( str );
262 struct NodebListHandler : public BaseReaderHandler<UTF8<>, NodebListHandler> {
263 vector<string> nodeb_list;
264 string curr_key = "";
266 bool Key(const Ch* str, SizeType length, bool copy) {
271 bool String(const Ch* str, SizeType length, bool copy) {
272 if( curr_key.compare( "inventoryName" ) == 0 ) {
273 nodeb_list.push_back( str );
279 struct NodebHandler : public BaseReaderHandler<UTF8<>, NodebHandler> {
280 string curr_key = "";
281 shared_ptr<nodeb_t> nodeb = make_shared<nodeb_t>();
283 bool Key(const Ch* str, SizeType length, bool copy) {
288 bool String(const Ch* str, SizeType length, bool copy) {
289 if( curr_key.compare( "ranName" ) == 0 ) {
290 nodeb->ran_name = str;
291 } else if( curr_key.compare( "plmnId" ) == 0 ) {
292 nodeb->global_nb_id.plmn_id = str;
293 } else if( curr_key.compare( "nbId" ) == 0 ) {
294 nodeb->global_nb_id.nb_id = str;
295 } else if( curr_key.compare( "cellId" ) == 0 ) {
296 cell_map[str] = nodeb;
304 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
305 unordered_map<string, string> cell_pred;
306 std::string serving_cell_id;
307 int serving_cell_rsrp;
308 int serving_cell_rsrq;
309 int serving_cell_sinr;
310 bool in_serving_array = false;
311 int rf_meas_index = 0;
313 bool in_serving_report_object = false;
315 string curr_key = "";
316 string curr_value = "";
317 bool Null() { return true; }
318 bool Bool(bool b) { return true; }
324 bool Uint(unsigned i) {
326 if (in_serving_report_object) {
327 if (curr_key.compare("rsrp") == 0) {
328 serving_cell_rsrp = i;
329 } else if (curr_key.compare("rsrq") == 0) {
330 serving_cell_rsrq = i;
331 } else if (curr_key.compare("rssinr") == 0) {
332 serving_cell_sinr = i;
337 bool Int64(int64_t i) {
340 bool Uint64(uint64_t i) {
343 bool Double(double d) { return true; }
344 bool String(const char* str, SizeType length, bool copy) {
346 if (curr_key.compare("ServingCellID") == 0) {
347 serving_cell_id = str;
353 if (curr_key.compare("ServingCellRF") == 0) {
354 in_serving_report_object = true;
358 bool Key(const char* str, SizeType length, bool copy) {
363 bool EndObject(SizeType memberCount) {
364 if (curr_key.compare("ServingCellRF") == 0) {
365 in_serving_report_object = false;
370 if (curr_key.compare("ServingCellRF") == 0) {
371 in_serving_array = true;
376 bool EndArray(SizeType elementCount) {
378 if (curr_key.compare("servingCellRF") == 0) {
379 in_serving_array = false;
387 /* unordered_map<string, UEData> get_sdl_ue_data() {
389 fprintf(stderr, "In get_sdl_ue_data()\n");
391 unordered_map<string, string> ue_data;
393 unordered_map<string, UEData> return_ue_data_map;
395 std::string prefix3="";
396 Keys K2 = sdl->findKeys(nsu, prefix3);
397 DataMap Dk2 = sdl->get(nsu, K2);
402 for(auto si=K2.begin();si!=K2.end();++si){
403 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
404 char val[val_v.size()+1]; // from Data
407 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
409 ue_id.assign((std::string)*si);
412 ue_data[ue_id] = ue_json;
415 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
416 UEDataHandler handler;
418 StringStream ss(map_iter->second.c_str());
419 reader.Parse(ss,handler);
421 string ueID = map_iter->first;
422 string serving_cell_id = handler.serving_cell_id;
423 int serv_rsrp = handler.serving_cell_rsrp;
425 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
429 return return_ue_data_map;
432 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
433 string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
435 cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length=" << len << "\n";
436 cout << "[INFO] Payload is " << arg << endl;
438 PolicyHandler handler;
440 StringStream ss(arg.c_str());
441 reader.Parse(ss,handler);
443 //Set the threshold value
444 if (handler.found_threshold) {
445 cout << "[INFO] Setting Threshold for A1-P value: " << handler.threshold << "%\n";
446 downlink_threshold = handler.threshold;
451 // sends a handover message through REST
452 void send_rest_control_request( string ue_id, string serving_cell_id, string target_cell_id ) {
455 static unsigned int seq_number = 0; // static counter, not thread-safe
457 // building a handoff control message
458 now = time( nullptr );
459 str_now = ctime( &now );
460 str_now.pop_back(); // removing the \n character
462 seq_number++; // static counter, not thread-safe
464 rapidjson::StringBuffer s;
465 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
466 writer.StartObject();
467 writer.Key( "command" );
468 writer.String( "HandOff" );
469 writer.Key( "seqNo" );
470 writer.Int( seq_number );
472 writer.String( ue_id.c_str() );
473 writer.Key( "fromCell" );
474 writer.String( serving_cell_id.c_str() );
475 writer.Key( "toCell" );
476 writer.String( target_cell_id.c_str() );
477 writer.Key( "timestamp" );
478 writer.String( str_now.c_str() );
479 writer.Key( "reason" );
480 writer.String( "HandOff Control Request from TS xApp" );
484 // creates a message like
486 "command": "HandOff",
491 "timestamp": "Sat May 22 10:35:33 2021",
492 "reason": "HandOff Control Request from TS xApp",
496 string msg = s.GetString();
498 cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_ep << "\"\n";
499 cout << "[INFO] HandOff request is " << msg << endl;
503 restclient::RestClient client( ts_control_ep );
504 restclient::response_t resp = client.do_post( "", msg ); // we already have the full path in ts_control_ep
506 if( resp.status_code == 200 ) {
507 // ============== DO SOMETHING USEFUL HERE ===============
508 // Currently, we only print out the HandOff reply
509 rapidjson::Document document;
510 document.Parse( resp.body.c_str() );
511 rapidjson::StringBuffer s;
512 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
513 document.Accept( writer );
514 cout << "[INFO] HandOff reply is " << s.GetString() << endl;
517 cout << "[ERROR] Unexpected HTTP code " << resp.status_code << " from " << \
518 client.getBaseUrl() << \
519 "\n[ERROR] HTTP payload is " << resp.body.c_str() << endl;
522 } catch( const restclient::RestClientException &e ) {
523 cout << "[ERROR] " << e.what() << endl;
529 // sends a handover message to RC xApp through gRPC
530 void send_grpc_control_request( string ue_id, string target_cell_id ) {
531 grpc::ClientContext context;
533 rc::RicControlGrpcRsp response;
534 shared_ptr<rc::RicControlGrpcReq> request = make_shared<rc::RicControlGrpcReq>();
536 rc::RICE2APHeader *apHeader = request->mutable_rice2apheaderdata();
537 apHeader->set_ranfuncid( 300 );
538 apHeader->set_ricrequestorid( 1001 );
540 rc::RICControlHeader *ctrlHeader = request->mutable_riccontrolheaderdata();
541 ctrlHeader->set_controlstyle( 3 );
542 ctrlHeader->set_controlactionid( 1 );
543 ctrlHeader->set_ueid( ue_id );
545 rc::RICControlMessage *ctrlMsg = request->mutable_riccontrolmessagedata();
546 ctrlMsg->set_riccontrolcelltypeval( rc::RIC_CONTROL_CELL_UNKWON );
547 ctrlMsg->set_targetcellid( target_cell_id );
549 auto data = cell_map.find( target_cell_id );
550 if( data != cell_map.end() ) {
551 request->set_e2nodeid( data->second->global_nb_id.nb_id );
552 request->set_plmnid( data->second->global_nb_id.plmn_id );
553 request->set_ranname( data->second->ran_name );
555 request->set_e2nodeid( "unknown_e2nodeid" );
556 request->set_plmnid( "unknown_plmnid" );
557 request->set_ranname( "unknown_ranname" );
559 request->set_riccontrolackreqval( rc::RIC_CONTROL_ACK_UNKWON ); // not yet used in api.proto
561 cout << "[INFO] Sending gRPC control request to " << ts_control_ep << "\n" << request->DebugString();
563 grpc::Status status = rc_stub->SendRICControlReqServiceGrpc( &context, *request, &response );
566 if( response.rspcode() == 0 ) {
567 cout << "[INFO] Control Request succeeded with code=0, description=" << response.description() << endl;
569 cout << "[ERROR] Control Request failed with code=" << response.rspcode()
570 << ", description=" << response.description() << endl;
574 cout << "[ERROR] failed to send a RIC Control Request message to RC xApp, error_code="
575 << status.error_code() << ", error_msg=" << status.error_message() << endl;
580 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
581 string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
583 cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
584 cout << "[INFO] Payload is " << json << endl;
586 PredictionHandler handler;
589 StringStream ss(json.c_str());
590 reader.Parse(ss,handler);
592 cout << "[ERROR] Got an exception on stringstream read parse\n";
595 // We are only considering download throughput
596 unordered_map<string, int> throughput_map = handler.cell_pred_down;
598 // Decision about CONTROL message
599 // (1) Identify UE Id in Prediction message
600 // (2) Iterate through Prediction message.
601 // If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
602 // We assume the first cell in the prediction message is the serving cell
604 int serving_cell_throughput = 0;
605 int highest_throughput = 0;
606 string highest_throughput_cell_id;
608 // Getting the current serving cell throughput prediction
609 auto cell = throughput_map.find( handler.serving_cell_id );
610 serving_cell_throughput = cell->second;
612 // Iterating to identify the highest throughput prediction
613 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
615 string curr_cellid = map_iter->first;
616 int curr_throughput = map_iter->second;
618 if ( highest_throughput < curr_throughput ) {
619 highest_throughput = curr_throughput;
620 highest_throughput_cell_id = curr_cellid;
626 if( downlink_threshold > 0 ) { // we also take into account the threshold in A1 policy type 20008
627 thresh = serving_cell_throughput * (downlink_threshold / 100.0);
630 if ( highest_throughput > ( serving_cell_throughput + thresh ) ) {
632 // sending a control request message
633 if ( ts_control_api == TsControlApi::REST ) {
634 send_rest_control_request( handler.ue_id, handler.serving_cell_id, highest_throughput_cell_id );
636 send_grpc_control_request( handler.ue_id, highest_throughput_cell_id );
640 cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
645 void send_prediction_request( vector<string> ues_to_predict ) {
646 std::unique_ptr<Message> msg;
647 Msg_component payload; // special type of unique pointer to the payload
652 Msg_component send_payload;
654 msg = xfw->Alloc_msg( 2048 );
656 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
658 fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
662 string ues_list = "[";
664 for (int i = 0; i < ues_to_predict.size(); i++) {
665 if (i == ues_to_predict.size() - 1) {
666 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
668 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
672 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
674 send_payload = msg->Get_payload(); // direct access to payload
675 snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
677 plen = strlen( (char *)send_payload.get() );
679 cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
681 // payload updated in place, nothing to copy from, so payload parm is nil
682 if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
683 fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
688 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
689 * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
690 * sends a prediction request to the QP Driver xApp.
692 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
693 string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
695 cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
696 cout << "[INFO] Payload is " << json << "\n";
698 AnomalyHandler handler;
700 StringStream ss(json.c_str());
701 reader.Parse(ss,handler);
703 // just sending ACK to the AD xApp
704 mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr ); // msg type 30004
706 send_prediction_request(handler.prediction_ues);
709 vector<string> get_nodeb_list( restclient::RestClient& client ) {
711 restclient::response_t response = client.do_get( "/v1/nodeb/states" );
713 NodebListHandler handler;
714 if( response.status_code == 200 ) {
716 StringStream ss( response.body.c_str() );
717 reader.Parse( ss, handler );
719 cout << "[INFO] nodeb list is " << response.body.c_str() << endl;
722 if( response.body.empty() ) {
723 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() << endl;
725 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() <<
726 ". HTTP payload is " << response.body.c_str() << endl;
730 return handler.nodeb_list;
733 bool build_cell_mapping() {
735 char *data = getenv( "SERVICE_E2MGR_HTTP_BASE_URL" );
736 if ( data == NULL ) {
737 base_url = "http://service-ricplt-e2mgr-http.ricplt:3800";
739 base_url = string( data );
743 restclient::RestClient client( base_url );
745 vector<string> nb_list = get_nodeb_list( client );
747 for( string nb : nb_list ) {
748 string full_path = string("/v1/nodeb/") + nb;
749 restclient::response_t response = client.do_get( full_path );
750 if( response.status_code != 200 ) {
751 if( response.body.empty() ) {
752 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
753 client.getBaseUrl() + full_path << endl;
755 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
756 client.getBaseUrl() + full_path << ". HTTP payload is " << response.body.c_str() << endl;
762 NodebHandler handler;
764 StringStream ss( response.body.c_str() );
765 reader.Parse( ss, handler );
767 cout << "[ERROR] Got an exception on parsing nodeb (stringstream read parse)\n";
772 } catch( const restclient::RestClientException &e ) {
773 cout << "[ERROR] " << e.what() << endl;
780 extern int main( int argc, char** argv ) {
782 char* port = (char *) "4560";
783 shared_ptr<grpc::Channel> channel;
785 Config *config = new Config();
786 string api = config->Get_control_str("ts_control_api");
787 ts_control_ep = config->Get_control_str("ts_control_ep");
789 cout << "[ERROR] a control api (rest/grpc) is required in xApp descriptor\n";
792 if ( api.compare("rest") == 0 ) {
793 ts_control_api = TsControlApi::REST;
795 ts_control_api = TsControlApi::gRPC;
797 if( !build_cell_mapping() ) {
798 cout << "[ERROR] unable to map cells to nodeb\n";
801 channel = grpc::CreateChannel(ts_control_ep, grpc::InsecureChannelCredentials());
802 rc_stub = rc::MsgComm::NewStub(channel, grpc::StubOptions());
805 fprintf( stderr, "[INFO] listening on port %s\n", port );
806 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
808 xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL ); // msg type 20010
809 xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL ); // msg type 30002
810 xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
812 xfw->Run( nthreads );