3 ==================================================================================
4 Copyright (c) 2020 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
22 Abstract: Traffic Steering xApp
24 2. Receives anomaly detection
25 3. Requests prediction for UE throughput on current and neighbor cells
26 4. Receives prediction
27 5. Optionally exercises Traffic Steering action over E2
32 Modified: 21 May 2021 (Alexandre Huff)
33 Update for traffic steering use case in release D.
34 07 Dec 2021 (Alexandre Huff)
35 Update for traffic steering use case in release E.
50 #include <unordered_map>
52 #include <rapidjson/document.h>
53 #include <rapidjson/writer.h>
54 #include <rapidjson/stringbuffer.h>
55 #include <rapidjson/schema.h>
56 #include <rapidjson/reader.h>
57 #include <rapidjson/prettywriter.h>
59 #include <rmr/RIC_message_types.h>
60 #include <ricxfcpp/xapp.hpp>
61 #include <ricxfcpp/config.hpp>
64 FIXME: unfortunately this RMR macro has to be undefined
65 due to a name resolution conflict.
66 The RC xApp defines the same name for gRPC control messages.
68 #undef RIC_CONTROL_ACK
70 #include <grpc/grpc.h>
71 #include <grpcpp/channel.h>
72 #include <grpcpp/client_context.h>
73 #include <grpcpp/create_channel.h>
74 #include <grpcpp/security/credentials.h>
75 #include "protobuf/rc.grpc.pb.h"
77 #include "utils/restclient.hpp"
80 using namespace rapidjson;
// Convenience aliases for string keys and raw byte values.
// In the lines visible here, Keys and DataMap are referenced only by the
// commented-out get_sdl_ue_data() code (sdl->findKeys / sdl->get).
84 using Namespace = std::string;
85 using Key = std::string;
86 using Data = std::vector<uint8_t>;
87 using DataMap = std::map<Key, Data>;
88 using Keys = std::set<Key>;
91 // ----------------------------------------------------------
// Process-wide state shared by the callbacks and senders in this file.
// NOTE(review): no synchronization is visible around these globals; confirm whether
// xfw->Run( nthreads ) can invoke callbacks concurrently.

// xApp framework instance; assigned in main() and used to allocate RMR messages,
// register callbacks and run the framework.
92 std::unique_ptr<Xapp> xfw;
// gRPC stub assigned in main(); used by send_grpc_control_request().
93 std::unique_ptr<rc::MsgComm::Stub> rc_stub;
// Written by policy_callback() and read by prediction_callback().
95 int downlink_threshold = 0; // A1 policy type 20008 (in percentage)
97 // scoped enum to identify which API is used to send control messages
98 enum class TsControlApi { REST, gRPC };
// Set in main() from the ts_control_api config string; read by prediction_callback().
99 TsControlApi ts_control_api; // api to send control messages
// Set in main() from the ts_control_ep config string; used as REST base URL or gRPC target.
100 string ts_control_ep; // api target endpoint
102 typedef struct nodeb {
110 unordered_map<string, shared_ptr<nodeb_t>> cell_map; // maps each cell to its nodeb
114 int serving_cell_rsrp;
// SAX-style rapidjson handler for A1 policy messages.
// The Int/Uint handlers dispatch on curr_key: they record policy_instance_id and raise
// found_threshold when the current key is threshold. policy_callback() then reads
// found_threshold and the threshold member (its declaration is not in the visible lines).
// Null, Bool, Int64, Uint64, Double, EndObject, StartArray and EndArray accept and ignore input.
// NOTE(review): cell_pred, ue_id_found and curr_value are not referenced in the visible lines
// of this struct; confirm whether they are still needed.
117 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
119 Assuming we receive the following payload from A1 Mediator
120 {"operation": "CREATE", "policy_type_id": 20008, "policy_instance_id": "tsapolicy145", "payload": {"threshold": 5}}
122 unordered_map<string, string> cell_pred;
124 bool ue_id_found = false;
125 string curr_key = "";
126 string curr_value = "";
// NOTE(review): declared as int and only assigned from the Int/Uint handlers in the visible
// lines, while the sample payload above carries a string policy_instance_id; confirm how the
// String handler treats it. The member also has no initializer.
128 int policy_instance_id;
130 std::string operation;
131 bool found_threshold = false;
133 bool Null() { return true; }
134 bool Bool(bool b) { return true; }
137 if (curr_key.compare("policy_type_id") == 0) {
139 } else if (curr_key.compare("policy_instance_id") == 0) {
140 policy_instance_id = i;
141 } else if (curr_key.compare("threshold") == 0) {
142 found_threshold = true;
148 bool Uint(unsigned u) {
150 if (curr_key.compare("policy_type_id") == 0) {
152 } else if (curr_key.compare("policy_instance_id") == 0) {
// unsigned value is stored into an int member (implicit conversion)
153 policy_instance_id = u;
154 } else if (curr_key.compare("threshold") == 0) {
155 found_threshold = true;
161 bool Int64(int64_t i) { return true; }
162 bool Uint64(uint64_t u) { return true; }
163 bool Double(double d) { return true; }
164 bool String(const char* str, SizeType length, bool copy) {
166 if (curr_key.compare("operation") != 0) {
176 bool Key(const char* str, SizeType length, bool copy) {
182 bool EndObject(SizeType memberCount) { return true; }
183 bool StartArray() { return true; }
184 bool EndArray(SizeType elementCount) { return true; }
// SAX-style rapidjson handler for throughput prediction messages.
// Unsigned integer values are stored per current key into cell_pred_down and cell_pred_up;
// the key current at the first Uint event (while serving_cell_id is still empty) becomes
// serving_cell_id. prediction_callback() reads cell_pred_down, serving_cell_id and ue_id
// (the ue_id declaration is not in the visible lines).
// NOTE(review): presumably down_val selects between the down and up maps; the lines that
// test and toggle it are not visible, so confirm.
188 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
189 unordered_map<string, int> cell_pred_down;
190 unordered_map<string, int> cell_pred_up;
192 bool ue_id_found = false;
193 string curr_key = "";
194 string curr_value = "";
195 string serving_cell_id;
196 bool down_val = true;
197 bool Null() { return true; }
198 bool Bool(bool b) { return true; }
// Signed values are accepted but not recorded; only the Uint handler below stores predictions.
199 bool Int(int i) { return true; }
200 bool Uint(unsigned u) {
201 // Currently, we assume the first cell in the prediction message is the serving cell
202 if ( serving_cell_id.empty() ) {
203 serving_cell_id = curr_key;
// unsigned value is stored into an int map value (implicit conversion)
207 cell_pred_down[curr_key] = u;
210 cell_pred_up[curr_key] = u;
// 64-bit and floating point values are accepted but not recorded.
217 bool Int64(int64_t i) { return true; }
218 bool Uint64(uint64_t u) { return true; }
219 bool Double(double d) { return true; }
220 bool String(const char* str, SizeType length, bool copy) {
224 bool StartObject() { return true; }
225 bool Key(const char* str, SizeType length, bool copy) {
235 bool EndObject(SizeType memberCount) { return true; }
236 bool StartArray() { return true; }
237 bool EndArray(SizeType elementCount) { return true; }
// SAX-style rapidjson handler for anomaly messages from the AD xApp.
// Collects every string value whose current key is ue-id into prediction_ues;
// ad_callback() passes that vector to send_prediction_request().
240 struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
242 Assuming we receive the following payload from AD
243 [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
245 vector<string> prediction_ues;
246 string curr_key = "";
248 bool Key(const Ch* str, SizeType len, bool copy) {
253 bool String(const Ch* str, SizeType len, bool copy) {
254 // We are only interested in the "ue-id"
255 if ( curr_key.compare( "ue-id") == 0 ) {
// The std::string is built from str alone; len is not used here, so the value ends at
// the first NUL character.
256 prediction_ues.push_back( str );
// SAX-style rapidjson handler for the nodeb states listing.
// Collects every string value whose current key is inventoryName into nodeb_list;
// get_nodeb_list() returns that vector.
262 struct NodebListHandler : public BaseReaderHandler<UTF8<>, NodebListHandler> {
263 vector<string> nodeb_list;
264 string curr_key = "";
266 bool Key(const Ch* str, SizeType length, bool copy) {
271 bool String(const Ch* str, SizeType length, bool copy) {
272 if( curr_key.compare( "inventoryName" ) == 0 ) {
// The std::string is built from str alone; length is not used here.
273 nodeb_list.push_back( str );
// SAX-style rapidjson handler for a single nodeb document.
// Fills one shared nodeb_t (ran_name, global_nb_id.plmn_id, global_nb_id.nb_id) from string
// values, and as a side effect registers every cellId string in the global cell_map,
// pointing at that same nodeb_t.
279 struct NodebHandler : public BaseReaderHandler<UTF8<>, NodebHandler> {
280 string curr_key = "";
// One nodeb_t per handler instance; each cell_map entry added below shares ownership of it.
281 shared_ptr<nodeb_t> nodeb = make_shared<nodeb_t>();
283 bool Key(const Ch* str, SizeType length, bool copy) {
288 bool String(const Ch* str, SizeType length, bool copy) {
289 if( curr_key.compare( "ranName" ) == 0 ) {
290 nodeb->ran_name = str;
291 } else if( curr_key.compare( "plmnId" ) == 0 ) {
292 nodeb->global_nb_id.plmn_id = str;
293 } else if( curr_key.compare( "nbId" ) == 0 ) {
294 nodeb->global_nb_id.nb_id = str;
295 } else if( curr_key.compare( "cellId" ) == 0 ) {
// Writes to the global map; an existing entry for the same cell id is overwritten.
296 cell_map[str] = nodeb;
304 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
305 unordered_map<string, string> cell_pred;
306 std::string serving_cell_id;
307 int serving_cell_rsrp;
308 int serving_cell_rsrq;
309 int serving_cell_sinr;
310 bool in_serving_array = false;
311 int rf_meas_index = 0;
313 bool in_serving_report_object = false;
315 string curr_key = "";
316 string curr_value = "";
317 bool Null() { return true; }
318 bool Bool(bool b) { return true; }
324 bool Uint(unsigned i) {
326 if (in_serving_report_object) {
327 if (curr_key.compare("rsrp") == 0) {
328 serving_cell_rsrp = i;
329 } else if (curr_key.compare("rsrq") == 0) {
330 serving_cell_rsrq = i;
331 } else if (curr_key.compare("rssinr") == 0) {
332 serving_cell_sinr = i;
337 bool Int64(int64_t i) {
340 bool Uint64(uint64_t i) {
343 bool Double(double d) { return true; }
344 bool String(const char* str, SizeType length, bool copy) {
346 if (curr_key.compare("ServingCellID") == 0) {
347 serving_cell_id = str;
353 if (curr_key.compare("ServingCellRF") == 0) {
354 in_serving_report_object = true;
358 bool Key(const char* str, SizeType length, bool copy) {
363 bool EndObject(SizeType memberCount) {
364 if (curr_key.compare("ServingCellRF") == 0) {
365 in_serving_report_object = false;
370 if (curr_key.compare("ServingCellRF") == 0) {
371 in_serving_array = true;
376 bool EndArray(SizeType elementCount) {
378 if (curr_key.compare("servingCellRF") == 0) {
379 in_serving_array = false;
387 /* unordered_map<string, UEData> get_sdl_ue_data() {
389 fprintf(stderr, "In get_sdl_ue_data()\n");
391 unordered_map<string, string> ue_data;
393 unordered_map<string, UEData> return_ue_data_map;
395 std::string prefix3="";
396 Keys K2 = sdl->findKeys(nsu, prefix3);
397 DataMap Dk2 = sdl->get(nsu, K2);
402 for(auto si=K2.begin();si!=K2.end();++si){
403 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
404 char val[val_v.size()+1]; // from Data
407 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
409 ue_id.assign((std::string)*si);
412 ue_data[ue_id] = ue_json;
415 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
416 UEDataHandler handler;
418 StringStream ss(map_iter->second.c_str());
419 reader.Parse(ss,handler);
421 string ueID = map_iter->first;
422 string serving_cell_id = handler.serving_cell_id;
423 int serv_rsrp = handler.serving_cell_rsrp;
425 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
429 return return_ue_data_map;
// RMR callback for A1 policy messages (registered in main() for A1_POLICY_REQ).
// Copies the payload into a std::string, logs it, parses it with PolicyHandler and, when the
// handler reports a threshold, stores it in the global downlink_threshold.
// mbuf, subid and data are not used in the visible lines.
// NOTE(review): the result of reader.Parse is not inspected in the visible lines; confirm
// that malformed payloads are acceptable here.
432 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
433 string arg ((const char*)payload.get(), len); // RMR payload might not be NUL-terminated
435 cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length=" << len << "\n";
436 cout << "[INFO] Payload is " << arg << endl;
438 PolicyHandler handler;
440 StringStream ss(arg.c_str());
441 reader.Parse(ss,handler);
443 //Set the threshold value
444 if (handler.found_threshold) {
445 cout << "[INFO] Setting Threshold for A1-P value: " << handler.threshold << "%\n";
446 downlink_threshold = handler.threshold;
451 // sends a handover message through REST
// Builds a pretty-printed JSON HandOff command (command, seqNo, UE id, fromCell, toCell,
// timestamp, reason), POSTs it to ts_control_ep and logs either the reply (HTTP 200) or the
// unexpected status code and body.
// Parameters are taken by value: ue_id is the UE to move, serving_cell_id the current cell,
// target_cell_id the destination cell.
452 void send_rest_control_request( string ue_id, string serving_cell_id, string target_cell_id ) {
455 static unsigned int seq_number = 0; // static counter, not thread-safe
457 // building a handoff control message
458 now = time( nullptr );
// NOTE(review): ctime returns a pointer to shared static storage; like the counter above
// this is not safe if callbacks run on several threads.
459 str_now = ctime( &now );
460 str_now.pop_back(); // removing the \n character
462 seq_number++; // static counter, not thread-safe
464 rapidjson::StringBuffer s;
465 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
466 writer.StartObject();
467 writer.Key( "command" );
468 writer.String( "HandOff" );
469 writer.Key( "seqNo" );
// NOTE(review): seq_number is unsigned but is written through Int; values above INT_MAX
// would be emitted as negative numbers. Consider Uint.
470 writer.Int( seq_number );
472 writer.String( ue_id.c_str() );
473 writer.Key( "fromCell" );
474 writer.String( serving_cell_id.c_str() );
475 writer.Key( "toCell" );
476 writer.String( target_cell_id.c_str() );
477 writer.Key( "timestamp" );
478 writer.String( str_now.c_str() );
479 writer.Key( "reason" );
480 writer.String( "HandOff Control Request from TS xApp" );
484 // creates a message like
486 "command": "HandOff",
491 "timestamp": "Sat May 22 10:35:33 2021",
492 "reason": "HandOff Control Request from TS xApp",
496 string msg = s.GetString();
498 cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_ep << "\"\n";
499 cout << "[INFO] HandOff request is " << msg << endl;
502 restclient::RestClient client( ts_control_ep );
503 restclient::response_t resp = client.do_post( "", msg ); // we already have the full path in ts_control_ep
505 if( resp.status_code == 200 ) {
506 // ============== DO SOMETHING USEFUL HERE ===============
507 // Currently, we only print out the HandOff reply
508 rapidjson::Document document;
// NOTE(review): the parse result is not checked before the document is re-serialized.
509 document.Parse( resp.body.c_str() );
// These two locals reuse the names s and writer of the request buffer and writer above.
510 rapidjson::StringBuffer s;
511 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
512 document.Accept( writer );
513 cout << "[INFO] HandOff reply is " << s.GetString() << endl;
516 cout << "[ERROR] Unexpected HTTP code " << resp.status_code << " from " << \
517 client.getBaseUrl() << \
518 "\n[ERROR] HTTP payload is " << resp.body.c_str() << endl;
523 // sends a handover message to RC xApp through gRPC
// Fills a RicControlGrpcReq (E2AP header, control header carrying ue_id, control message
// carrying target_cell_id), resolves the E2 node identity of the target cell through the
// global cell_map, issues the call through the global rc_stub and logs the outcome.
// Parameters are taken by value: ue_id is the UE to move, target_cell_id the destination cell.
524 void send_grpc_control_request( string ue_id, string target_cell_id ) {
525 grpc::ClientContext context;
527 rc::RicControlGrpcRsp response;
528 shared_ptr<rc::RicControlGrpcReq> request = make_shared<rc::RicControlGrpcReq>();
// Header and style identifiers below are hard-coded constants.
530 rc::RICE2APHeader *apHeader = request->mutable_rice2apheaderdata();
531 apHeader->set_ranfuncid( 300 );
532 apHeader->set_ricrequestorid( 1001 );
534 rc::RICControlHeader *ctrlHeader = request->mutable_riccontrolheaderdata();
535 ctrlHeader->set_controlstyle( 3 );
536 ctrlHeader->set_controlactionid( 1 );
537 ctrlHeader->set_ueid( ue_id );
539 rc::RICControlMessage *ctrlMsg = request->mutable_riccontrolmessagedata();
540 ctrlMsg->set_riccontrolcelltypeval( rc::RIC_CONTROL_CELL_UNKWON );
541 ctrlMsg->set_targetcellid( target_cell_id );
// Use the nodeb registered for the target cell when known, placeholder strings otherwise.
543 auto data = cell_map.find( target_cell_id );
544 if( data != cell_map.end() ) {
545 request->set_e2nodeid( data->second->global_nb_id.nb_id );
546 request->set_plmnid( data->second->global_nb_id.plmn_id );
547 request->set_ranname( data->second->ran_name );
549 request->set_e2nodeid( "unknown_e2nodeid" );
550 request->set_plmnid( "unknown_plmnid" );
551 request->set_ranname( "unknown_ranname" );
553 request->set_riccontrolackreqval( rc::RIC_CONTROL_ACK_UNKWON ); // not yet used in api.proto
// NOTE(review): no deadline is set on the context in the visible lines; confirm whether this
// call may block the calling callback indefinitely.
555 grpc::Status status = rc_stub->SendRICControlReqServiceGrpc( &context, *request, &response );
// A response code of 0 is logged as success, any other code as failure; a separate message
// reports the gRPC status error code and text.
558 if( response.rspcode() == 0 ) {
559 cout << "[INFO] Control Request succeeded with code=0, description=" << response.description() << endl;
561 cout << "[ERROR] Control Request failed with code=" << response.rspcode()
562 << ", description=" << response.description() << endl;
566 cout << "[ERROR] failed to send a RIC Control Request message to RC xApp, error_code="
567 << status.error_code() << ", error_msg=" << status.error_message() << endl;
// RMR callback for throughput predictions (registered in main() for TS_QOE_PREDICTION).
// Parses the payload with PredictionHandler, picks the cell with the highest predicted
// downlink throughput and, when it beats the serving cell by more than the A1 threshold
// margin, sends a handover control request through REST or gRPC depending on ts_control_api.
// Otherwise it logs that the serving cell is the best one.
// mbuf, subid and data are not used in the visible lines.
572 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
573 string json ((char *)payload.get(), len); // RMR payload might not be NUL-terminated
575 cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
576 cout << "[INFO] Payload is " << json << endl;
578 PredictionHandler handler;
581 StringStream ss(json.c_str());
582 reader.Parse(ss,handler);
584 cout << "[ERROR] Got an exception on stringstream read parse\n";
587 // We are only considering downlink throughput
// This is a copy of the handler map, not a reference.
588 unordered_map<string, int> throughput_map = handler.cell_pred_down;
590 // Decision about CONTROL message
591 // (1) Identify UE Id in Prediction message
592 // (2) Iterate through Prediction message.
593 // If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
594 // We assume the first cell in the prediction message is the serving cell
596 int serving_cell_throughput = 0;
597 int highest_throughput = 0;
598 string highest_throughput_cell_id;
600 // Getting the current serving cell throughput prediction
// NOTE(review): the iterator returned by find() is dereferenced by the very next statement
// without being compared to end(). If the serving cell id is missing from the map (for
// example when no unsigned value was parsed) this is undefined behavior; add a guard.
601 auto cell = throughput_map.find( handler.serving_cell_id );
602 serving_cell_throughput = cell->second;
604 // Iterating to identify the highest throughput prediction
605 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
607 string curr_cellid = map_iter->first;
608 int curr_throughput = map_iter->second;
// Strict comparison: on ties the first cell encountered in map order is kept.
610 if ( highest_throughput < curr_throughput ) {
611 highest_throughput = curr_throughput;
612 highest_throughput_cell_id = curr_cellid;
618 if( downlink_threshold > 0 ) { // we also take into account the threshold in A1 policy type 20008
// Margin is the configured percentage of the serving cell prediction.
619 thresh = serving_cell_throughput * (downlink_threshold / 100.0);
622 if ( highest_throughput > ( serving_cell_throughput + thresh ) ) {
624 // sending a control request message
625 if ( ts_control_api == TsControlApi::REST ) {
626 send_rest_control_request( handler.ue_id, handler.serving_cell_id, highest_throughput_cell_id );
628 send_grpc_control_request( handler.ue_id, highest_throughput_cell_id );
632 cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
// Builds a JSON object with a UEPredictionSet array holding the given UE ids, writes it into
// a freshly allocated 2048-byte RMR message and sends it with message type TS_UE_LIST.
// ues_to_predict is taken by value (the vector is copied on every call).
// Send failures are reported on stderr together with the message state.
637 void send_prediction_request( vector<string> ues_to_predict ) {
638 std::unique_ptr<Message> msg;
639 Msg_component payload; // special type of unique pointer to the payload
644 Msg_component send_payload;
646 msg = xfw->Alloc_msg( 2048 );
648 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
// NOTE(review): i is printed here but no assignment to it is visible before this point;
// confirm it is initialized.
650 fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
654 string ues_list = "[";
// NOTE(review): when ues_to_predict is empty the loop body never runs, so the closing bracket
// is never appended and the resulting JSON is unbalanced. UE ids are also inserted without
// JSON escaping. The loop compares a signed int index against an unsigned size.
656 for (int i = 0; i < ues_to_predict.size(); i++) {
657 if (i == ues_to_predict.size() - 1) {
658 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
660 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
664 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
666 send_payload = msg->Get_payload(); // direct access to payload
// snprintf silently truncates bodies longer than 2047 characters.
667 snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
669 plen = strlen( (char *)send_payload.get() );
671 cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
673 // payload updated in place, nothing to copy from, so payload parm is nil
674 if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
675 fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
680 /* This function works with the Anomaly Detection (AD) xApp. It is invoked when anomalous UEs are sent by the AD xApp.
681 * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
682 * sends a prediction request to the QP Driver xApp.
// RMR callback for anomaly updates (registered in main() for TS_ANOMALY_UPDATE).
// Logs the payload, extracts the ue-id values with AnomalyHandler, replies on the same
// message buffer with TS_ANOMALY_ACK and then requests predictions for the extracted UEs.
// subid and data are not used in the visible lines.
684 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
685 string json ((char *)payload.get(), len); // RMR payload might not be NUL-terminated
687 cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
688 cout << "[INFO] Payload is " << json << "\n";
690 AnomalyHandler handler;
692 StringStream ss(json.c_str());
693 reader.Parse(ss,handler);
695 // just sending ACK to the AD xApp
// The response passes the received length and a null payload pointer.
// NOTE(review): presumably the received payload bytes are sent back unchanged; confirm
// against the Message::Send_response contract.
696 mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr ); // msg type 30004
// The request is sent even when no ue-id was extracted; prediction_ues is copied by value.
698 send_prediction_request(handler.prediction_ues);
// Queries /v1/nodeb/states through the given REST client and returns the inventoryName
// values collected by NodebListHandler.
// On HTTP 200 the body is parsed and logged; the error messages below report the unexpected
// status code, with the body when it is not empty. The returned vector holds only what the
// handler collected, so it is empty when nothing was parsed.
701 vector<string> get_nodeb_list( restclient::RestClient& client ) {
703 restclient::response_t response = client.do_get( "/v1/nodeb/states" );
705 NodebListHandler handler;
706 if( response.status_code == 200 ) {
708 StringStream ss( response.body.c_str() );
709 reader.Parse( ss, handler );
711 cout << "[INFO] nodeb list is " << response.body.c_str() << endl;
714 if( response.body.empty() ) {
715 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() << endl;
717 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() <<
718 ". HTTP payload is " << response.body.c_str() << endl;
722 return handler.nodeb_list;
// Populates the global cell_map by querying the E2 manager REST API.
// The base URL comes from the SERVICE_E2MGR_HTTP_BASE_URL environment variable, falling back
// to a built-in default. For every nodeb name returned by get_nodeb_list() the nodeb document
// is fetched from /v1/nodeb/<name> and parsed with NodebHandler, which registers its cells.
// Returns a bool that main() treats as success when true; the return statements themselves
// are not in the visible lines.
725 bool build_cell_mapping() {
727 char *data = getenv( "SERVICE_E2MGR_HTTP_BASE_URL" );
728 if ( data == NULL ) {
729 base_url = "http://service-ricplt-e2mgr-http.ricplt:3800";
731 base_url = string( data );
734 restclient::RestClient client( base_url );
736 vector<string> nb_list = get_nodeb_list( client );
// Each name is copied into nb on every iteration.
738 for( string nb : nb_list ) {
739 string full_path = string("/v1/nodeb/") + nb;
740 restclient::response_t response = client.do_get( full_path );
// Any status other than 200 is logged, with the body when one was returned.
741 if( response.status_code != 200 ) {
742 if( response.body.empty() ) {
743 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
744 client.getBaseUrl() + full_path << endl;
746 cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
747 client.getBaseUrl() + full_path << ". HTTP payload is " << response.body.c_str() << endl;
// A fresh handler per nodeb, so each nodeb gets its own shared nodeb_t.
753 NodebHandler handler;
755 StringStream ss( response.body.c_str() );
756 reader.Parse( ss, handler );
758 cout << "[ERROR] Got an exception on parsing nodeb (stringstream read parse)\n";
// Entry point of the Traffic Steering xApp.
// Reads ts_control_api and ts_control_ep from the xApp configuration, selects the REST or
// gRPC control path (the gRPC path also builds the cell mapping and creates the channel and
// stub), creates the xApp framework instance listening on port 4560, registers the policy,
// prediction and anomaly callbacks and runs the framework.
// argc and argv are not used in the visible lines.
766 extern int main( int argc, char** argv ) {
768 char* port = (char *) "4560";
769 shared_ptr<grpc::Channel> channel;
// NOTE(review): allocated with a raw new and no matching delete is visible; a stack object
// or std::unique_ptr would avoid the leak.
771 Config *config = new Config();
772 string api = config->Get_control_str("ts_control_api");
773 ts_control_ep = config->Get_control_str("ts_control_ep");
775 cout << "[ERROR] a control api (rest/grpc) is required in xApp descriptor\n";
778 if ( api.compare("rest") == 0 ) {
779 ts_control_api = TsControlApi::REST;
781 ts_control_api = TsControlApi::gRPC;
783 if( !build_cell_mapping() ) {
784 cout << "[ERROR] unable to map cells to nodeb\n";
// The channel to ts_control_ep is created with insecure credentials.
787 channel = grpc::CreateChannel(ts_control_ep, grpc::InsecureChannelCredentials());
788 rc_stub = rc::MsgComm::NewStub(channel, grpc::StubOptions());
791 fprintf( stderr, "[TS xApp] listening on port %s\n", port );
792 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
794 xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL ); // msg type 20010
795 xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL ); // msg type 30002
796 xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
798 xfw->Run( nthreads );