Modified: 21 May 2021 (Alexandre Huff)
Update for traffic steering use case in release D.
+ 07 Dec 2021 (Alexandre Huff)
+ Update for traffic steering use case in release E.
*/
#include <stdio.h>
#include <curl/curl.h>
#include <rmr/RIC_message_types.h>
#include "ricxfcpp/xapp.hpp"
+#include "ricxfcpp/config.hpp"
+/*
+ FIXME unfortunately this RMR flag has to be disabled
+ due to name resolution conflicts.
+ RC xApp defines the same name for gRPC control messages.
+*/
+#undef RIC_CONTROL_ACK
-// Defines env name for the endpoint to POST handoff control messages
-#define ENV_CONTROL_URL "TS_CONTROL_URL"
+#include <grpc/grpc.h>
+#include <grpcpp/channel.h>
+#include <grpcpp/client_context.h>
+#include <grpcpp/create_channel.h>
+#include <grpcpp/security/credentials.h>
+#include "../../ext/protobuf/api.grpc.pb.h"
using namespace rapidjson;
// ----------------------------------------------------------
-
-// Stores the the URL to POST handoff control messages
-const char *ts_control_url;
-
std::unique_ptr<Xapp> xfw;
+std::unique_ptr<api::MsgComm::Stub> rc_stub;
int rsrp_threshold = 0;
+// scoped enum to identify which API is used to send control messages
+enum class TsControlApi { REST, gRPC };
+TsControlApi ts_control_api; // api to send control messages
+string ts_control_ep; // api target endpoint
+
/* struct UEData {
string serving_cell;
int serving_cell_rsrp;
int response_to = 0; // max timeout wating for a response
int rmtype; // received message type
- fprintf(stderr, "[INFO] Policy Callback got a message, type=%d, length=%d\n", mtype, len);
-
- const char *arg = (const char*)payload.get();
+ string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
- fprintf(stderr, "[INFO] Payload is %s\n", arg);
+ cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length="<< len << "\n";
+ cout << "[INFO] Payload is " << arg << endl;
PolicyHandler handler;
Reader reader;
- StringStream ss(arg);
+ StringStream ss(arg.c_str());
reader.Parse(ss,handler);
//Set the threshold value
if (handler.found_threshold) {
- fprintf(stderr, "[INFO] Setting RSRP Threshold to A1-P value: %d\n", handler.threshold);
+ cout << "[INFO] Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
rsrp_threshold = handler.threshold;
}
}
// sends a handover message through REST
-void send_handoff_request( string msg ) {
+void send_rest_control_request( string msg ) {
CURL *curl = curl_easy_init();
- curl_easy_setopt( curl, CURLOPT_URL, ts_control_url );
+ curl_easy_setopt( curl, CURLOPT_URL, ts_control_ep.c_str() );
curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
curl_easy_setopt( curl, CURLOPT_POST, 1L );
// curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
headers = curl_slist_append( headers, "Content-Type: application/json" );
curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
- cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_url << "\"\n";
+ cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_ep << "\"\n";
cout << "[INFO] HandOff request is " << msg << endl;
// sending request
} else if ( httpCode == 404 ) {
- cout << "[ERROR] HTTP 404 Not Found: " << ts_control_url << endl;
+ cout << "[ERROR] HTTP 404 Not Found: " << ts_control_ep << endl;
} else {
- cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_url << \
+ cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_ep << \
"\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
}
curl_easy_cleanup( curl );
}
+// sends a handover message to RC xApp through gRPC
+void send_grpc_control_request() {
+ grpc::ClientContext context;
+ api::RicControlGrpcReq *request = api::RicControlGrpcReq().New();
+ api::RicControlGrpcRsp response;
+
+ api::RICE2APHeader *apHeader = api::RICE2APHeader().New();
+ api::RICControlHeader *ctrlHeader = api::RICControlHeader().New();
+ api::RICControlMessage *ctrlMsg = api::RICControlMessage().New();
+
+ request->set_e2nodeid("e2nodeid");
+ request->set_plmnid("plmnid");
+ request->set_ranname("ranname");
+ request->set_allocated_rice2apheaderdata(apHeader);
+ request->set_allocated_riccontrolheaderdata(ctrlHeader);
+ request->set_allocated_riccontrolmessagedata(ctrlMsg);
+ request->set_riccontrolackreqval(api::RIC_CONTROL_ACK_UNKWON); // not yet used in api.proto
+
+ grpc::Status status = rc_stub->SendRICControlReqServiceGrpc(&context, *request, &response);
+
+ if(status.ok()) {
+ /*
+ TODO check if this is related to RICControlAckEnum
+ if yes, then ACK value should be 2 (RIC_CONTROL_ACK)
+ api.proto assumes that 0 is an ACK
+ */
+ if(response.rspcode() == 0) {
+ cout << "[INFO] Control Request succeeded with code=0, description=" << response.description() << endl;
+ } else {
+ cout << "[ERROR] Control Request failed with code=" << response.rspcode()
+ << ", description=" << response.description() << endl;
+ }
+
+ } else {
+ cout << "[ERROR] failed to send a RIC Control Request message to RC xApp, error_code="
+ << status.error_code() << ", error_msg=" << status.error_message() << endl;
+ }
+
+ // FIXME needs to check about memory likeage
+}
+
void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
time_t now;
int send_mtype = 0;
int rmtype; // received message type
- int delay = 1000000; // mu-sec delay; default 1s
+ int delay = 1000000; // mu-sec delay; default 1s
+
+ string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
- cout << "[INFO] Payload is " << payload.get() << endl;
+ cout << "[INFO] Payload is " << json << endl;
- const char* arg = (const char*)payload.get();
PredictionHandler handler;
-
try {
Reader reader;
- StringStream ss(arg);
+ StringStream ss(json.c_str());
reader.Parse(ss,handler);
} catch (...) {
cout << "[ERROR] Got an exception on stringstream read parse\n";
} */
// sending a control request message
- send_handoff_request( s.GetString() );
+ if ( ts_control_api == TsControlApi::REST ) {
+ send_rest_control_request( s.GetString() );
+ } else {
+ send_grpc_control_request();
+ }
} else {
cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
- const char *body = message_body.c_str();
-
send_payload = msg->Get_payload(); // direct access to payload
- snprintf( (char *) send_payload.get(), 2048, "%s", body );
+ snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
- /*
- we are sending a string, so we have to include the nil byte in the RMR message
- to keep things simple in the receiver side
- */
- plen = strlen( (char *) send_payload.get() ) + 1;
+ plen = strlen( (char *)send_payload.get() );
cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
* sends a prediction request to the QP Driver xApp.
*/
void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
- const char *json = (const char *) payload.get();
+ string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
cout << "[INFO] Payload is " << json << "\n";
AnomalyHandler handler;
Reader reader;
- StringStream ss(json);
+ StringStream ss(json.c_str());
reader.Parse(ss,handler);
// just sending ACK to the AD xApp
int nthreads = 1;
char* port = (char *) "4560";
-
- // ts_control_url = "http://127.0.0.1:5000/api/echo"; // echo-server in test/app/ directory
- if ( ( ts_control_url = getenv( ENV_CONTROL_URL ) ) == nullptr ) {
- cout << "[ERROR] TS_CONTROL_URL is not defined to POST handoff control messages" << endl;
- return 1;
+ shared_ptr<grpc::Channel> channel;
+
+ Config *config = new Config();
+ string api = config->Get_control_str("ts_control_api");
+ ts_control_ep = config->Get_control_str("ts_control_ep");
+ if ( api.empty() ) {
+ cout << "[ERROR] a control api (rest/grpc) is required in xApp descriptor\n";
+ exit(1);
}
+ if ( api.compare("rest") == 0 ) {
+ ts_control_api = TsControlApi::REST;
+ } else {
+ ts_control_api = TsControlApi::gRPC;
+ }
+
+ channel = grpc::CreateChannel(ts_control_ep, grpc::InsecureChannelCredentials());
+ rc_stub = api::MsgComm::NewStub(channel, grpc::StubOptions());
fprintf( stderr, "[TS xApp] listening on port %s\n", port );
xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );