3 ==================================================================================
4 Copyright (c) 2020 AT&T Intellectual Property.
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
22 Abstract: Traffic Steering xApp
24 2. Receives anomaly detection
25 3. Requests prediction for UE throughput on current and neighbor cells
26 4. Receives prediction
27 5. Optionally exercises Traffic Steering action over E2
32 Modified: 21 May 2021 (Alexandre Huff)
33 Update for traffic steering use case in release D.
34 07 Dec 2021 (Alexandre Huff)
35 Update for traffic steering use case in release E.
50 #include <unordered_map>
52 #include <rapidjson/document.h>
53 #include <rapidjson/writer.h>
54 #include <rapidjson/stringbuffer.h>
55 #include <rapidjson/schema.h>
56 #include <rapidjson/reader.h>
57 #include <rapidjson/prettywriter.h>
59 #include <curl/curl.h>
60 #include <rmr/RIC_message_types.h>
61 #include "ricxfcpp/xapp.hpp"
62 #include "ricxfcpp/config.hpp"
65 FIXME unfortunately this RMR flag has to be disabled
66 due to name resolution conflicts.
67 RC xApp defines the same name for gRPC control messages.
69 #undef RIC_CONTROL_ACK
71 #include <grpc/grpc.h>
72 #include <grpcpp/channel.h>
73 #include <grpcpp/client_context.h>
74 #include <grpcpp/create_channel.h>
75 #include <grpcpp/security/credentials.h>
76 #include "../../ext/protobuf/api.grpc.pb.h"
79 using namespace rapidjson;
83 using Namespace = std::string;
84 using Key = std::string;
85 using Data = std::vector<uint8_t>;
86 using DataMap = std::map<Key, Data>;
87 using Keys = std::set<Key>;
90 // ----------------------------------------------------------
91 std::unique_ptr<Xapp> xfw;
92 std::unique_ptr<api::MsgComm::Stub> rc_stub;
94 int rsrp_threshold = 0;
96 // scoped enum to identify which API is used to send control messages
97 enum class TsControlApi { REST, gRPC };
98 TsControlApi ts_control_api; // api to send control messages
99 string ts_control_ep; // api target endpoint
103 int serving_cell_rsrp;
106 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
107 unordered_map<string, string> cell_pred;
109 bool ue_id_found = false;
110 string curr_key = "";
111 string curr_value = "";
113 int policy_instance_id;
115 std::string operation;
116 bool found_threshold = false;
118 bool Null() { return true; }
119 bool Bool(bool b) { return true; }
122 if (curr_key.compare("policy_type_id") == 0) {
124 } else if (curr_key.compare("policy_instance_id") == 0) {
125 policy_instance_id = i;
126 } else if (curr_key.compare("threshold") == 0) {
127 found_threshold = true;
133 bool Uint(unsigned u) {
135 if (curr_key.compare("policy_type_id") == 0) {
137 } else if (curr_key.compare("policy_instance_id") == 0) {
138 policy_instance_id = u;
139 } else if (curr_key.compare("threshold") == 0) {
140 found_threshold = true;
146 bool Int64(int64_t i) { return true; }
147 bool Uint64(uint64_t u) { return true; }
148 bool Double(double d) { return true; }
149 bool String(const char* str, SizeType length, bool copy) {
151 if (curr_key.compare("operation") != 0) {
161 bool Key(const char* str, SizeType length, bool copy) {
167 bool EndObject(SizeType memberCount) { return true; }
168 bool StartArray() { return true; }
169 bool EndArray(SizeType elementCount) { return true; }
173 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
174 unordered_map<string, int> cell_pred_down;
175 unordered_map<string, int> cell_pred_up;
177 bool ue_id_found = false;
178 string curr_key = "";
179 string curr_value = "";
180 string serving_cell_id;
181 bool down_val = true;
182 bool Null() { return true; }
183 bool Bool(bool b) { return true; }
184 bool Int(int i) { return true; }
185 bool Uint(unsigned u) {
186 // Currently, we assume the first cell in the prediction message is the serving cell
187 if ( serving_cell_id.empty() ) {
188 serving_cell_id = curr_key;
192 cell_pred_down[curr_key] = u;
195 cell_pred_up[curr_key] = u;
202 bool Int64(int64_t i) { return true; }
203 bool Uint64(uint64_t u) { return true; }
204 bool Double(double d) { return true; }
205 bool String(const char* str, SizeType length, bool copy) {
209 bool StartObject() { return true; }
210 bool Key(const char* str, SizeType length, bool copy) {
220 bool EndObject(SizeType memberCount) { return true; }
221 bool StartArray() { return true; }
222 bool EndArray(SizeType elementCount) { return true; }
225 struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
227 Assuming we receive the following payload from AD
228 [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
230 vector<string> prediction_ues;
231 string curr_key = "";
233 bool Key(const Ch* str, SizeType len, bool copy) {
238 bool String(const Ch* str, SizeType len, bool copy) {
239 // We are only interested in the "ue-id"
240 if ( curr_key.compare( "ue-id") == 0 ) {
241 prediction_ues.push_back( str );
248 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
249 unordered_map<string, string> cell_pred;
250 std::string serving_cell_id;
251 int serving_cell_rsrp;
252 int serving_cell_rsrq;
253 int serving_cell_sinr;
254 bool in_serving_array = false;
255 int rf_meas_index = 0;
257 bool in_serving_report_object = false;
259 string curr_key = "";
260 string curr_value = "";
261 bool Null() { return true; }
262 bool Bool(bool b) { return true; }
268 bool Uint(unsigned i) {
270 if (in_serving_report_object) {
271 if (curr_key.compare("rsrp") == 0) {
272 serving_cell_rsrp = i;
273 } else if (curr_key.compare("rsrq") == 0) {
274 serving_cell_rsrq = i;
275 } else if (curr_key.compare("rssinr") == 0) {
276 serving_cell_sinr = i;
281 bool Int64(int64_t i) {
284 bool Uint64(uint64_t i) {
287 bool Double(double d) { return true; }
288 bool String(const char* str, SizeType length, bool copy) {
290 if (curr_key.compare("ServingCellID") == 0) {
291 serving_cell_id = str;
297 if (curr_key.compare("ServingCellRF") == 0) {
298 in_serving_report_object = true;
302 bool Key(const char* str, SizeType length, bool copy) {
307 bool EndObject(SizeType memberCount) {
308 if (curr_key.compare("ServingCellRF") == 0) {
309 in_serving_report_object = false;
314 if (curr_key.compare("ServingCellRF") == 0) {
315 in_serving_array = true;
320 bool EndArray(SizeType elementCount) {
322 if (curr_key.compare("servingCellRF") == 0) {
323 in_serving_array = false;
331 /* unordered_map<string, UEData> get_sdl_ue_data() {
333 fprintf(stderr, "In get_sdl_ue_data()\n");
335 unordered_map<string, string> ue_data;
337 unordered_map<string, UEData> return_ue_data_map;
339 std::string prefix3="";
340 Keys K2 = sdl->findKeys(nsu, prefix3);
341 DataMap Dk2 = sdl->get(nsu, K2);
346 for(auto si=K2.begin();si!=K2.end();++si){
347 std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
348 char val[val_v.size()+1]; // from Data
351 for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
353 ue_id.assign((std::string)*si);
356 ue_data[ue_id] = ue_json;
359 for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
360 UEDataHandler handler;
362 StringStream ss(map_iter->second.c_str());
363 reader.Parse(ss,handler);
365 string ueID = map_iter->first;
366 string serving_cell_id = handler.serving_cell_id;
367 int serv_rsrp = handler.serving_cell_rsrp;
369 return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
373 return return_ue_data_map;
376 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
378 int response_to = 0; // max timeout wating for a response
379 int rmtype; // received message type
381 string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
383 cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length="<< len << "\n";
384 cout << "[INFO] Payload is " << arg << endl;
386 PolicyHandler handler;
388 StringStream ss(arg.c_str());
389 reader.Parse(ss,handler);
391 //Set the threshold value
392 if (handler.found_threshold) {
393 cout << "[INFO] Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
394 rsrp_threshold = handler.threshold;
397 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
398 mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
401 // callback to handle handover reply (json http response)
402 size_t handoff_reply_callback( const char *in, size_t size, size_t num, string *out ) {
403 const size_t totalBytes( size * num );
404 out->append( in, totalBytes );
408 // sends a handover message through REST
409 void send_rest_control_request( string msg ) {
410 CURL *curl = curl_easy_init();
411 curl_easy_setopt( curl, CURLOPT_URL, ts_control_ep.c_str() );
412 curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
413 curl_easy_setopt( curl, CURLOPT_POST, 1L );
414 // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
416 // response information
418 unique_ptr<string> httpData( new string() );
420 curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, handoff_reply_callback );
421 curl_easy_setopt( curl, CURLOPT_WRITEDATA, httpData.get());
422 curl_easy_setopt( curl, CURLOPT_POSTFIELDS, msg.c_str() );
424 struct curl_slist *headers = NULL; // needs to free this after easy perform
425 headers = curl_slist_append( headers, "Accept: application/json" );
426 headers = curl_slist_append( headers, "Content-Type: application/json" );
427 curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
429 cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_ep << "\"\n";
430 cout << "[INFO] HandOff request is " << msg << endl;
433 CURLcode res = curl_easy_perform( curl );
434 if( res != CURLE_OK ) {
435 cout << "[ERROR] curl_easy_perform() failed: " << curl_easy_strerror( res ) << endl;
439 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
440 if( httpCode == 200 ) {
441 // ============== DO SOMETHING USEFUL HERE ===============
442 // Currently, we only print out the HandOff reply
443 rapidjson::Document document;
444 document.Parse( httpData.get()->c_str() );
445 rapidjson::StringBuffer s;
446 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
447 document.Accept( writer );
448 cout << "[INFO] HandOff reply is " << s.GetString() << endl;
451 } else if ( httpCode == 404 ) {
452 cout << "[ERROR] HTTP 404 Not Found: " << ts_control_ep << endl;
454 cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_ep << \
455 "\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
460 curl_slist_free_all( headers );
461 curl_easy_cleanup( curl );
464 // sends a handover message to RC xApp through gRPC
465 void send_grpc_control_request() {
466 grpc::ClientContext context;
467 api::RicControlGrpcReq *request = api::RicControlGrpcReq().New();
468 api::RicControlGrpcRsp response;
470 api::RICE2APHeader *apHeader = api::RICE2APHeader().New();
471 api::RICControlHeader *ctrlHeader = api::RICControlHeader().New();
472 api::RICControlMessage *ctrlMsg = api::RICControlMessage().New();
474 request->set_e2nodeid("e2nodeid");
475 request->set_plmnid("plmnid");
476 request->set_ranname("ranname");
477 request->set_allocated_rice2apheaderdata(apHeader);
478 request->set_allocated_riccontrolheaderdata(ctrlHeader);
479 request->set_allocated_riccontrolmessagedata(ctrlMsg);
480 request->set_riccontrolackreqval(api::RIC_CONTROL_ACK_UNKWON); // not yet used in api.proto
482 grpc::Status status = rc_stub->SendRICControlReqServiceGrpc(&context, *request, &response);
486 TODO check if this is related to RICControlAckEnum
487 if yes, then ACK value should be 2 (RIC_CONTROL_ACK)
488 api.proto assumes that 0 is an ACK
490 if(response.rspcode() == 0) {
491 cout << "[INFO] Control Request succeeded with code=0, description=" << response.description() << endl;
493 cout << "[ERROR] Control Request failed with code=" << response.rspcode()
494 << ", description=" << response.description() << endl;
498 cout << "[ERROR] failed to send a RIC Control Request message to RC xApp, error_code="
499 << status.error_code() << ", error_msg=" << status.error_message() << endl;
502 // FIXME needs to check about memory likeage
505 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
509 static unsigned int seq_number = 0; // static counter, not thread-safe
511 int response_to = 0; // max timeout wating for a response
514 int rmtype; // received message type
515 int delay = 1000000; // mu-sec delay; default 1s
517 string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
519 cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
520 cout << "[INFO] Payload is " << json << endl;
522 PredictionHandler handler;
525 StringStream ss(json.c_str());
526 reader.Parse(ss,handler);
528 cout << "[ERROR] Got an exception on stringstream read parse\n";
531 // We are only considering download throughput
532 unordered_map<string, int> throughput_map = handler.cell_pred_down;
534 // Decision about CONTROL message
535 // (1) Identify UE Id in Prediction message
536 // (2) Iterate through Prediction message.
537 // If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
538 // We assume the first cell in the prediction message is the serving cell
540 int serving_cell_throughput = 0;
541 int highest_throughput = 0;
542 string highest_throughput_cell_id;
544 // Getting the current serving cell throughput prediction
545 auto cell = throughput_map.find( handler.serving_cell_id );
546 serving_cell_throughput = cell->second;
548 // Iterating to identify the highest throughput prediction
549 for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
551 string curr_cellid = map_iter->first;
552 int curr_throughput = map_iter->second;
554 if ( highest_throughput < curr_throughput ) {
555 highest_throughput = curr_throughput;
556 highest_throughput_cell_id = curr_cellid;
561 if ( highest_throughput > serving_cell_throughput ) {
562 // building a handoff control message
563 now = time( nullptr );
564 str_now = ctime( &now );
565 str_now.pop_back(); // removing the \n character
567 seq_number++; // static counter, not thread-safe
569 rapidjson::StringBuffer s;
570 rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
571 writer.StartObject();
572 writer.Key( "command" );
573 writer.String( "HandOff" );
574 writer.Key( "seqNo" );
575 writer.Int( seq_number );
577 writer.String( handler.ue_id.c_str() );
578 writer.Key( "fromCell" );
579 writer.String( handler.serving_cell_id.c_str() );
580 writer.Key( "toCell" );
581 writer.String( highest_throughput_cell_id.c_str() );
582 writer.Key( "timestamp" );
583 writer.String( str_now.c_str() );
584 writer.Key( "reason" );
585 writer.String( "HandOff Control Request from TS xApp" );
589 // creates a message like
591 "command": "HandOff",
596 "timestamp": "Sat May 22 10:35:33 2021",
597 "reason": "HandOff Control Request from TS xApp",
601 // sending a control request message
602 if ( ts_control_api == TsControlApi::REST ) {
603 send_rest_control_request( s.GetString() );
605 send_grpc_control_request();
609 cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
612 // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
613 // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
616 void send_prediction_request( vector<string> ues_to_predict ) {
618 std::unique_ptr<Message> msg;
619 Msg_component payload; // special type of unique pointer to the payload
624 Msg_component send_payload;
626 msg = xfw->Alloc_msg( 2048 );
628 sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
630 fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
634 string ues_list = "[";
636 for (int i = 0; i < ues_to_predict.size(); i++) {
637 if (i == ues_to_predict.size() - 1) {
638 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
640 ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
644 string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
646 send_payload = msg->Get_payload(); // direct access to payload
647 snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
649 plen = strlen( (char *)send_payload.get() );
651 cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
653 // payload updated in place, nothing to copy from, so payload parm is nil
654 if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
655 fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
660 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
661 * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
662 * sends a prediction request to the QP Driver xApp.
664 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
665 string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
667 cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
668 cout << "[INFO] Payload is " << json << "\n";
670 AnomalyHandler handler;
672 StringStream ss(json.c_str());
673 reader.Parse(ss,handler);
675 // just sending ACK to the AD xApp
676 mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr ); // msg type 30004
678 // TODO should we use the threshold received in the A1_POLICY_REQ message and compare with Degradation in TS_ANOMALY_UPDATE?
679 // if( handler.degradation < rsrp_threshold )
680 send_prediction_request(handler.prediction_ues);
683 extern int main( int argc, char** argv ) {
686 char* port = (char *) "4560";
687 shared_ptr<grpc::Channel> channel;
689 Config *config = new Config();
690 string api = config->Get_control_str("ts_control_api");
691 ts_control_ep = config->Get_control_str("ts_control_ep");
693 cout << "[ERROR] a control api (rest/grpc) is required in xApp descriptor\n";
696 if ( api.compare("rest") == 0 ) {
697 ts_control_api = TsControlApi::REST;
699 ts_control_api = TsControlApi::gRPC;
702 channel = grpc::CreateChannel(ts_control_ep, grpc::InsecureChannelCredentials());
703 rc_stub = api::MsgComm::NewStub(channel, grpc::StubOptions());
705 fprintf( stderr, "[TS xApp] listening on port %s\n", port );
706 xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
708 xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL ); // msg type 20010
709 xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL ); // msg type 30002
710 xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
712 xfw->Run( nthreads );