02a88fef3e55be8d8eb993e0377171fec414c9e5
[ric-app/ts.git] / src / ts_xapp / ts_xapp.cpp
1 // vi: ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2020 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 /*
21         Mnemonic:       ts_xapp.cpp
22         Abstract:       Traffic Steering xApp
23                    1. Receives A1 Policy
24                                2. Receives anomaly detection
25                                3. Requests prediction for UE throughput on current and neighbor cells
26                                4. Receives prediction
27                                5. Optionally exercises Traffic Steering action over E2
28
29         Date:     22 April 2020
30         Author:         Ron Shacham
31
32   Modified: 21 May 2021 (Alexandre Huff)
33             Update for traffic steering use case in release D.
34             07 Dec 2021 (Alexandre Huff)
35             Update for traffic steering use case in release E.
36 */
37
38 #include <stdio.h>
39 #include <string.h>
40 #include <unistd.h>
41
42 #include <thread>
43 #include <iostream>
44 #include <memory>
45
46 #include <set>
47 #include <map>
48 #include <vector>
49 #include <string>
50 #include <unordered_map>
51
52 #include <rapidjson/document.h>
53 #include <rapidjson/writer.h>
54 #include <rapidjson/stringbuffer.h>
55 #include <rapidjson/schema.h>
56 #include <rapidjson/reader.h>
57 #include <rapidjson/prettywriter.h>
58
59 #include <rmr/RIC_message_types.h>
60 #include <ricxfcpp/xapp.hpp>
61 #include <ricxfcpp/config.hpp>
62
63 /*
64   FIXME unfortunately this RMR flag has to be disabled
65   due to name resolution conflicts.
66   RC xApp defines the same name for gRPC control messages.
67 */
68 #undef RIC_CONTROL_ACK
69
70 #include <grpc/grpc.h>
71 #include <grpcpp/channel.h>
72 #include <grpcpp/client_context.h>
73 #include <grpcpp/create_channel.h>
74 #include <grpcpp/security/credentials.h>
75 #include "protobuf/rc.grpc.pb.h"
76
77 #include "utils/restclient.hpp"
78
79
80 using namespace rapidjson;
81 using namespace std;
82 using namespace xapp;
83
84 using Namespace = std::string;
85 using Key = std::string;
86 using Data = std::vector<uint8_t>;
87 using DataMap = std::map<Key, Data>;
88 using Keys = std::set<Key>;
89
90
91 // ----------------------------------------------------------
92 std::unique_ptr<Xapp> xfw;
93 std::unique_ptr<rc::MsgComm::Stub> rc_stub;
94
95 int downlink_threshold = 0;  // A1 policy type 20008 (in percentage)
96
97 // scoped enum to identify which API is used to send control messages
98 enum class TsControlApi { REST, gRPC };
99 TsControlApi ts_control_api;  // api to send control messages
100 string ts_control_ep;         // api target endpoint
101
102 typedef struct nodeb {
103   string ran_name;
104   struct {
105     string plmn_id;
106     string nb_id;
107   } global_nb_id;
108 } nodeb_t;
109
110 unordered_map<string, shared_ptr<nodeb_t>> cell_map; // maps each cell to its nodeb
111
112 /* struct UEData {
113   string serving_cell;
114   int serving_cell_rsrp;
115 }; */
116
117 struct PolicyHandler : public BaseReaderHandler<UTF8<>, PolicyHandler> {
118   /*
119     Assuming we receive the following payload from A1 Mediator
120     {"operation": "CREATE", "policy_type_id": 20008, "policy_instance_id": "tsapolicy145", "payload": {"threshold": 5}}
121   */
122   unordered_map<string, string> cell_pred;
123   std::string ue_id;
124   bool ue_id_found = false;
125   string curr_key = "";
126   string curr_value = "";
127   int policy_type_id;
128   int policy_instance_id;
129   int threshold;
130   std::string operation;
131   bool found_threshold = false;
132
133   bool Null() { return true; }
134   bool Bool(bool b) { return true; }
135   bool Int(int i) {
136
137     if (curr_key.compare("policy_type_id") == 0) {
138       policy_type_id = i;
139     } else if (curr_key.compare("policy_instance_id") == 0) {
140       policy_instance_id = i;
141     } else if (curr_key.compare("threshold") == 0) {
142       found_threshold = true;
143       threshold = i;
144     }
145
146     return true;
147   }
148   bool Uint(unsigned u) {
149
150     if (curr_key.compare("policy_type_id") == 0) {
151       policy_type_id = u;
152     } else if (curr_key.compare("policy_instance_id") == 0) {
153       policy_instance_id = u;
154     } else if (curr_key.compare("threshold") == 0) {
155       found_threshold = true;
156       threshold = u;
157     }
158
159     return true;
160   }
161   bool Int64(int64_t i) {  return true; }
162   bool Uint64(uint64_t u) {  return true; }
163   bool Double(double d) {  return true; }
164   bool String(const char* str, SizeType length, bool copy) {
165
166     if (curr_key.compare("operation") != 0) {
167       operation = str;
168     }
169
170     return true;
171   }
172   bool StartObject() {
173
174     return true;
175   }
176   bool Key(const char* str, SizeType length, bool copy) {
177
178     curr_key = str;
179
180     return true;
181   }
182   bool EndObject(SizeType memberCount) {  return true; }
183   bool StartArray() {  return true; }
184   bool EndArray(SizeType elementCount) {  return true; }
185
186 };
187
188 struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
189   unordered_map<string, int> cell_pred_down;
190   unordered_map<string, int> cell_pred_up;
191   std::string ue_id;
192   bool ue_id_found = false;
193   string curr_key = "";
194   string curr_value = "";
195   string serving_cell_id;
196   bool down_val = true;
197   bool Null() {  return true; }
198   bool Bool(bool b) {  return true; }
199   bool Int(int i) {  return true; }
200   bool Uint(unsigned u) {
201     // Currently, we assume the first cell in the prediction message is the serving cell
202     if ( serving_cell_id.empty() ) {
203       serving_cell_id = curr_key;
204     }
205
206     if (down_val) {
207       cell_pred_down[curr_key] = u;
208       down_val = false;
209     } else {
210       cell_pred_up[curr_key] = u;
211       down_val = true;
212     }
213
214     return true;
215
216   }
217   bool Int64(int64_t i) {  return true; }
218   bool Uint64(uint64_t u) {  return true; }
219   bool Double(double d) {  return true; }
220   bool String(const char* str, SizeType length, bool copy) {
221
222     return true;
223   }
224   bool StartObject() {  return true; }
225   bool Key(const char* str, SizeType length, bool copy) {
226     if (!ue_id_found) {
227
228       ue_id = str;
229       ue_id_found = true;
230     } else {
231       curr_key = str;
232     }
233     return true;
234   }
235   bool EndObject(SizeType memberCount) {  return true; }
236   bool StartArray() {  return true; }
237   bool EndArray(SizeType elementCount) {  return true; }
238 };
239
240 struct AnomalyHandler : public BaseReaderHandler<UTF8<>, AnomalyHandler> {
241   /*
242     Assuming we receive the following payload from AD
243     [{"du-id": 1010, "ue-id": "Train passenger 2", "measTimeStampRf": 1620835470108, "Degradation": "RSRP RSSINR"}]
244   */
245   vector<string> prediction_ues;
246   string curr_key = "";
247
248   bool Key(const Ch* str, SizeType len, bool copy) {
249     curr_key = str;
250     return true;
251   }
252
253   bool String(const Ch* str, SizeType len, bool copy) {
254     // We are only interested in the "ue-id"
255     if ( curr_key.compare( "ue-id") == 0 ) {
256       prediction_ues.push_back( str );
257     }
258     return true;
259   }
260 };
261
262 struct NodebListHandler : public BaseReaderHandler<UTF8<>, NodebListHandler> {
263   vector<string> nodeb_list;
264   string curr_key = "";
265
266   bool Key(const Ch* str, SizeType length, bool copy) {
267     curr_key = str;
268     return true;
269   }
270
271   bool String(const Ch* str, SizeType length, bool copy) {
272     if( curr_key.compare( "inventoryName" ) == 0 ) {
273       nodeb_list.push_back( str );
274     }
275     return true;
276   }
277 };
278
279 struct NodebHandler : public BaseReaderHandler<UTF8<>, NodebHandler> {
280   string curr_key = "";
281   shared_ptr<nodeb_t> nodeb = make_shared<nodeb_t>();
282
283   bool Key(const Ch* str, SizeType length, bool copy) {
284     curr_key = str;
285     return true;
286   }
287
288   bool String(const Ch* str, SizeType length, bool copy) {
289     if( curr_key.compare( "ranName" ) == 0 ) {
290       nodeb->ran_name = str;
291     } else if( curr_key.compare( "plmnId" ) == 0 ) {
292       nodeb->global_nb_id.plmn_id = str;
293     } else if( curr_key.compare( "nbId" ) == 0 ) {
294       nodeb->global_nb_id.nb_id = str;
295     } else if( curr_key.compare( "cellId" ) == 0 ) {
296       cell_map[str] = nodeb;
297     }
298     return true;
299   }
300
301 };
302
303
304 /* struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
305   unordered_map<string, string> cell_pred;
306   std::string serving_cell_id;
307   int serving_cell_rsrp;
308   int serving_cell_rsrq;
309   int serving_cell_sinr;
310   bool in_serving_array = false;
311   int rf_meas_index = 0;
312
313   bool in_serving_report_object = false;
314
315   string curr_key = "";
316   string curr_value = "";
317   bool Null() { return true; }
318   bool Bool(bool b) { return true; }
319   bool Int(int i) {
320
321     return true;
322   }
323
324   bool Uint(unsigned i) {
325
326     if (in_serving_report_object) {
327       if (curr_key.compare("rsrp") == 0) {
328         serving_cell_rsrp = i;
329       } else if (curr_key.compare("rsrq") == 0) {
330         serving_cell_rsrq = i;
331       } else if (curr_key.compare("rssinr") == 0) {
332         serving_cell_sinr = i;
333       }
334     }
335
336     return true; }
337   bool Int64(int64_t i) {
338
339     return true; }
340   bool Uint64(uint64_t i) {
341
342     return true; }
343   bool Double(double d) { return true; }
344   bool String(const char* str, SizeType length, bool copy) {
345
346     if (curr_key.compare("ServingCellID") == 0) {
347       serving_cell_id = str;
348     }
349
350     return true;
351   }
352   bool StartObject() {
353     if (curr_key.compare("ServingCellRF") == 0) {
354       in_serving_report_object = true;
355     }
356
357     return true; }
358   bool Key(const char* str, SizeType length, bool copy) {
359
360     curr_key = str;
361     return true;
362   }
363   bool EndObject(SizeType memberCount) {
364     if (curr_key.compare("ServingCellRF") == 0) {
365       in_serving_report_object = false;
366     }
367     return true; }
368   bool StartArray() {
369
370     if (curr_key.compare("ServingCellRF") == 0) {
371       in_serving_array = true;
372     }
373
374     return true;
375   }
376   bool EndArray(SizeType elementCount) {
377
378     if (curr_key.compare("servingCellRF") == 0) {
379       in_serving_array = false;
380       rf_meas_index = 0;
381     }
382
383     return true; }
384 }; */
385
386
387 /* unordered_map<string, UEData> get_sdl_ue_data() {
388
389   fprintf(stderr, "In get_sdl_ue_data()\n");
390
391   unordered_map<string, string> ue_data;
392
393   unordered_map<string, UEData> return_ue_data_map;
394
395   std::string prefix3="";
396   Keys K2 = sdl->findKeys(nsu, prefix3);
397   DataMap Dk2 = sdl->get(nsu, K2);
398
399   string ue_json;
400   string ue_id;
401
402   for(auto si=K2.begin();si!=K2.end();++si){
403     std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
404     char val[val_v.size()+1];                               // from Data
405     int i;
406
407     for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
408     val[i]='\0';
409       ue_id.assign((std::string)*si);
410
411       ue_json.assign(val);
412       ue_data[ue_id] =  ue_json;
413   }
414
415   for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
416     UEDataHandler handler;
417     Reader reader;
418     StringStream ss(map_iter->second.c_str());
419     reader.Parse(ss,handler);
420
421     string ueID = map_iter->first;
422     string serving_cell_id = handler.serving_cell_id;
423     int serv_rsrp = handler.serving_cell_rsrp;
424
425     return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
426
427   }
428
429   return return_ue_data_map;
430 } */
431
432 void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
433   string arg ((const char*)payload.get(), len); // RMR payload might not have a nil terminanted char
434
435   cout << "[INFO] Policy Callback got a message, type=" << mtype << ", length=" << len << "\n";
436   cout << "[INFO] Payload is " << arg << endl;
437
438   PolicyHandler handler;
439   Reader reader;
440   StringStream ss(arg.c_str());
441   reader.Parse(ss,handler);
442
443   //Set the threshold value
444   if (handler.found_threshold) {
445     cout << "[INFO] Setting Threshold for A1-P value: " << handler.threshold << "%\n";
446     downlink_threshold = handler.threshold;
447   }
448
449 }
450
451 // sends a handover message through REST
452 void send_rest_control_request( string ue_id, string serving_cell_id, string target_cell_id ) {
453   time_t now;
454   string str_now;
455   static unsigned int seq_number = 0; // static counter, not thread-safe
456
457   // building a handoff control message
458   now = time( nullptr );
459   str_now = ctime( &now );
460   str_now.pop_back(); // removing the \n character
461
462   seq_number++;       // static counter, not thread-safe
463
464   rapidjson::StringBuffer s;
465   rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
466   writer.StartObject();
467   writer.Key( "command" );
468   writer.String( "HandOff" );
469   writer.Key( "seqNo" );
470   writer.Int( seq_number );
471   writer.Key( "ue" );
472   writer.String( ue_id.c_str() );
473   writer.Key( "fromCell" );
474   writer.String( serving_cell_id.c_str() );
475   writer.Key( "toCell" );
476   writer.String( target_cell_id.c_str() );
477   writer.Key( "timestamp" );
478   writer.String( str_now.c_str() );
479   writer.Key( "reason" );
480   writer.String( "HandOff Control Request from TS xApp" );
481   writer.Key( "ttl" );
482   writer.Int( 10 );
483   writer.EndObject();
484   // creates a message like
485   /* {
486     "command": "HandOff",
487     "seqNo": 1,
488     "ue": "ueid-here",
489     "fromCell": "CID1",
490     "toCell": "CID3",
491     "timestamp": "Sat May 22 10:35:33 2021",
492     "reason": "HandOff Control Request from TS xApp",
493     "ttl": 10
494   } */
495
496   string msg = s.GetString();
497
498   cout << "[INFO] Sending a HandOff CONTROL message to \"" << ts_control_ep << "\"\n";
499   cout << "[INFO] HandOff request is " << msg << endl;
500
501   try {
502     // sending request
503     restclient::RestClient client( ts_control_ep );
504     restclient::response_t resp = client.do_post( "", msg ); // we already have the full path in ts_control_ep
505
506     if( resp.status_code == 200 ) {
507         // ============== DO SOMETHING USEFUL HERE ===============
508         // Currently, we only print out the HandOff reply
509         rapidjson::Document document;
510         document.Parse( resp.body.c_str() );
511         rapidjson::StringBuffer s;
512         rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
513         document.Accept( writer );
514         cout << "[INFO] HandOff reply is " << s.GetString() << endl;
515
516     } else {
517         cout << "[ERROR] Unexpected HTTP code " << resp.status_code << " from " << \
518                 client.getBaseUrl() << \
519                 "\n[ERROR] HTTP payload is " << resp.body.c_str() << endl;
520     }
521
522   } catch( const restclient::RestClientException &e ) {
523     cout << "[ERROR] " << e.what() << endl;
524
525   }
526
527 }
528
529 // sends a handover message to RC xApp through gRPC
530 void send_grpc_control_request( string ue_id, string target_cell_id ) {
531   grpc::ClientContext context;
532
533   rc::RicControlGrpcRsp response;
534   shared_ptr<rc::RicControlGrpcReq> request = make_shared<rc::RicControlGrpcReq>();
535
536   rc::RICE2APHeader *apHeader = request->mutable_rice2apheaderdata();
537   apHeader->set_ranfuncid( 300 );
538   apHeader->set_ricrequestorid( 1001 );
539
540   rc::RICControlHeader *ctrlHeader = request->mutable_riccontrolheaderdata();
541   ctrlHeader->set_controlstyle( 3 );
542   ctrlHeader->set_controlactionid( 1 );
543   ctrlHeader->set_ueid( ue_id );
544
545   rc::RICControlMessage *ctrlMsg = request->mutable_riccontrolmessagedata();
546   ctrlMsg->set_riccontrolcelltypeval( rc::RIC_CONTROL_CELL_UNKWON );
547   ctrlMsg->set_targetcellid( target_cell_id );
548
549   auto data = cell_map.find( target_cell_id );
550   if( data != cell_map.end() ) {
551     request->set_e2nodeid( data->second->global_nb_id.nb_id );
552     request->set_plmnid( data->second->global_nb_id.plmn_id );
553     request->set_ranname( data->second->ran_name );
554   } else {
555     request->set_e2nodeid( "unknown_e2nodeid" );
556     request->set_plmnid( "unknown_plmnid" );
557     request->set_ranname( "unknown_ranname" );
558   }
559   request->set_riccontrolackreqval( rc::RIC_CONTROL_ACK_UNKWON );  // not yet used in api.proto
560
561   cout << "[INFO] Sending gRPC control request to " << ts_control_ep << "\n" << request->DebugString();
562
563   grpc::Status status = rc_stub->SendRICControlReqServiceGrpc( &context, *request, &response );
564
565   if( status.ok() ) {
566     if( response.rspcode() == 0 ) {
567       cout << "[INFO] Control Request succeeded with code=0, description=" << response.description() << endl;
568     } else {
569       cout << "[ERROR] Control Request failed with code=" << response.rspcode()
570            << ", description=" << response.description() << endl;
571     }
572
573   } else {
574     cout << "[ERROR] failed to send a RIC Control Request message to RC xApp, error_code="
575          << status.error_code() << ", error_msg=" << status.error_message() << endl;
576   }
577
578 }
579
580 void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload,  void* data ) {
581   string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
582
583   cout << "[INFO] Prediction Callback got a message, type=" << mtype << ", length=" << len << "\n";
584   cout << "[INFO] Payload is " << json << endl;
585
586   PredictionHandler handler;
587   try {
588     Reader reader;
589     StringStream ss(json.c_str());
590     reader.Parse(ss,handler);
591   } catch (...) {
592     cout << "[ERROR] Got an exception on stringstream read parse\n";
593   }
594
595   // We are only considering download throughput
596   unordered_map<string, int> throughput_map = handler.cell_pred_down;
597
598   // Decision about CONTROL message
599   // (1) Identify UE Id in Prediction message
600   // (2) Iterate through Prediction message.
601   //     If one of the cells has a higher throughput prediction than serving cell, send a CONTROL request
602   //     We assume the first cell in the prediction message is the serving cell
603
604   int serving_cell_throughput = 0;
605   int highest_throughput = 0;
606   string highest_throughput_cell_id;
607
608   // Getting the current serving cell throughput prediction
609   auto cell = throughput_map.find( handler.serving_cell_id );
610   serving_cell_throughput = cell->second;
611
612    // Iterating to identify the highest throughput prediction
613   for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
614
615     string curr_cellid = map_iter->first;
616     int curr_throughput = map_iter->second;
617
618     if ( highest_throughput < curr_throughput ) {
619       highest_throughput = curr_throughput;
620       highest_throughput_cell_id = curr_cellid;
621     }
622
623   }
624
625   float thresh = 0;
626   if( downlink_threshold > 0 ) {  // we also take into account the threshold in A1 policy type 20008
627     thresh = serving_cell_throughput * (downlink_threshold / 100.0);
628   }
629
630   if ( highest_throughput > ( serving_cell_throughput + thresh ) ) {
631
632     // sending a control request message
633     if ( ts_control_api == TsControlApi::REST ) {
634       send_rest_control_request( handler.ue_id, handler.serving_cell_id, highest_throughput_cell_id );
635     } else {
636       send_grpc_control_request( handler.ue_id, highest_throughput_cell_id );
637     }
638
639   } else {
640     cout << "[INFO] The current serving cell \"" << handler.serving_cell_id << "\" is the best one" << endl;
641   }
642
643 }
644
645 void send_prediction_request( vector<string> ues_to_predict ) {
646   std::unique_ptr<Message> msg;
647   Msg_component payload;           // special type of unique pointer to the payload
648
649   int sz;
650   int i;
651   size_t plen;
652   Msg_component send_payload;
653
654   msg = xfw->Alloc_msg( 2048 );
655
656   sz = msg->Get_available_size();  // we'll reuse a message if we received one back; ensure it's big enough
657   if( sz < 2048 ) {
658     fprintf( stderr, "[ERROR] message returned did not have enough size: %d [%d]\n", sz, i );
659     exit( 1 );
660   }
661
662   string ues_list = "[";
663
664   for (int i = 0; i < ues_to_predict.size(); i++) {
665     if (i == ues_to_predict.size() - 1) {
666       ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"]";
667     } else {
668       ues_list = ues_list + "\"" + ues_to_predict.at(i) + "\"" + ",";
669     }
670   }
671
672   string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
673
674   send_payload = msg->Get_payload(); // direct access to payload
675   snprintf( (char *) send_payload.get(), 2048, "%s", message_body.c_str() );
676
677   plen = strlen( (char *)send_payload.get() );
678
679   cout << "[INFO] Prediction Request length=" << plen << ", payload=" << send_payload.get() << endl;
680
681   // payload updated in place, nothing to copy from, so payload parm is nil
682   if ( ! msg->Send_msg( TS_UE_LIST, Message::NO_SUBID, plen, NULL )) { // msg type 30000
683     fprintf( stderr, "[ERROR] send failed: %d\n", msg->Get_state() );
684   }
685
686 }
687
688 /* This function works with Anomaly Detection(AD) xApp. It is invoked when anomalous UEs are send by AD xApp.
689  * It parses the payload received from AD xApp, sends an ACK with same UEID as payload to AD xApp, and
690  * sends a prediction request to the QP Driver xApp.
691  */
692 void ad_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
693   string json ((char *)payload.get(), len); // RMR payload might not have a nil terminanted char
694
695   cout << "[INFO] AD Callback got a message, type=" << mtype << ", length=" << len << "\n";
696   cout << "[INFO] Payload is " << json << "\n";
697
698   AnomalyHandler handler;
699   Reader reader;
700   StringStream ss(json.c_str());
701   reader.Parse(ss,handler);
702
703   // just sending ACK to the AD xApp
704   mbuf.Send_response( TS_ANOMALY_ACK, Message::NO_SUBID, len, nullptr );  // msg type 30004
705
706   send_prediction_request(handler.prediction_ues);
707 }
708
709 vector<string> get_nodeb_list( restclient::RestClient& client ) {
710
711   restclient::response_t response = client.do_get( "/v1/nodeb/states" );
712
713   NodebListHandler handler;
714   if( response.status_code == 200 ) {
715     Reader reader;
716     StringStream ss( response.body.c_str() );
717     reader.Parse( ss, handler );
718
719     cout << "[INFO] nodeb list is " << response.body.c_str() << endl;
720
721   } else {
722     if( response.body.empty() ) {
723       cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() << endl;
724     } else {
725       cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << client.getBaseUrl() <<
726               ". HTTP payload is " << response.body.c_str() << endl;
727     }
728   }
729
730   return handler.nodeb_list;
731 }
732
733 bool build_cell_mapping() {
734   string base_url;
735   char *data = getenv( "SERVICE_E2MGR_HTTP_BASE_URL" );
736   if ( data == NULL ) {
737     base_url = "http://service-ricplt-e2mgr-http.ricplt:3800";
738   } else {
739     base_url = string( data );
740   }
741
742   try {
743     restclient::RestClient client( base_url );
744
745     vector<string> nb_list = get_nodeb_list( client );
746
747     for( string nb : nb_list ) {
748       string full_path = string("/v1/nodeb/") + nb;
749       restclient::response_t response = client.do_get( full_path );
750       if( response.status_code != 200 ) {
751         if( response.body.empty() ) {
752           cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
753                   client.getBaseUrl() + full_path << endl;
754         } else {
755           cout << "[ERROR] Unexpected HTTP code " << response.status_code << " from " << \
756                 client.getBaseUrl() + full_path << ". HTTP payload is " << response.body.c_str() << endl;
757         }
758         return false;
759       }
760
761       try {
762         NodebHandler handler;
763         Reader reader;
764         StringStream ss( response.body.c_str() );
765         reader.Parse( ss, handler );
766       } catch (...) {
767         cout << "[ERROR] Got an exception on parsing nodeb (stringstream read parse)\n";
768         return false;
769       }
770     }
771
772   } catch( const restclient::RestClientException &e ) {
773     cout << "[ERROR] " << e.what() << endl;
774     return false;
775   }
776
777   return true;
778 }
779
780 extern int main( int argc, char** argv ) {
781   int nthreads = 1;
782   char* port = (char *) "4560";
783   shared_ptr<grpc::Channel> channel;
784
785   Config *config = new Config();
786   string api = config->Get_control_str("ts_control_api");
787   ts_control_ep = config->Get_control_str("ts_control_ep");
788   if ( api.empty() ) {
789     cout << "[ERROR] a control api (rest/grpc) is required in xApp descriptor\n";
790     exit(1);
791   }
792   if ( api.compare("rest") == 0 ) {
793     ts_control_api = TsControlApi::REST;
794   } else {
795     ts_control_api = TsControlApi::gRPC;
796
797     if( !build_cell_mapping() ) {
798       cout << "[ERROR] unable to map cells to nodeb\n";
799     }
800
801     channel = grpc::CreateChannel(ts_control_ep, grpc::InsecureChannelCredentials());
802     rc_stub = rc::MsgComm::NewStub(channel, grpc::StubOptions());
803   }
804
805   fprintf( stderr, "[INFO] listening on port %s\n", port );
806   xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) );
807
808   xfw->Add_msg_cb( A1_POLICY_REQ, policy_callback, NULL );          // msg type 20010
809   xfw->Add_msg_cb( TS_QOE_PREDICTION, prediction_callback, NULL );  // msg type 30002
810   xfw->Add_msg_cb( TS_ANOMALY_UPDATE, ad_callback, NULL ); /*Register a callback function for msg type 30003*/
811
812   xfw->Run( nthreads );
813
814 }