3 ==================================================================================
4 Copyright (c) 2020 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
22 Abstract: Traffic Steering xApp
24 2. Receives anomaly detection
25 3. Requests prediction for UE throughput on current and neighbor cells
26 4. Receives prediction
27 5. Optionally exercises Traffic Steering action over E2
32 Modified: 21 May 2021 (Alexandre Huff)
33 Update for traffic steering use case in release D.
34 07 Dec 2021 (Alexandre Huff)
35 Update for traffic steering use case in release E.
50 #include <unordered_map>
52 #include <rapidjson/document.h>
53 #include <rapidjson/writer.h>
54 #include <rapidjson/stringbuffer.h>
55 #include <rapidjson/schema.h>
56 #include <rapidjson/reader.h>
57 #include <rapidjson/prettywriter.h>
59 #include <rmr/RIC_message_types.h>
60 #include <ricxfcpp/xapp.hpp>
61 #include <ricxfcpp/config.hpp>
64 FIXME unfortunately this RMR flag has to be disabled
65 due to name resolution conflicts.
66 RC xApp defines the same name for gRPC control messages.
68 #undef RIC_CONTROL_ACK
70 #include <grpc/grpc.h>
71 #include <grpcpp/channel.h>
72 #include <grpcpp/client_context.h>
73 #include <grpcpp/create_channel.h>
74 #include <grpcpp/security/credentials.h>
75 #include "protobuf/api.grpc.pb.h"
77 #include "utils/restclient.hpp"
80 using namespace rapidjson;
84 using Namespace = std::string;
85 using Key = std::string;
86 using Data = std::vector<uint8_t>;
87 using DataMap = std::map<Key, Data>;
88 using Keys = std::set<Key>;
91 // ----------------------------------------------------------
92 std::unique_ptr<Xapp> xfw;
93 std::unique_ptr<api::MsgComm::Stub> rc_stub;
95 int rsrp_threshold = 0;
97 // scoped enum to identify which API is used to send control messages
98 enum class TsControlApi { REST, gRPC };
99 TsControlApi ts_control_api; // api to send control messages
100 string ts_control_ep; // api target endpoint
102 typedef struct nodeb {
110 unordered_map<string, shared_ptr<nodeb_t>> cell_map; // maps each cell to its nodeb
114 int serving_cell_rsrp;
117 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
118 unordered_map<string, string> cell_pred;
120 bool ue_id_found = false;
121 string curr_key = "";
122 string curr_value = "";
124 int policy_instance_id;
126 std::string operation;
127 bool found_threshold = false;
129 bool Null() { return true; }
130 bool Bool(bool b) { return true; }
133 if (curr_key.compare("policy_type_id") == 0) {
135 } else if (curr_key.compare("policy_instance_id") == 0) {
136 policy_instance_id = i;
137 } else if (curr_key.compare("threshold") == 0) {
138 found_threshold = true;
144 bool Uint(unsigned u) {
146 if (curr_key.compare("policy_type_id") == 0) {
148 } else if (curr_key.compare("policy_instance_id") == 0) {
149 policy_instance_id = u;
150 } else if (curr_key.compare("threshold") == 0) {
151 found_threshold = true;
157 bool Int64(int64_t i) { return true; }
158 bool Uint64(uint64_t u) { return true; }
159 bool Double(double d) { return true; }
160 bool String(const char* str, SizeType length, bool copy) {
162 if (curr_key.compare("operation") != 0) {
172 bool Key(const char* str, SizeType length, bool copy) {
178 bool EndObject(SizeType memberCount) { return true; }
179 bool StartArray() { return true; }
180 bool EndArray(SizeType elementCount) { return true; }
184 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
185 unordered_map<string, int> cell_pred_down;
186 unordered_map<string, int> cell_pred_up;
188 bool ue_id_found = false;
189 string curr_key = "";
190 string curr_value = "";
191 string serving_cell_id;
192 bool down_val = true;
193 bool Null() { return true; }
194 bool Bool(bool b) { return true; }
195 bool Int(int i) { return true; }
196 bool Uint(unsigned u) {
197 // Currently, we assume the first cell in the prediction message is the serving cell
198 if ( serving_cell_id.empty() ) {
199 serving_cell_id = curr_key;
203 cell_pred_down[curr_key] = u;
206 cell_pred_up[curr_key] = u;
213 bool Int64(int64_t i) { return true; }
214 bool Uint64(uint64_t u) { return true; }
215 bool Double(double d) { return true; }
216 bool String(const char* str, SizeType length, bool copy) {
220 bool StartObject() { return true; }
221 bool Key(const char* str, SizeType length, bool copy) {
231 bool EndObject(SizeType memberCount) { return true; }
232 bool StartArray() { return true; }
233 bool EndArray(SizeType elementCount) { return true; }
236 struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
238 Assuming we receive the following payload from AD
239 [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
241 vector<string> prediction_ues;
242 string curr_key = "";
244 bool Key(const Ch* str, SizeType len, bool copy) {
249 bool String(const Ch* str, SizeType len, bool copy) {
250 // We are only interested in the "ue-id"
251 if ( curr_key.compare( "ue-id") == 0 ) {
252 prediction_ues.push_back( str );
258 struct NodebListHandler : public BaseReaderHandler<UTF8<>, NodebListHandler> {
259 vector<string> nodeb_list;
260 string curr_key = "";
262 bool Key(const Ch* str, SizeType length, bool copy) {
267 bool String(const Ch* str, SizeType length, bool copy) {
268 if( curr_key.compare( "inventoryName" ) == 0 ) {
269 nodeb_list.push_back( str );
275 struct NodebHandler : public BaseReaderHandler<UTF8<>, NodebHandler> {
276 string curr_key = "";
277 shared_ptr<nodeb_t> nodeb = make_shared<nodeb_t>();
279 bool Key(const Ch* str, SizeType length, bool copy) {
284 bool String(const Ch* str, SizeType length, bool copy) {
285 if( curr_key.compare( "ranName" ) == 0 ) {
286 nodeb->ran_name = str;
287 } else if( curr_key.compare( "plmnId" ) == 0 ) {
288 nodeb->global_nb_id.plmn_id = str;
289 } else if( curr_key.compare( "nbId" ) == 0 ) {
290 nodeb->global_nb_id.nb_id = str;
291 } else if( curr_key.compare( "cellId" ) == 0 ) {
292 cell_map[str] = nodeb;
300 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
301 unordered_map<string, string> cell_pred;
302 std::string serving_cell_id;
303 int serving_cell_rsrp;
304 int serving_cell_rsrq;
305 int serving_cell_sinr;
306 bool in_serving_array = false;
307 int rf_meas_index = 0;
309 bool in_serving_report_object = false;
311 string curr_key = "";
312 string curr_value = "";
313 bool Null() { return true; }
314 bool Bool(bool b) { return true; }
320 bool Uint(unsigned i) {
322 if (in_serving_report_object) {
323 if (curr_key.compare("rsrp") == 0) {
324 serving_cell_rsrp = i;
325 } else if (curr_key.compare("rsrq") == 0) {
326 serving_cell_rsrq = i;
327 } else if (curr_key.compare("rssinr") == 0) {
328 serving_cell_sinr = i;
333 bool Int64(int64_t i) {
336 bool Uint64(uint64_t i) {
339 bool Double(double d) { return true; }
340 bool String(const char* str, SizeType length, bool copy) {
342 if (curr_key.compare("ServingCellID") == 0) {
343 serving_cell_id = str;
349 if (curr_key.compare("ServingCellRF") == 0) {
350 in_serving_report_object = true;
354 bool Key(const char* str, SizeType length, bool copy) {
359 bool EndObject(SizeType memberCount) {
360 if (curr_key.compare("ServingCellRF") == 0) {
361 in_serving_report_object = false;
366 if (curr_key.compare("ServingCellRF") == 0) {
367 in_serving_array = true;
372 bool EndArray(SizeType elementCount) {
374 if (curr_key.compare("servingCellRF") == 0) {
375 in_serving_array = false;
383 /* unordered_map<string, UEData> get_sdl_ue_data() {
385 fprintf(stderr, "In get_sdl_ue_data()\n");
387 unordered_map<string, string> ue_data;
389 unordered_map<string, UEData> return_ue_data_map;
391 std::string prefix3="";
392 Keys K2 = sdl->findKeys(nsu, prefix3);
393 DataMap Dk2 = sdl->get(nsu, K2);
398 for(auto si=K2.begin();si!=K2.end();++si){
399 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
400 char val[val_v.size()+1]; // from Data
403 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
405 ue_id.assign((std::string)*si);
408 ue_data[ue_id] = ue_json;
411 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
412 UEDataHandler handler;
414 StringStream ss(map_iter->second.c_str());
415 reader.Parse(ss,handler);
417 string ueID = map_iter->first;
418 string serving_cell_id = handler.serving_cell_id;
419 int serv_rsrp = handler.serving_cell_rsrp;
421 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
425 return return_ue_data_map;
428 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
430 int response_to = 0; // max timeout wating for a response
431 int rmtype; // received message type
433 string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
435 cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length="<< len << "\n";
436 cout << "[INFO] Payload is " << arg << endl;
438 PolicyHandler handler;
440 StringStream ss(arg.c_str());
441 reader.Parse(ss,handler);
443 //Set the threshold value
444 if (handler.found_threshold) {
445 cout << "[INFO] Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
446 rsrp_threshold = handler.threshold;
451 // sends a handover message through REST
452 void send_rest_control_request( string ue_id, string serving_cell_id, string target_cell_id ) {
455 static unsigned int seq_number = 0; // static counter, not thread-safe
457 // building a handoff control message
458 now = time( nullptr );
459 str_now = ctime( &now );
460 str_now.pop_back(); // removing the \n character
462 seq_number++; // static counter, not thread-safe
464 rapidjson::StringBuffer s;
465 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
466 writer.StartObject();
467 writer.Key( "command" );
468 writer.String( "HandOff" );
469 writer.Key( "seqNo" );
470 writer.Int( seq_number );
472 writer.String( ue_id.c_str() );
473 writer.Key( "fromCell" );
474 writer.String( serving_cell_id.c_str() );
475 writer.Key( "toCell" );
476 writer.String( target_cell_id.c_str() );
477 writer.Key( "timestamp" );
478 writer.String( str_now.c_str() );
479 writer.Key( "reason" );
480 writer.String( "HandOff Control Request from TS xApp" );
484 // creates a message like
486 "command": "HandOff",
491 "timestamp": "Sat May 22 10:35:33 2021",
492 "reason": "HandOff Control Request from TS xApp",
496 string msg = s.GetString();
498 cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_ep << "\"\n";
499 cout << "[INFO] HandOff request is " << msg << endl;
502 restclient::RestClient client( ts_control_ep );
503 restclient::response_t resp = client.do_post( "", msg ); // we already have the full path in ts_control_ep
505 if( resp.status_code == 200 ) {
506 // ============== DO SOMETHING USEFUL HERE ===============
507 // Currently, we only print out the HandOff reply
508 rapidjson::Document document;
509 document.Parse( resp.body.c_str() );
510 rapidjson::StringBuffer s;
511 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
512 document.Accept( writer );
513 cout << "[INFO] HandOff reply is " << s.GetString() << endl;
516 cout << "[ERROR] Unexpected HTTP code " << resp.status_code << " from " << \
517 client.getBaseUrl() << \
518 "\n[ERROR] HTTP payload is " << resp.body.c_str() << endl;
523 // sends a handover message to RC xApp through gRPC
524 void send_grpc_control_request( string ue_id, string target_cell_id ) {
525 grpc::ClientContext context;
527 api::RicControlGrpcRsp response;
528 shared_ptr<api::RicControlGrpcReq> request = make_shared<api::RicControlGrpcReq>();
530 api::RICE2APHeader *apHeader = request->mutable_rice2apheaderdata();
531 apHeader->set_ranfuncid( 300 );
532 apHeader->set_ricrequestorid( 1001 );
534 api::RICControlHeader *ctrlHeader = request->mutable_riccontrolheaderdata();
535 ctrlHeader->set_controlstyle( 3 );
536 ctrlHeader->set_controlactionid( 1 );
537 ctrlHeader->set_ueid( ue_id );
539 api::RICControlMessage *ctrlMsg = request->mutable_riccontrolmessagedata();
540 ctrlMsg->set_riccontrolcelltypeval( api::RIC_CONTROL_CELL_UNKWON );
541 ctrlMsg->set_targetcellid( target_cell_id );
543 auto data = cell_map.find( target_cell_id );
544 if( data != cell_map.end() ) {
545 request->set_e2nodeid( data->second->global_nb_id.nb_id );
546 request->set_plmnid( data->second->global_nb_id.plmn_id );
547 request->set_ranname( data->second->ran_name );
549 request->set_e2nodeid( "unknown_e2nodeid" );
550 request->set_plmnid( "unknown_plmnid" );
551 request->set_ranname( "unknown_ranname" );
553 request->set_riccontrolackreqval( api::RIC_CONTROL_ACK_UNKWON ); // not yet used in api.proto
555 grpc::Status status = rc_stub->SendRICControlReqServiceGrpc( &context, *request, &response );
558 if( response.rspcode() == 0 ) {
559 cout << "[INFO] Control Request succeeded with code=0, description=" << response.description() << endl;
561 cout << "[ERROR] Control Request failed with code=" << response.rspcode()
562 << ", description=" << response.description() << endl;
566 cout << "[ERROR] failed to send a RIC Control Request message to RC xApp, error_code="
567 << status.error_code() << ", error_msg=" << status.error_message() << endl;
572 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
573 string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
575 cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
576 cout << "[INFO] Payload is " << json << endl;
578 PredictionHandler handler;
581 StringStream ss(json.c_str());
582 reader.Parse(ss,handler);
584 cout << "[ERROR] Got an exception on stringstream read parse\n";
587 // We are only considering download throughput
588 unordered_map<string, int> throughput_map = handler.cell_pred_down;
590 // Decision about CONTROL message
591 // (1) Identify UE Id in Prediction message
592 // (2) Iterate through Prediction message.
593 // If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
594 // We assume the first cell in the prediction message is the serving cell
596 int serving_cell_throughput = 0;
597 int highest_throughput = 0;
598 string highest_throughput_cell_id;
600 // Getting the current serving cell throughput prediction
601 auto cell = throughput_map.find( handler.serving_cell_id );
602 serving_cell_throughput = cell->second;
604 // Iterating to identify the highest throughput prediction
605 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
607 string curr_cellid = map_iter->first;
608 int curr_throughput = map_iter->second;
610 if ( highest_throughput < curr_throughput ) {
611 highest_throughput = curr_throughput;
612 highest_throughput_cell_id = curr_cellid;
617 if ( highest_throughput > serving_cell_throughput ) {
619 // sending a control request message
620 if ( ts_control_api == TsControlApi::REST ) {
621 send_rest_control_request( handler.ue_id, handler.serving_cell_id, highest_throughput_cell_id );
623 send_grpc_control_request( handler.ue_id, highest_throughput_cell_id );
627 cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
632 void send_prediction_request( vector<string> ues_to_predict ) {
633 std::unique_ptr<Message> msg;
634 Msg_component payload; // special type of unique pointer to the payload
639 Msg_component send_payload;
641 msg = xfw->Alloc_msg( 2048 );
643 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
645 fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
649 string ues_list = "[";
651 for (int i = 0; i < ues_to_predict.size(); i++) {
652 if (i == ues_to_predict.size() - 1) {
653 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
655 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
659 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
661 send_payload = msg->Get_payload(); // direct access to payload
662 snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
664 plen = strlen( (char *)send_payload.get() );
666 cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
668 // payload updated in place, nothing to copy from, so payload parm is nil
669 if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
670 fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
675 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
676 * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
677 * sends a prediction request to the QP Driver xApp.
679 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
680 string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
682 cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
683 cout << "[INFO] Payload is " << json << "\n";
685 AnomalyHandler handler;
687 StringStream ss(json.c_str());
688 reader.Parse(ss,handler);
690 // just sending ACK to the AD xApp
691 mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr ); // msg type 30004
693 send_prediction_request(handler.prediction_ues);
696 vector<string> get_nodeb_list( restclient::RestClient& client ) {
698 restclient::response_t response = client.do_get( "/v1/nodeb/states" );
700 NodebListHandler handler;
701 if( response.status_code == 200 ) {
703 StringStream ss( response.body.c_str() );
704 reader.Parse( ss, handler );
706 cout << "[INFO] nodeb list is " << response.body.c_str() << endl;
709 if( response.body.empty() ) {
710 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() << endl;
712 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() <<
713 ". HTTP payload is " << response.body.c_str() << endl;
717 return handler.nodeb_list;
720 bool build_cell_mapping() {
722 char *data = getenv( "SERVICE_E2MGR_HTTP_BASE_URL" );
723 if ( data == NULL ) {
724 base_url = "http://service-ricplt-e2mgr-http.ricplt:3800";
726 base_url = string( data );
729 restclient::RestClient client( base_url );
731 vector<string> nb_list = get_nodeb_list( client );
733 for( string nb : nb_list ) {
734 string full_path = string("/v1/nodeb/") + nb;
735 restclient::response_t response = client.do_get( full_path );
736 if( response.status_code != 200 ) {
737 if( response.body.empty() ) {
738 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
739 client.getBaseUrl() + full_path << endl;
741 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
742 client.getBaseUrl() + full_path << ". HTTP payload is " << response.body.c_str() << endl;
748 NodebHandler handler;
750 StringStream ss( response.body.c_str() );
751 reader.Parse( ss, handler );
753 cout << "[ERROR] Got an exception on parsing nodeb (stringstream read parse)\n";
761 extern int main( int argc, char** argv ) {
763 char* port = (char *) "4560";
764 shared_ptr<grpc::Channel> channel;
766 Config *config = new Config();
767 string api = config->Get_control_str("ts_control_api");
768 ts_control_ep = config->Get_control_str("ts_control_ep");
770 cout << "[ERROR] a control api (rest/grpc) is required in xApp descriptor\n";
773 if ( api.compare("rest") == 0 ) {
774 ts_control_api = TsControlApi::REST;
776 ts_control_api = TsControlApi::gRPC;
778 if( !build_cell_mapping() ) {
779 cout << "[ERROR] unable to map cells to nodeb\n";
782 channel = grpc::CreateChannel(ts_control_ep, grpc::InsecureChannelCredentials());
783 rc_stub = api::MsgComm::NewStub(channel, grpc::StubOptions());
786 fprintf( stderr, "[TS xApp] listening on port %s\n", port );
787 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
789 xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL ); // msg type 20010
790 xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL ); // msg type 30002
791 xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
793 xfw->Run( nthreads );