Update TS xApp for Release E use case
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 /*
21         Mnemonic:       ts_xapp.cpp
22         Abstract:       Traffic Steering xApp
23                    1. Receives A1 Policy
24                                2. Receives anomaly detection
25                                3. Requests prediction for UE throughput on current and neighbor cells
26                                4. Receives prediction
27                                5. Optionally exercises Traffic Steering action over E2
28
29         Date:     22 April 2020
30         Author:         Ron Shacham
31
32   Modified: 21 May 2021 (Alexandre Huff)
33             Update for traffic steering use case in release D.
34             07 Dec 2021 (Alexandre Huff)
35             Update for traffic steering use case in release E.
36 */
37
38 #include <stdio.h>
39 #include <string.h>
40 #include <unistd.h>
41
42 #include <thread>
43 #include <iostream>
44 #include <memory>
45
46 #include <set>
47 #include <map>
48 #include <vector>
49 #include <string>
50 #include <unordered_map>
51
52 #include <rapidjson/document.h>
53 #include <rapidjson/writer.h>
54 #include <rapidjson/stringbuffer.h>
55 #include <rapidjson/schema.h>
56 #include <rapidjson/reader.h>
57 #include <rapidjson/prettywriter.h>
58
59 #include <curl/curl.h>
60 #include <rmr/RIC_message_types.h>
61 #include "ricxfcpp/xapp.hpp"
62 #include "ricxfcpp/config.hpp"
63
64 /*
65   FIXME unfortunately this RMR flag has to be disabled
66   due to name resolution conflicts.
67   RC xApp defines the same name for gRPC control messages.
68 */
69 #undef RIC_CONTROL_ACK
70
71 #include <grpc/grpc.h>
72 #include <grpcpp/channel.h>
73 #include <grpcpp/client_context.h>
74 #include <grpcpp/create_channel.h>
75 #include <grpcpp/security/credentials.h>
76 #include "../../ext/protobuf/api.grpc.pb.h"
77
78
79 using namespace rapidjson;
80 using namespace std;
81 using namespace xapp;
82
83 using Namespace = std::string;
84 using Key = std::string;
85 using Data = std::vector<uint8_t>;
86 using DataMap = std::map<Key, Data>;
87 using Keys = std::set<Key>;
88
89
90 // ----------------------------------------------------------
91 std::unique_ptr<Xapp> xfw;
92 std::unique_ptr<api::MsgComm::Stub> rc_stub;
93
94 int rsrp_threshold = 0;
95
96 // scoped enum to identify which API is used to send control messages
97 enum class TsControlApi { REST, gRPC };
98 TsControlApi ts_control_api;  // api to send control messages
99 string ts_control_ep;         // api target endpoint
100
101 /* struct UEData {
102   string serving_cell;
103   int serving_cell_rsrp;
104 }; */
105
106 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
107   unordered_map<string, string> cell_pred;
108   std::string ue_id;
109   bool ue_id_found = false;
110   string curr_key = "";
111   string curr_value = "";
112   int policy_type_id;
113   int policy_instance_id;
114   int threshold;
115   std::string operation;
116   bool found_threshold = false;
117
118   bool Null() { return true; }
119   bool Bool(bool b) { return true; }
120   bool Int(int i) {
121
122     if (curr_key.compare("policy_type_id") == 0) {
123       policy_type_id = i;
124     } else if (curr_key.compare("policy_instance_id") == 0) {
125       policy_instance_id = i;
126     } else if (curr_key.compare("threshold") == 0) {
127       found_threshold = true;
128       threshold = i;
129     }
130
131     return true;
132   }
133   bool Uint(unsigned u) {
134
135     if (curr_key.compare("policy_type_id") == 0) {
136       policy_type_id = u;
137     } else if (curr_key.compare("policy_instance_id") == 0) {
138       policy_instance_id = u;
139     } else if (curr_key.compare("threshold") == 0) {
140       found_threshold = true;
141       threshold = u;
142     }
143
144     return true;
145   }
146   bool Int64(int64_t i) {  return true; }
147   bool Uint64(uint64_t u) {  return true; }
148   bool Double(double d) {  return true; }
149   bool String(const char* str, SizeType length, bool copy) {
150
151     if (curr_key.compare("operation") != 0) {
152       operation = str;
153     }
154
155     return true;
156   }
157   bool StartObject() {
158
159     return true;
160   }
161   bool Key(const char* str, SizeType length, bool copy) {
162
163     curr_key = str;
164
165     return true;
166   }
167   bool EndObject(SizeType memberCount) {  return true; }
168   bool StartArray() {  return true; }
169   bool EndArray(SizeType elementCount) {  return true; }
170
171 };
172
173 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
174   unordered_map<string, int> cell_pred_down;
175   unordered_map<string, int> cell_pred_up;
176   std::string ue_id;
177   bool ue_id_found = false;
178   string curr_key = "";
179   string curr_value = "";
180   string serving_cell_id;
181   bool down_val = true;
182   bool Null() {  return true; }
183   bool Bool(bool b) {  return true; }
184   bool Int(int i) {  return true; }
185   bool Uint(unsigned u) {
186     // Currently, we assume the first cell in the prediction message is the serving cell
187     if ( serving_cell_id.empty() ) {
188       serving_cell_id = curr_key;
189     }
190
191     if (down_val) {
192       cell_pred_down[curr_key] = u;
193       down_val = false;
194     } else {
195       cell_pred_up[curr_key] = u;
196       down_val = true;
197     }
198
199     return true;
200
201   }
202   bool Int64(int64_t i) {  return true; }
203   bool Uint64(uint64_t u) {  return true; }
204   bool Double(double d) {  return true; }
205   bool String(const char* str, SizeType length, bool copy) {
206
207     return true;
208   }
209   bool StartObject() {  return true; }
210   bool Key(const char* str, SizeType length, bool copy) {
211     if (!ue_id_found) {
212
213       ue_id = str;
214       ue_id_found = true;
215     } else {
216       curr_key = str;
217     }
218     return true;
219   }
220   bool EndObject(SizeType memberCount) {  return true; }
221   bool StartArray() {  return true; }
222   bool EndArray(SizeType elementCount) {  return true; }
223 };
224
225 struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
226   /*
227     Assuming we receive the following payload from AD
228     [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
229   */
230   vector<string> prediction_ues;
231   string curr_key = "";
232
233   bool Key(const Ch* str, SizeType len, bool copy) {
234     curr_key = str;
235     return true;
236   }
237
238   bool String(const Ch* str, SizeType len, bool copy) {
239     // We are only interested in the "ue-id"
240     if ( curr_key.compare( "ue-id") == 0 ) {
241       prediction_ues.push_back( str );
242     }
243     return true;
244   }
245 };
246
247
248 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
249   unordered_map<string, string> cell_pred;
250   std::string serving_cell_id;
251   int serving_cell_rsrp;
252   int serving_cell_rsrq;
253   int serving_cell_sinr;
254   bool in_serving_array = false;
255   int rf_meas_index = 0;
256
257   bool in_serving_report_object = false;
258
259   string curr_key = "";
260   string curr_value = "";
261   bool Null() { return true; }
262   bool Bool(bool b) { return true; }
263   bool Int(int i) {
264
265     return true;
266   }
267
268   bool Uint(unsigned i) {
269
270     if (in_serving_report_object) {
271       if (curr_key.compare("rsrp") == 0) {
272         serving_cell_rsrp = i;
273       } else if (curr_key.compare("rsrq") == 0) {
274         serving_cell_rsrq = i;
275       } else if (curr_key.compare("rssinr") == 0) {
276         serving_cell_sinr = i;
277       }
278     }
279
280     return true; }
281   bool Int64(int64_t i) {
282
283     return true; }
284   bool Uint64(uint64_t i) {
285
286     return true; }
287   bool Double(double d) { return true; }
288   bool String(const char* str, SizeType length, bool copy) {
289
290     if (curr_key.compare("ServingCellID") == 0) {
291       serving_cell_id = str;
292     }
293
294     return true;
295   }
296   bool StartObject() {
297     if (curr_key.compare("ServingCellRF") == 0) {
298       in_serving_report_object = true;
299     }
300
301     return true; }
302   bool Key(const char* str, SizeType length, bool copy) {
303
304     curr_key = str;
305     return true;
306   }
307   bool EndObject(SizeType memberCount) {
308     if (curr_key.compare("ServingCellRF") == 0) {
309       in_serving_report_object = false;
310     }
311     return true; }
312   bool StartArray() {
313
314     if (curr_key.compare("ServingCellRF") == 0) {
315       in_serving_array = true;
316     }
317
318     return true;
319   }
320   bool EndArray(SizeType elementCount) {
321
322     if (curr_key.compare("servingCellRF") == 0) {
323       in_serving_array = false;
324       rf_meas_index = 0;
325     }
326
327     return true; }
328 }; */
329
330
331 /* unordered_map<string, UEData> get_sdl_ue_data() {
332
333   fprintf(stderr, "In get_sdl_ue_data()\n");
334
335   unordered_map<string, string> ue_data;
336
337   unordered_map<string, UEData> return_ue_data_map;
338
339   std::string prefix3="";
340   Keys K2 = sdl->findKeys(nsu, prefix3);
341   DataMap Dk2 = sdl->get(nsu, K2);
342
343   string ue_json;
344   string ue_id;
345
346   for(auto si=K2.begin();si!=K2.end();++si){
347     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
348     char val[val_v.size()+1];                               // from Data
349     int i;
350
351     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
352     val[i]='\0';
353       ue_id.assign((std::string)*si);
354
355       ue_json.assign(val);
356       ue_data[ue_id] =  ue_json;
357   }
358
359   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
360     UEDataHandler handler;
361     Reader reader;
362     StringStream ss(map_iter->second.c_str());
363     reader.Parse(ss,handler);
364
365     string ueID = map_iter->first;
366     string serving_cell_id = handler.serving_cell_id;
367     int serv_rsrp = handler.serving_cell_rsrp;
368
369     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
370
371   }
372
373   return return_ue_data_map;
374 } */
375
376 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
377
378   int response_to = 0;   // max timeout wating for a response
379   int rmtype;           // received message type
380
381   string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
382
383   cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length="<< len << "\n";
384   cout << "[INFO] Payload is " << arg << endl;
385
386   PolicyHandler handler;
387   Reader reader;
388   StringStream ss(arg.c_str());
389   reader.Parse(ss,handler);
390
391   //Set the threshold value
392   if (handler.found_threshold) {
393     cout << "[INFO] Setting RSRP Threshold to A1-P value: " << handler.threshold << endl;
394     rsrp_threshold = handler.threshold;
395   }
396
397   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );  // validate that we can use the same buffer for 2 rts calls
398   mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
399 }
400
401 // callback to handle handover reply (json http response)
402 size_t handoff_reply_callback( const char *in, size_t size, size_t num, string *out ) {
403   const size_t totalBytes( size * num );
404   out->append( in, totalBytes );
405   return totalBytes;
406 }
407
408 // sends a handover message through REST
409 void send_rest_control_request( string msg ) {
410   CURL *curl = curl_easy_init();
411   curl_easy_setopt( curl, CURLOPT_URL, ts_control_ep.c_str() );
412   curl_easy_setopt( curl, CURLOPT_TIMEOUT, 10 );
413   curl_easy_setopt( curl, CURLOPT_POST, 1L );
414   // curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
415
416   // response information
417   long httpCode( 0 );
418   unique_ptr<string> httpData( new string() );
419
420   curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, handoff_reply_callback );
421   curl_easy_setopt( curl, CURLOPT_WRITEDATA, httpData.get());
422   curl_easy_setopt( curl, CURLOPT_POSTFIELDS, msg.c_str() );
423
424   struct curl_slist *headers = NULL;  // needs to free this after easy perform
425   headers = curl_slist_append( headers, "Accept: application/json" );
426   headers = curl_slist_append( headers, "Content-Type: application/json" );
427   curl_easy_setopt( curl, CURLOPT_HTTPHEADER, headers );
428
429   cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_ep << "\"\n";
430   cout << "[INFO] HandOff request is " << msg << endl;
431
432   // sending request
433   CURLcode res = curl_easy_perform( curl );
434   if( res != CURLE_OK ) {
435     cout << "[ERROR] curl_easy_perform() failed: " << curl_easy_strerror( res ) << endl;
436
437   } else {
438
439     curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
440     if( httpCode == 200 ) {
441       // ============== DO SOMETHING USEFUL HERE ===============
442       // Currently, we only print out the HandOff reply
443       rapidjson::Document document;
444       document.Parse( httpData.get()->c_str() );
445       rapidjson::StringBuffer s;
446             rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
447       document.Accept( writer );
448       cout << "[INFO] HandOff reply is " << s.GetString() << endl;
449
450
451     } else if ( httpCode == 404 ) {
452       cout << "[ERROR] HTTP 404 Not Found: " << ts_control_ep << endl;
453     } else {
454       cout << "[ERROR] Unexpected HTTP code " << httpCode << " from " << ts_control_ep << \
455               "\n[ERROR] HTTP payload is " << httpData.get()->c_str() << endl;
456     }
457
458   }
459
460   curl_slist_free_all( headers );
461   curl_easy_cleanup( curl );
462 }
463
464 // sends a handover message to RC xApp through gRPC
465 void send_grpc_control_request() {
466   grpc::ClientContext context;
467   api::RicControlGrpcReq *request = api::RicControlGrpcReq().New();
468   api::RicControlGrpcRsp response;
469
470   api::RICE2APHeader *apHeader = api::RICE2APHeader().New();
471   api::RICControlHeader *ctrlHeader = api::RICControlHeader().New();
472   api::RICControlMessage *ctrlMsg = api::RICControlMessage().New();
473
474   request->set_e2nodeid("e2nodeid");
475   request->set_plmnid("plmnid");
476   request->set_ranname("ranname");
477   request->set_allocated_rice2apheaderdata(apHeader);
478   request->set_allocated_riccontrolheaderdata(ctrlHeader);
479   request->set_allocated_riccontrolmessagedata(ctrlMsg);
480   request->set_riccontrolackreqval(api::RIC_CONTROL_ACK_UNKWON);  // not yet used in api.proto
481
482   grpc::Status status = rc_stub->SendRICControlReqServiceGrpc(&context, *request, &response);
483
484   if(status.ok()) {
485     /*
486       TODO check if this is related to RICControlAckEnum
487       if yes, then ACK value should be 2 (RIC_CONTROL_ACK)
488       api.proto assumes that 0 is an ACK
489     */
490     if(response.rspcode() == 0) {
491       cout << "[INFO] Control Request succeeded with code=0, description=" << response.description() << endl;
492     } else {
493       cout << "[ERROR] Control Request failed with code=" << response.rspcode()
494            << ", description=" << response.description() << endl;
495     }
496
497   } else {
498     cout << "[ERROR] failed to send a RIC Control Request message to RC xApp, error_code="
499          << status.error_code() << ", error_msg=" << status.error_message() << endl;
500   }
501
502   // FIXME needs to check about memory likeage
503 }
504
505 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
506
507   time_t now;
508   string str_now;
509   static unsigned int seq_number = 0; // static counter, not thread-safe
510
511   int response_to = 0;   // max timeout wating for a response
512
513   int send_mtype = 0;
514   int rmtype;                                                   // received message type
515   int delay = 1000000;          // mu-sec delay; default 1s
516
517   string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
518
519   cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
520   cout << "[INFO] Payload is " << json << endl;
521
522   PredictionHandler handler;
523   try {
524     Reader reader;
525     StringStream ss(json.c_str());
526     reader.Parse(ss,handler);
527   } catch (...) {
528     cout << "[ERROR] Got an exception on stringstream read parse\n";
529   }
530
531   // We are only considering download throughput
532   unordered_map<string, int> throughput_map = handler.cell_pred_down;
533
534   // Decision about CONTROL message
535   // (1) Identify UE Id in Prediction message
536   // (2) Iterate through Prediction message.
537   //     If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
538   //     We assume the first cell in the prediction message is the serving cell
539
540   int serving_cell_throughput = 0;
541   int highest_throughput = 0;
542   string highest_throughput_cell_id;
543
544   // Getting the current serving cell throughput prediction
545   auto cell = throughput_map.find( handler.serving_cell_id );
546   serving_cell_throughput = cell->second;
547
548    // Iterating to identify the highest throughput prediction
549   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
550
551     string curr_cellid = map_iter->first;
552     int curr_throughput = map_iter->second;
553
554     if ( highest_throughput < curr_throughput ) {
555       highest_throughput = curr_throughput;
556       highest_throughput_cell_id = curr_cellid;
557     }
558
559   }
560
561   if ( highest_throughput > serving_cell_throughput ) {
562     // building a handoff control message
563     now = time( nullptr );
564     str_now = ctime( &now );
565     str_now.pop_back(); // removing the \n character
566
567     seq_number++;       // static counter, not thread-safe
568
569     rapidjson::StringBuffer s;
570           rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
571     writer.StartObject();
572     writer.Key( "command" );
573     writer.String( "HandOff" );
574     writer.Key( "seqNo" );
575     writer.Int( seq_number );
576     writer.Key( "ue" );
577     writer.String( handler.ue_id.c_str() );
578     writer.Key( "fromCell" );
579     writer.String( handler.serving_cell_id.c_str() );
580     writer.Key( "toCell" );
581     writer.String( highest_throughput_cell_id.c_str() );
582     writer.Key( "timestamp" );
583     writer.String( str_now.c_str() );
584     writer.Key( "reason" );
585     writer.String( "HandOff Control Request from TS xApp" );
586     writer.Key( "ttl" );
587     writer.Int( 10 );
588     writer.EndObject();
589     // creates a message like
590     /* {
591       "command": "HandOff",
592       "seqNo": 1,
593       "ue": "ueid-here",
594       "fromCell": "CID1",
595       "toCell": "CID3",
596       "timestamp": "Sat May 22 10:35:33 2021",
597       "reason": "HandOff Control Request from TS xApp",
598       "ttl": 10
599     } */
600
601     // sending a control request message
602     if ( ts_control_api == TsControlApi::REST ) {
603       send_rest_control_request( s.GetString() );
604     } else {
605       send_grpc_control_request();
606     }
607
608   } else {
609     cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
610   }
611
612   // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" );       // validate that we can use the same buffer for 2 rts calls
613   // mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
614 }
615
616 void send_prediction_request( vector<string> ues_to_predict ) {
617
618   std::unique_ptr<Message> msg;
619   Msg_component payload;                                // special type of unique pointer to the payload
620
621   int sz;
622   int i;
623   size_t plen;
624   Msg_component send_payload;
625
626   msg = xfw->Alloc_msg( 2048 );
627
628   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
629   if( sz < 2048 ) {
630     fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
631     exit( 1 );
632   }
633
634   string ues_list = "[";
635
636   for (int i = 0; i < ues_to_predict.size(); i++) {
637     if (i == ues_to_predict.size() - 1) {
638       ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
639     } else {
640       ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
641     }
642   }
643
644   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
645
646   send_payload = msg->Get_payload(); // direct access to payload
647   snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
648
649   plen = strlen( (char *)send_payload.get() );
650
651   cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
652
653   // payload updated in place, nothing to copy from, so payload parm is nil
654   if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
655     fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
656   }
657
658 }
659
660 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
661  * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
662  * sends a prediction request to the QP Driver xApp.
663  */
664 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
665   string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
666
667   cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
668   cout << "[INFO] Payload is " << json << "\n";
669
670   AnomalyHandler handler;
671   Reader reader;
672   StringStream ss(json.c_str());
673   reader.Parse(ss,handler);
674
675   // just sending ACK to the AD xApp
676   mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr );  // msg type 30004
677
678   // TODO should we use the threshold received in the A1_POLICY_REQ message and compare with Degradation in TS_ANOMALY_UPDATE?
679   // if( handler.degradation < rsrp_threshold )
680   send_prediction_request(handler.prediction_ues);
681 }
682
683 extern int main( int argc, char** argv ) {
684
685   int nthreads = 1;
686   char* port = (char *) "4560";
687   shared_ptr<grpc::Channel> channel;
688
689   Config *config = new Config();
690   string api = config->Get_control_str("ts_control_api");
691   ts_control_ep = config->Get_control_str("ts_control_ep");
692   if ( api.empty() ) {
693     cout << "[ERROR] a control api (rest/grpc) is required in xApp descriptor\n";
694     exit(1);
695   }
696   if ( api.compare("rest") == 0 ) {
697     ts_control_api = TsControlApi::REST;
698   } else {
699     ts_control_api = TsControlApi::gRPC;
700   }
701
702   channel = grpc::CreateChannel(ts_control_ep, grpc::InsecureChannelCredentials());
703   rc_stub = api::MsgComm::NewStub(channel, grpc::StubOptions());
704
705   fprintf( stderr, "[TS xApp] listening on port %s\n", port );
706   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
707
708   xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL );          // msg type 20010
709   xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL );  // msg type 30002
710   xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
711
712   xfw->Run( nthreads );
713
714 }