e94c92ba814c8faa3d5800ab2206222a488b308c
[ric-app/admin.git] / test / mock_a1_mediator.cc
1 /*
2 ==================================================================================
3
4         Copyright (c) 2018-2019 AT&T Intellectual Property.
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19 /* Author : Ashwin Sridharan
20
21    A sample test client to demonstrate A1 functionality.
22    Sends different kind of policy requests (valid/invalid), create/update/delete and prints out response 
23 */
24
25 #include <limits>
26 #include <map>
27 #include <getopt.h>
28 #include <csignal>
29 #include <time.h>
30 #include <mutex>
31 #include <condition_variable>
32 #include <chrono>
33 #include <atomic>
34 #include <xapp_utils.hpp>
35 #include <vector>
36 #include <rmr/RIC_message_types.h>
37
38 #define MAX_TIMEOUTS 2
39
40 std::string gNodeB = "";
41 std::mutex notify_lock;
42 std::condition_variable notify_var;
43
44 bool rcv_message(rmr_mbuf_t *message){
45   switch(message->mtype){
46   case A1_POLICY_RESP:
47     {
48       std::lock_guard<std::mutex> lck(notify_lock);
49       std::cout <<"A1 Mediator received response = " << (char *)message->payload << " of len = " << strlen((char *)message->payload) << std::endl;
50     }
51     // released the lock. notify the sleep thread (if any)
52     notify_var.notify_all();
53     break;
54
55   default:
56     std::cout <<"Unknown RMR message of type " << message->mtype << " received" << std::endl;
57   }
58   
59   return false;
60 }
61
62
63 void usage(char *command){
64     std::cout <<"Usage : " << command << " ";
65     std::cout <<" --name[-n] xapp_instance_name ";
66     std::cout <<" --port[-p] port to listen on (default is tcp:4561) ";
67     std::cout <<"--schema[-s] schema file";
68     
69     std::cout << std::endl;
70 }
71
72
73 void msg_error(rmr_mbuf_t *message){
74   mdclog_write(MDCLOG_ERR, "Error sending message of length %d and type %d, Reason %d",  message->len,  message->mtype, errno );
75 };
76
77
78 int main(int argc, char *argv[]){
79
80   
81   char name[128] = "test_a1_client";
82   char port[16] = "tcp:9000";
83   unsigned int num_threads = 1;
84   std::unique_ptr<XaPP> my_xapp;
85   std::string schema_file;
86
87   enum OPERATIONS{CREATE, UPDATE, DELETE};
88   static const char * op_strings[] = {"CREATE", "UPDATE", "DELETE"};
89   
90   OPERATIONS op = CREATE;
91   std::string instance_id = "ac-xapp-1";
92   int class_id = 5;
93   int enforce = 0;
94   int blocking_rate = 90; // percentage
95   int window_length = 60; // seconds
96   int trigger_threshold = 40;
97
98   std::chrono::seconds time_out(1);
99   
100   // Parse command line options
101   static struct option long_options[] = 
102     {
103
104      /* Thse options require arguments */
105      {"name", required_argument, 0, 'n'},
106      {"port", required_argument, 0, 'p'},
107      {"window", required_argument, 0, 'w'},
108      {"blockrate", required_argument, 0, 'b'},
109      {"trigger", required_argument, 0, 't'},
110      {"class", required_argument, 0, 'c'},
111      {"op", required_argument, 0, 'o'},
112      {"instance", required_argument, 0, 'i'},
113      {"enforce", no_argument, &enforce, 1},
114
115     };
116
117
118   while(1) {
119
120     int option_index = 0;
121     char c = getopt_long(argc, argv, "n:p:w:b:t:c:o:i:", long_options, &option_index);
122
123     if(c == -1){
124       break;
125     }
126     
127     switch(c)
128       {
129        
130       case 0:
131         /* An option flag was set. 
132            Do nothing for now */
133         break;
134           
135       case 'n':
136         strcpy(name, optarg);
137         break;
138           
139       case 'p':
140         strcpy(port, optarg);
141         break;
142           
143       case 'w':
144         window_length = atoi(optarg);
145         break;
146
147       case 't':
148         trigger_threshold = atoi(optarg);
149         break;
150
151       case 'b':
152         blocking_rate = atof(optarg);
153         break;
154
155       case 'o':
156         op = static_cast<OPERATIONS>(atoi(optarg));
157         break;
158
159       case 'i':
160         instance_id.assign(optarg);
161         break;
162
163       case 'c':
164         class_id = atoi(optarg);
165         break;
166         
167       case 'h':
168         usage(argv[0]);
169         exit(0);
170           
171       default:
172         usage(argv[0]);
173         exit(1);
174       }
175   };
176
177   int log_level = MDCLOG_INFO;
178   init_logger(name, static_cast<mdclog_severity_t>(log_level));
179  
180   mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", name);
181   mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", port);
182
183    init_logger(name, MDCLOG_INFO);
184    
185    mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", name);
186    mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", port);
187
188    mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");
189    my_xapp = std::make_unique<XaPP>(name, port, 16384);
190    
191    
192    // Start receiving loop ...
193    std::vector<int> thread_ids(num_threads);
194    for(unsigned int i = 0; i < num_threads; i++){
195      thread_ids[i] = (*my_xapp).StartThread(rcv_message, msg_error);
196      i++;
197    };
198    
199
200    char buffer[1024];
201    std::string message_string ;
202    std::stringstream policy;
203    std::stringstream msg;
204    bool res = false;
205    switch(op){
206      
207    case CREATE:
208    case UPDATE:
209      policy <<"{ " << "\"enforce\":true, " << "\"window_length\":" << window_length << " , \"trigger_threshold\":" << trigger_threshold << ",  \"blocking_rate\":" << blocking_rate << ", \"class\":" << class_id << " }" ;
210      
211      // Send a create/update
212      msg << "{ " << "\"policy_type_id\":" << 21000 << "," << "\"policy_instance_id\":\"" << instance_id << "\", \"operation\":\"" << op_strings[op]  << "\", \"payload\" :" << policy.str() << "}";
213      res = true;
214      break;
215      
216    case DELETE:
217      // send a delete 
218      msg << "{ " << "\"policy_type_id\":" << 21000 << "," << "\"policy_instance_id\": \"" << instance_id << "\", \"operation\": \"" << op_strings[op]  <<  "\" }";
219      res = true;
220      break;
221      
222    default:
223      std::cerr <<"Not yet supported " << std::endl;
224
225    }
226
227    if(res){
228      message_string = msg.str();
229      std::cout <<"Sending message = " << message_string << std::endl;
230      memcpy(buffer, message_string.c_str(), message_string.length());
231      my_xapp.get()->Send(A1_POLICY_REQ,  message_string.length(),  buffer, link_types::HIGH_RELIABILITY);
232    }
233
234    std::unique_lock<std::mutex> lck(notify_lock);
235    
236    // release lock and got to sleep waiting to be notified
237    notify_var.wait_for(lck,  std::chrono::seconds(5));
238      
239    // finish 
240    (*my_xapp).Stop();  
241    
242 }