/* Author : Ashwin Sridharan
A sample test client to demonstrate A1 functionality.
- Sends different kind of policy requests (valid/invalid) and prints out response
+ Sends different kind of policy requests (valid/invalid), create/update/delete and prints out response
*/
#include <limits>
#include <getopt.h>
#include <csignal>
#include <time.h>
+#include <mutex>
+#include <condition_variable>
+#include <chrono>
+#include <atomic>
#include <xapp_utils.hpp>
#include <vector>
#include <rmr/RIC_message_types.h>
+#define MAX_TIMEOUTS 2
+
std::string gNodeB = "";
+std::mutex notify_lock;
+std::condition_variable notify_var;
bool rcv_message(rmr_mbuf_t *message){
- std::string response;
switch(message->mtype){
- case DC_ADM_INT_CONTROL_ACK:
- std::cout <<"Received response = " << (char *)message->payload << " of len = " << strlen((char *)message->payload) << "Actual = " << message->len << std::endl;
+ case A1_POLICY_RESP:
+ {
+ std::lock_guard<std::mutex> lck(notify_lock);
+ std::cout <<"A1 Mediator received response = " << (char *)message->payload << " of len = " << strlen((char *)message->payload) << std::endl;
+ }
+ // released the lock. notify the sleep thread (if any)
+ notify_var.notify_all();
break;
- case DC_ADM_GET_POLICY_ACK:
- std::cout <<"Received Policy = " << (char *)message->payload << " of len = " << strlen((char *)message->payload) << "Actual = " << message->len << std::endl;
- break;
-
default:
std::cout <<"Unknown RMR message of type " << message->mtype << " received" << std::endl;
}
int main(int argc, char *argv[]){
+
char name[128] = "test_a1_client";
- char port[16] = "tcp:4560";
+ char port[16] = "tcp:9000";
unsigned int num_threads = 1;
std::unique_ptr<XaPP> my_xapp;
+ std::string schema_file;
+
+ enum OPERATIONS{CREATE, UPDATE, DELETE};
+ static const char * op_strings[] = {"CREATE", "UPDATE", "DELETE"};
+
+ OPERATIONS op = CREATE;
+ std::string instance_id = "ac-xapp-1";
+ int class_id = 5;
+ int enforce = 0;
+ int blocking_rate = 90; // percentage
+ int window_length = 60; // seconds
+ int trigger_threshold = 40;
+
+ std::chrono::seconds time_out(1);
// Parse command line options
static struct option long_options[] =
{
- /* Thse options require arguments */
- {"name", required_argument, 0, 'n'},
- {"port", required_argument, 0, 'p'},
-
- };
-
+ /* Thse options require arguments */
+ {"name", required_argument, 0, 'n'},
+ {"port", required_argument, 0, 'p'},
+ {"window", required_argument, 0, 'w'},
+ {"blockrate", required_argument, 0, 'b'},
+ {"trigger", required_argument, 0, 't'},
+ {"class", required_argument, 0, 'c'},
+ {"op", required_argument, 0, 'o'},
+ {"instance", required_argument, 0, 'i'},
+ {"enforce", no_argument, &enforce, 1},
- while(1) {
+ };
- int option_index = 0;
- char c = getopt_long(argc, argv, "n:p:", long_options, &option_index);
- if(c == -1){
- break;
- }
+ while(1) {
- switch(c)
- {
+ int option_index = 0;
+ char c = getopt_long(argc, argv, "n:p:w:b:t:c:o:i:", long_options, &option_index);
- case 0:
- /* An option flag was set.
- Do nothing for now */
- break;
-
- case 'n':
- strcpy(name, optarg);
- break;
+ if(c == -1){
+ break;
+ }
+
+ switch(c)
+ {
+
+ case 0:
+ /* An option flag was set.
+ Do nothing for now */
+ break;
- case 'p':
- strcpy(port, optarg);
- break;
+ case 'n':
+ strcpy(name, optarg);
+ break;
+ case 'p':
+ strcpy(port, optarg);
+ break;
- case 'h':
- usage(argv[0]);
- exit(0);
+ case 'w':
+ window_length = atoi(optarg);
+ break;
+
+ case 't':
+ trigger_threshold = atoi(optarg);
+ break;
+
+ case 'b':
+ blocking_rate = atof(optarg);
+ break;
+
+ case 'o':
+ op = static_cast<OPERATIONS>(atoi(optarg));
+ break;
+
+ case 'i':
+ instance_id.assign(optarg);
+ break;
+
+ case 'c':
+ class_id = atoi(optarg);
+ break;
+
+ case 'h':
+ usage(argv[0]);
+ exit(0);
- default:
- usage(argv[0]);
- exit(1);
- }
- };
+ default:
+ usage(argv[0]);
+ exit(1);
+ }
+ };
+
+ int log_level = MDCLOG_INFO;
+ init_logger(name, static_cast<mdclog_severity_t>(log_level));
+
+ mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", name);
+ mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", port);
init_logger(name, MDCLOG_INFO);
mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", port);
mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto");
- my_xapp = std::make_unique<XaPP>(name, port, 1024, 1);
+ my_xapp = std::make_unique<XaPP>(name, port, 16384);
// Start receiving loop ...
std::vector<int> thread_ids(num_threads);
for(unsigned int i = 0; i < num_threads; i++){
- thread_ids[i] = (*my_xapp).StartThread(&rcv_message, msg_error);
+ thread_ids[i] = (*my_xapp).StartThread(rcv_message, msg_error);
i++;
};
- bool enforce = true;
- int block_rate = 2;
- int window_length = 20;
- int trigger_threshold = 40;
+
char buffer[1024];
- while(1){
+ std::string message_string ;
+ std::stringstream policy;
+ std::stringstream msg;
+ bool res = false;
+ switch(op){
- // Send a valid config
- std::string message_string ;
- std::string start = "{";
- std::string end = "}";
-
- message_string = start + "\"enforce\":" + (enforce? "true":"false") + ",";
- message_string += std::string("\"blocking_rate\":") + std::to_string(block_rate) + ",";
- message_string += std::string("\"window_length\":") + std::to_string(window_length) + ",";
- message_string += std::string("\"trigger_threshold\":") + std::to_string(trigger_threshold) + end;
- memcpy(buffer, message_string.c_str(), message_string.length());
- my_xapp.get()->Send(DC_ADM_INT_CONTROL, message_string.length(), buffer);
-
+ case CREATE:
+ case UPDATE:
+ policy <<"{ " << "\"enforce\":true, " << "\"window_length\":" << window_length << " , \"trigger_threshold\":" << trigger_threshold << ", \"blocking_rate\":" << blocking_rate << ", \"class\":" << class_id << " }" ;
- sleep(2);
+ // Send a create/update
+ msg << "{ " << "\"policy_type_id\":" << 21000 << "," << "\"policy_instance_id\":\"" << instance_id << "\", \"operation\":\"" << op_strings[op] << "\", \"payload\" :" << policy.str() << "}";
+ res = true;
+ break;
- // // Send an invalid config
- message_string = start + "\"enfce\":" + (enforce? "true":"false") + ",";
- message_string += std::string("\"blocking_rate\":") + std::to_string(block_rate) + ",";
- message_string += std::string("\"window_length\":") + std::to_string(window_length) + end;
- memcpy(buffer, message_string.c_str(), message_string.length());
- my_xapp.get()->Send(DC_ADM_INT_CONTROL, message_string.length(), buffer);
- sleep(2);
+ case DELETE:
+ // send a delete
+ msg << "{ " << "\"policy_type_id\":" << 21000 << "," << "\"policy_instance_id\": \"" << instance_id << "\", \"operation\": \"" << op_strings[op] << "\" }";
+ res = true;
+ break;
- // Send invalid JSON
- message_string.assign("\"enforce\":false,");
- message_string += std::string("\"blocking_rate\":") + std::to_string(block_rate) + ",";
- message_string += std::string("\"window_length\":") + std::to_string(window_length) + end;
- memcpy(buffer, message_string.c_str(), message_string.length());
- my_xapp.get()->Send(DC_ADM_INT_CONTROL, message_string.length(), buffer);
-
- sleep(2);
-
+ default:
+ std::cerr <<"Not yet supported " << std::endl;
- // Send request for policy
- // we don't care about contents of request for now ...
- std::cout <<"Sending request to get policy" << std::endl;
- my_xapp.get()->Send(DC_ADM_GET_POLICY, message_string.length(), buffer);
- sleep(2);
+ }
- window_length += 1;
-
+ if(res){
+ message_string = msg.str();
+ std::cout <<"Sending message = " << message_string << std::endl;
+ memcpy(buffer, message_string.c_str(), message_string.length());
+ my_xapp.get()->Send(A1_POLICY_REQ, message_string.length(), buffer, link_types::HIGH_RELIABILITY);
}
+
+ std::unique_lock<std::mutex> lck(notify_lock);
+ // release lock and got to sleep waiting to be notified
+ notify_var.wait_for(lck, std::chrono::seconds(5));
+
+ // finish
(*my_xapp).Stop();
}