From: vlad shkapenyuk Date: Wed, 10 Feb 2021 21:15:37 +0000 (-0500) Subject: Updates to examples queries. Improve real-timeness of lfta aggregation X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=9fd1eb03e66522e79c94dec7ed26f68c17018fc1;p=com%2Fgs-lite.git Updates to examples queries. Improve real-timeness of lfta aggregation Signed-off-by: vlad shkapenyuk Change-Id: I0f6ac2c830677facf2fe068ddb1a42d6a5cd0701 --- diff --git a/demo/CSVEXAMPLE/README.md b/demo/CSVEXAMPLE/README.md new file mode 100644 index 0000000..f39cd6f --- /dev/null +++ b/demo/CSVEXAMPLE/README.md @@ -0,0 +1,17 @@ +This example demonstrates a query system reading from a file stream. + +gen_feed.py creates the file stream by replacing its contents +every second, checking first if the GS-lite instance has finished +processing the file. + +Additional features demonstrated: + - a reference to an interface set in example.gsql + - a reference to a library query in example2.gsql + - the output_spec.cfg has query example putting output on a stream, + while example2 periodically dumps output in files in directory output_dir + - The files in output_dir are in gdat format. Use gs-lite/bin/gdat2ascii + to extract their contents to ascii + - the runall script starts up the query system, note the use of gsprintconsole + for both example and example2 + - the killexample script ensures that gsprintconsole and gen_feed.py instances + are stopped. 
diff --git a/demo/CSVEXAMPLE/example.data b/demo/CSVEXAMPLE/example.data new file mode 100644 index 0000000..1634b13 --- /dev/null +++ b/demo/CSVEXAMPLE/example.data @@ -0,0 +1 @@ +1|2|3.1.1.1|1234:4321:5678:8765:9abc:cba9:def0:0def|s5string|TRUE|-7|-8|9.1 diff --git a/demo/CSVEXAMPLE/example.gsql b/demo/CSVEXAMPLE/example.gsql new file mode 100644 index 0000000..9a019cb --- /dev/null +++ b/demo/CSVEXAMPLE/example.gsql @@ -0,0 +1,10 @@ +DEFINE { +query_name 'example'; +} +select systemTime,uintInPosition1,ullongInPosition2,ipInPosition3, + ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7, + llongInPosition8,floatInPosition9,count(*) as Cnt +from [csv].CSV_EXAMPLE +group by systemTime,uintInPosition1,ullongInPosition2,ipInPosition3, + ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7, + llongInPosition8,floatInPosition9 diff --git a/demo/CSVEXAMPLE/example2.gsql b/demo/CSVEXAMPLE/example2.gsql new file mode 100644 index 0000000..27a1056 --- /dev/null +++ b/demo/CSVEXAMPLE/example2.gsql @@ -0,0 +1,6 @@ +DEFINE { +query_name 'example2'; +} +select systemTime,uintInPosition1,sum(Cnt) +from CSVEXAMPLELIB/ex2_src +group by systemTime,uintInPosition1 diff --git a/demo/CSVEXAMPLE/gen_feed.py b/demo/CSVEXAMPLE/gen_feed.py new file mode 100644 index 0000000..6807104 --- /dev/null +++ b/demo/CSVEXAMPLE/gen_feed.py @@ -0,0 +1,9 @@ +import os +import time + +while True: + while os.path.isfile("exampleCsv"): + time.sleep(1) + os.system("cp example.data exampleCsvX ; mv exampleCsvX exampleCsv") + time.sleep(1) + diff --git a/demo/CSVEXAMPLE/killexample b/demo/CSVEXAMPLE/killexample new file mode 100644 index 0000000..fd2301f --- /dev/null +++ b/demo/CSVEXAMPLE/killexample @@ -0,0 +1,6 @@ +#!/bin/sh +./stopit +killall gsprintconsole +killall gen_feed.py +killall -9 gsprintconsole +killall -9 gen_feed.py diff --git a/demo/CSVEXAMPLE/output_spec.cfg b/demo/CSVEXAMPLE/output_spec.cfg new file mode 100644 index 0000000..90606a9 --- /dev/null +++ 
b/demo/CSVEXAMPLE/output_spec.cfg @@ -0,0 +1,2 @@ +example,stream,,,,, +example2,file,,output_dir,1,,1 diff --git a/demo/CSVEXAMPLE/runall b/demo/CSVEXAMPLE/runall new file mode 100644 index 0000000..8a4422c --- /dev/null +++ b/demo/CSVEXAMPLE/runall @@ -0,0 +1,11 @@ +#!/bin/sh +./killexample +sleep 1 +./runit +python ./gen_feed.py & +sleep 10 + +../../bin/gsprintconsole `cat gshub.log` default example & +../../bin/gsprintconsole `cat gshub.log` default example2 & +sleep 1 +../../bin/start_processing diff --git a/demo/GDATEXAMPLE/README.md b/demo/GDATEXAMPLE/README.md new file mode 100644 index 0000000..1bb4891 --- /dev/null +++ b/demo/GDATEXAMPLE/README.md @@ -0,0 +1,15 @@ +This example demonstrates a query system reading from a file stream. + +gen_feed.py creates the file stream by replacing its contents +every second, checking first if the GS-lite instance has finished +processing the file. + +The source file is in gdat format, use + ../../bin/gdat2ascii example.gdat to see its contents + +Additional features demonstrated: + - a reference to an interface set in example.gsql + - the runall script starts up the query system, note the use of gsprintconsole + to start processing + - the killexample script ensures that gsprintconsole and gen_feed.py instances + are stopped. 
diff --git a/demo/GDATEXAMPLE/example.gdat b/demo/GDATEXAMPLE/example.gdat new file mode 100644 index 0000000..7537b52 Binary files /dev/null and b/demo/GDATEXAMPLE/example.gdat differ diff --git a/demo/GDATEXAMPLE/example.gsql b/demo/GDATEXAMPLE/example.gsql new file mode 100644 index 0000000..4ebc9f0 --- /dev/null +++ b/demo/GDATEXAMPLE/example.gsql @@ -0,0 +1,11 @@ +DEFINE { +query_name 'example'; +visibility 'external'; +} +select systemTime,uintInPosition1,ullongInPosition2,ipInPosition3, + ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7, + llongInPosition8,floatInPosition9,count(*) as cnt +from [gdat].GDAT_EXAMPLE +group by systemTime,uintInPosition1,ullongInPosition2,ipInPosition3, + ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7, + llongInPosition8,floatInPosition9 diff --git a/demo/GDATEXAMPLE/gen_feed.py b/demo/GDATEXAMPLE/gen_feed.py new file mode 100644 index 0000000..0e0213d --- /dev/null +++ b/demo/GDATEXAMPLE/gen_feed.py @@ -0,0 +1,9 @@ +import os +import time + +while True: + while os.path.isfile("exampleGdat"): + time.sleep(1) + os.system("cp example.gdat exampleGdatX ; mv exampleGdatX exampleGdat") + time.sleep(1) + diff --git a/demo/GDATEXAMPLE/killexample b/demo/GDATEXAMPLE/killexample new file mode 100644 index 0000000..022375b --- /dev/null +++ b/demo/GDATEXAMPLE/killexample @@ -0,0 +1,6 @@ +#!/bin/sh +./stopit +killall gsprintconsole +killall gen_feed.py +killall -9 gsprintconsole +killall -9 gen_feed.py diff --git a/demo/GDATEXAMPLE/output_spec.cfg b/demo/GDATEXAMPLE/output_spec.cfg new file mode 100644 index 0000000..8ffaaa9 --- /dev/null +++ b/demo/GDATEXAMPLE/output_spec.cfg @@ -0,0 +1 @@ +example,stream,,,,, diff --git a/demo/GDATEXAMPLE/runall b/demo/GDATEXAMPLE/runall new file mode 100644 index 0000000..1fce2d2 --- /dev/null +++ b/demo/GDATEXAMPLE/runall @@ -0,0 +1,11 @@ +#!/bin/sh +./killexample +sleep 1 +./runit +sleep 10 +python ./gen_feed.py & + +../../bin/gsprintconsole `cat gshub.log` default 
example & +sleep 10 +../../bin/start_processing + diff --git a/demo/PIPE/README.md b/demo/PIPE/README.md new file mode 100644 index 0000000..79891c8 --- /dev/null +++ b/demo/PIPE/README.md @@ -0,0 +1,8 @@ +This example demonstrates a query system reading from a single file. + +The input data file is csvsingle.dat, and it remains after processing. + +Additional features demonstrated: + - a reference to a specific interface + - The output is redirected into example1.csv and example2.csv + - The last line is an end-of-data marker. diff --git a/demo/PIPE/gen_data.py b/demo/PIPE/gen_data.py new file mode 100644 index 0000000..63fd800 --- /dev/null +++ b/demo/PIPE/gen_data.py @@ -0,0 +1,29 @@ +import random +import time + +ipv4s = ["1.2.3.4", "3.4.5.6", "112.221.54.34", "66.77.88.99"] +ipv6s = ["1234:4321:5678:8765:9abc:cba9:def0:0def", + "2234:4321:5678:8765:9abc:cba9:def0:0de2", + "3234:4321:5678:8765:9abc:cba9:def0:0de3", + "4234:4321:5678:8765:9abc:cba9:def0:0de4" ] + +strings = ["foo", "bar", "zed", "flip", "flood"] +bools = ["TRUE", "FALSE"] + +tstart = int(time.time()) +tend = tstart+50 +rec_per_ts = 5 + +for ts in xrange(tstart, tend+1): + for rno in xrange(0, rec_per_ts): + v2 = str(random.randint(1,100000)) + v3 = ipv4s[random.randrange(0,len(ipv4s))] + v4 = ipv6s[random.randrange(0,len(ipv6s))] + v5 = strings[random.randrange(0,len(strings))] + v6 = bools[random.randrange(0,len(bools))] + v7 = str(random.randint(1,100000)) + v8 = str(random.randint(1,100000)) + v9 = str(random.uniform(1,100000)) + rec = [str(ts), v2, v3, v4, v5, v6, v7, v8, v9] + print ",".join(rec) + diff --git a/demo/PIPE/killexample b/demo/PIPE/killexample new file mode 100644 index 0000000..ba040e4 --- /dev/null +++ b/demo/PIPE/killexample @@ -0,0 +1,4 @@ +#!/bin/sh +./stopit +killall gsprintconsole +killall -9 gsprintconsole diff --git a/demo/PIPE/output_spec.cfg b/demo/PIPE/output_spec.cfg new file mode 100644 index 0000000..ac3adee --- /dev/null +++ b/demo/PIPE/output_spec.cfg @@ -0,0 
+1,2 @@ +example1,stream,,,,, +example2,stream,,,,, diff --git a/demo/PIPE/queries.gsql b/demo/PIPE/queries.gsql new file mode 100644 index 0000000..2067042 --- /dev/null +++ b/demo/PIPE/queries.gsql @@ -0,0 +1,14 @@ +DEFINE{ query_name 'example1'; +} +select systemTime,uintInPosition1,ullongInPosition2, + count(*) +from CSV_SINGLE.SingleFile +group by systemTime,uintInPosition1,ullongInPosition2 +; + +DEFINE{ query_name 'example2'; +} +select systemTime,uintInPosition1,ullongInPosition2,ipInPosition3, + ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7, + llongInPosition8,floatInPosition9 +from CSV_SINGLE.SingleFile diff --git a/demo/PIPE/runall b/demo/PIPE/runall new file mode 100644 index 0000000..1d84261 --- /dev/null +++ b/demo/PIPE/runall @@ -0,0 +1,10 @@ +#!/bin/sh +./killexample +sleep 1 +./runit +sleep 10 + +../../bin/gsprintconsole `cat gshub.log` default example1 -s > example1.csv & +../../bin/gsprintconsole `cat gshub.log` default example2 -s > example2.csv & +sleep 1 +../../bin/start_processing diff --git a/src/ftacmp/generate_lfta_code.cc b/src/ftacmp/generate_lfta_code.cc index 61872c4..5ec4167 100644 --- a/src/ftacmp/generate_lfta_code.cc +++ b/src/ftacmp/generate_lfta_code.cc @@ -464,6 +464,7 @@ string generate_fta_struct(string node_name, gb_table *gb_tbl, ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n"; // ret+="\tint bitmap_size;\n"; ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n"; + ret += "\tgs_int32_t n_ticks; // for limiting slow flush\n"; ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n"; ret += "\tint max_windows; // max number of open windows.\n"; ret += "\tunsigned int generation; // initially zero, increment on\n"; @@ -2139,7 +2140,16 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr; } } + ret += "\t\tt->n_ticks = 0; // reset clock tick counter, limit slow 
flush\n"; + ret += "\t}else{\n"; + ret += "//\tLimit slow flush, do a full flush at two clock ticks past the change in generation.\n"; + ret += "\t\tt->n_ticks++;\n"; + ret += "\t\tif(t->n_ticks == 2){\n"; + ret += "\t\t\tif(t->flush_posmax_aggrs) \n"; + ret += "\t\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n"; + ret += "\t\t}\n"; ret += "\t}\n\n"; + } @@ -2299,6 +2309,7 @@ string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *sc temporal_flush+="\t\t/* \t\tmark all groups as old */\n"; temporal_flush+="\t\tt->generation++;\n"; temporal_flush+="\t\tt->flush_pos = 0;\n"; + temporal_flush+="\t\tt->n_ticks = 0; // reset clock tick counter, to limit slow flush\n"; // Now set the saved temporal value of the gb to the @@ -4161,6 +4172,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo if(is_aggr_query){ ret += "\tf->n_aggrs = 0;\n"; + ret += "\tf->n_ticks = 0; // for limiting slow flush\n"; ret += "\tf->max_aggrs = "; diff --git a/src/ftacmp/translate_fta.cc b/src/ftacmp/translate_fta.cc index 4e5531e..603a3cf 100644 --- a/src/ftacmp/translate_fta.cc +++ b/src/ftacmp/translate_fta.cc @@ -3275,13 +3275,13 @@ void generate_makefile(vector &input_file_names, int nfiles, "lfta.o: %s_lfta.cc\n" "\t$(CC) -o lfta.o -c %s_lfta.cc\n" "\n" -"%s_lfta.cc: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str()); +"%s_lfta.cc: %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str()); for(i=0;i &input_file_names, int nfiles, ); fprintf(outfl, -("\n" -"packet_schema.txt:\n" -"\tln -s "+root_path+"/cfg/packet_schema.txt .\n" -"\n" -"external_fcns.def:\n" -"\tln -s "+root_path+"/cfg/external_fcns.def .\n" "\n" "clean:\n" -"\trm -rf core rts *.o %s_lfta.cc external_fcns.def packet_schema.txt").c_str(),hostname.c_str()); +"\trm -rf core rts *.o %s_lfta.cc",hostname.c_str()); for(i=0;i