X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fmock_a1_mediator.cc;h=12f943ea881873039193d81033be497860e9a74d;hb=0a168f272a81ac4c3afe42e014f8032f2a159d97;hp=4eae5df47d309f2712fda4dd398065d4afb9dcf9;hpb=dc68642f5676cdd49f3ef92f983b319e21f16afa;p=ric-app%2Fadmin.git diff --git a/test/mock_a1_mediator.cc b/test/mock_a1_mediator.cc index 4eae5df..12f943e 100644 --- a/test/mock_a1_mediator.cc +++ b/test/mock_a1_mediator.cc @@ -19,7 +19,7 @@ /* Author : Ashwin Sridharan A sample test client to demonstrate A1 functionality. - Sends different kind of policy requests (valid/invalid) and prints out response + Sends different kind of policy requests (valid/invalid), create/update/delete and prints out response */ #include @@ -27,23 +27,31 @@ #include #include #include +#include +#include +#include +#include #include #include #include +#define MAX_TIMEOUTS 2 + std::string gNodeB = ""; +std::mutex notify_lock; +std::condition_variable notify_var; bool rcv_message(rmr_mbuf_t *message){ - std::string response; switch(message->mtype){ - case DC_ADM_INT_CONTROL_ACK: - std::cout <<"Received response = " << (char *)message->payload << " of len = " << strlen((char *)message->payload) << "Actual = " << message->len << std::endl; + case A1_POLICY_RESP: + { + std::lock_guard lck(notify_lock); + std::cout <<"A1 Mediator received response = " << (char *)message->payload << " of len = " << strlen((char *)message->payload) << std::endl; + } + // released the lock. notify the sleep thread (if any) + notify_var.notify_all(); break; - case DC_ADM_GET_POLICY_ACK: - std::cout <<"Received Policy = " << (char *)message->payload << " of len = " << strlen((char *)message->payload) << "Actual = " << message->len << std::endl; - break; - default: std::cout <<"Unknown RMR message of type " << message->mtype << " received" << std::endl; } @@ -53,11 +61,16 @@ bool rcv_message(rmr_mbuf_t *message){ void usage(char *command){ - std::cout <<"Usage : " << command << " "; - std::cout <<" --name[-n] xapp_instance_name "; - std::cout <<" --port[-p] port to listen on (default is tcp:4561) "; - std::cout <<"--schema[-s] schema file"; - + std::cout <<"Usage : " << command << " " << std::endl; + std::cout <<" --name[-n] xapp_instance_name " << std::endl; + std::cout <<" --port[-p] port to listen on (default is tcp:4561) " << std::endl; + std::cout << " --op[-o] operation mode from {0, 1, 2} : 0 is CREATE, 1 is UPDATE, 2 is DELETE" << std::endl; + std::cout << " --window [-w] window size in seconds (default is 60 seconds)" << std::endl; + std::cout << " --blockrate [-b] blocking rate percentage (default is 90%)" << std::endl; + std::cout << " --trigger[-t] trigger threshold (default is 40 requests in window)" << std::endl; + std::cout << " --enforce set policy to enforce if flag prvoided (default is 0 i.e not enforce)" << std::endl; + std::cout << " --class [-c] subscriber profile id to which policy must be applied (default is 5)" << std::endl; + std::cout << " --instance[-i] policy instance id (default is ac-xapp-1)" << std::endl; std::cout << std::endl; } @@ -69,57 +82,108 @@ void msg_error(rmr_mbuf_t *message){ int main(int argc, char *argv[]){ + char name[128] = "test_a1_client"; - char port[16] = "tcp:4560"; + char port[16] = "tcp:9000"; unsigned int num_threads = 1; std::unique_ptr my_xapp; + std::string schema_file; + + enum OPERATIONS{CREATE, UPDATE, DELETE}; + static const char * op_strings[] = {"CREATE", "UPDATE", "DELETE"}; + + OPERATIONS op = CREATE; + std::string instance_id = "ac-xapp-1"; + int class_id = 5; + int enforce = 0; + int blocking_rate = 90; // percentage + int window_length = 60; // seconds + int trigger_threshold = 40; + + std::chrono::seconds time_out(1); // Parse command line options static struct option long_options[] = { - /* Thse options require arguments */ - {"name", required_argument, 0, 'n'}, - {"port", required_argument, 0, 'p'}, - - }; + /* Thse options require arguments */ + {"name", required_argument, 0, 'n'}, + {"port", required_argument, 0, 'p'}, + {"window", required_argument, 0, 'w'}, + {"blockrate", required_argument, 0, 'b'}, + {"trigger", required_argument, 0, 't'}, + {"class", required_argument, 0, 'c'}, + {"op", required_argument, 0, 'o'}, + {"instance", required_argument, 0, 'i'}, + {"enforce", no_argument, &enforce, 1}, + }; - while(1) { - - int option_index = 0; - char c = getopt_long(argc, argv, "n:p:", long_options, &option_index); - if(c == -1){ - break; - } + while(1) { - switch(c) - { + int option_index = 0; + char c = getopt_long(argc, argv, "n:p:w:b:t:c:o:i:", long_options, &option_index); - case 0: - /* An option flag was set. - Do nothing for now */ - break; - - case 'n': - strcpy(name, optarg); - break; + if(c == -1){ + break; + } + + switch(c) + { + + case 0: + /* An option flag was set. + Do nothing for now */ + break; - case 'p': - strcpy(port, optarg); - break; + case 'n': + strcpy(name, optarg); + break; + case 'p': + strcpy(port, optarg); + break; - case 'h': - usage(argv[0]); - exit(0); + case 'w': + window_length = atoi(optarg); + break; + + case 't': + trigger_threshold = atoi(optarg); + break; + + case 'b': + blocking_rate = atof(optarg); + break; + + case 'o': + op = static_cast(atoi(optarg)); + break; + + case 'i': + instance_id.assign(optarg); + break; + + case 'c': + class_id = atoi(optarg); + break; + + case 'h': + usage(argv[0]); + exit(0); - default: - usage(argv[0]); - exit(1); - } - }; + default: + usage(argv[0]); + exit(1); + } + }; + + int log_level = MDCLOG_INFO; + init_logger(name, static_cast(log_level)); + + mdclog_write(MDCLOG_INFO, "XaPP name specified = %s", name); + mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", port); init_logger(name, MDCLOG_INFO); @@ -127,66 +191,57 @@ int main(int argc, char *argv[]){ mdclog_write(MDCLOG_INFO, "XaPP port specified = %s", port); mdclog_write(MDCLOG_INFO,"XaPP listener threads specified = auto"); - my_xapp = std::make_unique(name, port, 1024, 1); + my_xapp = std::make_unique(name, port, 16384); // Start receiving loop ... std::vector thread_ids(num_threads); for(unsigned int i = 0; i < num_threads; i++){ - thread_ids[i] = (*my_xapp).StartThread(&rcv_message, msg_error); + thread_ids[i] = (*my_xapp).StartThread(rcv_message, msg_error); i++; }; - bool enforce = true; - int block_rate = 2; - int window_length = 20; - int trigger_threshold = 40; + char buffer[1024]; - while(1){ + std::string message_string ; + std::stringstream policy; + std::stringstream msg; + bool res = false; + switch(op){ - // Send a valid config - std::string message_string ; - std::string start = "{"; - std::string end = "}"; - - message_string = start + "\"enforce\":" + (enforce? "true":"false") + ","; - message_string += std::string("\"blocking_rate\":") + std::to_string(block_rate) + ","; - message_string += std::string("\"window_length\":") + std::to_string(window_length) + ","; - message_string += std::string("\"trigger_threshold\":") + std::to_string(trigger_threshold) + end; - memcpy(buffer, message_string.c_str(), message_string.length()); - my_xapp.get()->Send(DC_ADM_INT_CONTROL, message_string.length(), buffer); - + case CREATE: + case UPDATE: + policy <<"{ " << "\"enforce\":true, " << "\"window_length\":" << window_length << " , \"trigger_threshold\":" << trigger_threshold << ", \"blocking_rate\":" << blocking_rate << ", \"class\":" << class_id << " }" ; - sleep(2); + // Send a create/update + msg << "{ " << "\"policy_type_id\":" << 21000 << "," << "\"policy_instance_id\":\"" << instance_id << "\", \"operation\":\"" << op_strings[op] << "\", \"payload\" :" << policy.str() << "}"; + res = true; + break; - // // Send an invalid config - message_string = start + "\"enfce\":" + (enforce? "true":"false") + ","; - message_string += std::string("\"blocking_rate\":") + std::to_string(block_rate) + ","; - message_string += std::string("\"window_length\":") + std::to_string(window_length) + end; - memcpy(buffer, message_string.c_str(), message_string.length()); - my_xapp.get()->Send(DC_ADM_INT_CONTROL, message_string.length(), buffer); - sleep(2); + case DELETE: + // send a delete + msg << "{ " << "\"policy_type_id\":" << 21000 << "," << "\"policy_instance_id\": \"" << instance_id << "\", \"operation\": \"" << op_strings[op] << "\" }"; + res = true; + break; - // Send invalid JSON - message_string.assign("\"enforce\":false,"); - message_string += std::string("\"blocking_rate\":") + std::to_string(block_rate) + ","; - message_string += std::string("\"window_length\":") + std::to_string(window_length) + end; - memcpy(buffer, message_string.c_str(), message_string.length()); - my_xapp.get()->Send(DC_ADM_INT_CONTROL, message_string.length(), buffer); - - sleep(2); - + default: + std::cerr <<"Not yet supported " << std::endl; - // Send request for policy - // we don't care about contents of request for now ... - std::cout <<"Sending request to get policy" << std::endl; - my_xapp.get()->Send(DC_ADM_GET_POLICY, message_string.length(), buffer); - sleep(2); + } - window_length += 1; - + if(res){ + message_string = msg.str(); + std::cout <<"Sending message = " << message_string << std::endl; + memcpy(buffer, message_string.c_str(), message_string.length()); + my_xapp.get()->Send(A1_POLICY_REQ, message_string.length(), buffer, link_types::HIGH_RELIABILITY); } + + std::unique_lock lck(notify_lock); + // release lock and got to sleep waiting to be notified + notify_var.wait_for(lck, std::chrono::seconds(5)); + + // finish (*my_xapp).Stop(); }