From 9fd1eb03e66522e79c94dec7ed26f68c17018fc1 Mon Sep 17 00:00:00 2001 From: vlad shkapenyuk Date: Wed, 10 Feb 2021 16:15:37 -0500 Subject: [PATCH] Updates to examples queries. Improve real-timeness of lfta aggregation Signed-off-by: vlad shkapenyuk Change-Id: I0f6ac2c830677facf2fe068ddb1a42d6a5cd0701 --- demo/CSVEXAMPLE/README.md | 17 +++++++++++++++++ demo/CSVEXAMPLE/example.data | 1 + demo/CSVEXAMPLE/example.gsql | 10 ++++++++++ demo/CSVEXAMPLE/example2.gsql | 6 ++++++ demo/CSVEXAMPLE/gen_feed.py | 9 +++++++++ demo/CSVEXAMPLE/killexample | 6 ++++++ demo/CSVEXAMPLE/output_spec.cfg | 2 ++ demo/CSVEXAMPLE/runall | 11 +++++++++++ demo/GDATEXAMPLE/README.md | 15 +++++++++++++++ demo/GDATEXAMPLE/example.gdat | Bin 0 -> 6300 bytes demo/GDATEXAMPLE/example.gsql | 11 +++++++++++ demo/GDATEXAMPLE/gen_feed.py | 9 +++++++++ demo/GDATEXAMPLE/killexample | 6 ++++++ demo/GDATEXAMPLE/output_spec.cfg | 1 + demo/GDATEXAMPLE/runall | 11 +++++++++++ demo/PIPE/README.md | 8 ++++++++ demo/PIPE/gen_data.py | 29 +++++++++++++++++++++++++++++ demo/PIPE/killexample | 4 ++++ demo/PIPE/output_spec.cfg | 2 ++ demo/PIPE/queries.gsql | 14 ++++++++++++++ demo/PIPE/runall | 10 ++++++++++ src/ftacmp/generate_lfta_code.cc | 12 ++++++++++++ src/ftacmp/translate_fta.cc | 14 ++++---------- 23 files changed, 198 insertions(+), 10 deletions(-) create mode 100644 demo/CSVEXAMPLE/README.md create mode 100644 demo/CSVEXAMPLE/example.data create mode 100644 demo/CSVEXAMPLE/example.gsql create mode 100644 demo/CSVEXAMPLE/example2.gsql create mode 100644 demo/CSVEXAMPLE/gen_feed.py create mode 100644 demo/CSVEXAMPLE/killexample create mode 100644 demo/CSVEXAMPLE/output_spec.cfg create mode 100644 demo/CSVEXAMPLE/runall create mode 100644 demo/GDATEXAMPLE/README.md create mode 100644 demo/GDATEXAMPLE/example.gdat create mode 100644 demo/GDATEXAMPLE/example.gsql create mode 100644 demo/GDATEXAMPLE/gen_feed.py create mode 100644 demo/GDATEXAMPLE/killexample create mode 100644 demo/GDATEXAMPLE/output_spec.cfg 
create mode 100644 demo/GDATEXAMPLE/runall create mode 100644 demo/PIPE/README.md create mode 100644 demo/PIPE/gen_data.py create mode 100644 demo/PIPE/killexample create mode 100644 demo/PIPE/output_spec.cfg create mode 100644 demo/PIPE/queries.gsql create mode 100644 demo/PIPE/runall diff --git a/demo/CSVEXAMPLE/README.md b/demo/CSVEXAMPLE/README.md new file mode 100644 index 0000000..f39cd6f --- /dev/null +++ b/demo/CSVEXAMPLE/README.md @@ -0,0 +1,17 @@ +This example demonstrates a query system reading from a file stream. + +gen_feed.py creates the file stream by replacing its contents +every second, checking first if the GS-lite instance has finished +processing the file. + +Additional features demonstrated: + - a reference to an interface set in example.gsql + - a reference to a library query in example2.gsql + - the output_spec.cfg has query example putting output on a stream, + while example2 periodically dumps output in files in directory output_dir + - The files in output_dir are in gdat format. Use gs-lite/bin/gdat2ascii + to extract their contents to ascii + - the runall script starts up the query system; note the use of gsprintconsole + for both example and example2 + - the killexample script ensures that gsprintconsole and gen_feed.py instances + are stopped. 
diff --git a/demo/CSVEXAMPLE/example.data b/demo/CSVEXAMPLE/example.data new file mode 100644 index 0000000..1634b13 --- /dev/null +++ b/demo/CSVEXAMPLE/example.data @@ -0,0 +1 @@ +1|2|3.1.1.1|1234:4321:5678:8765:9abc:cba9:def0:0def|s5string|TRUE|-7|-8|9.1 diff --git a/demo/CSVEXAMPLE/example.gsql b/demo/CSVEXAMPLE/example.gsql new file mode 100644 index 0000000..9a019cb --- /dev/null +++ b/demo/CSVEXAMPLE/example.gsql @@ -0,0 +1,10 @@ +DEFINE { +query_name 'example'; +} +select systemTime,uintInPosition1,ullongInPosition2,ipInPosition3, + ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7, + llongInPosition8,floatInPosition9,count(*) as Cnt +from [csv].CSV_EXAMPLE +group by systemTime,uintInPosition1,ullongInPosition2,ipInPosition3, + ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7, + llongInPosition8,floatInPosition9 diff --git a/demo/CSVEXAMPLE/example2.gsql b/demo/CSVEXAMPLE/example2.gsql new file mode 100644 index 0000000..27a1056 --- /dev/null +++ b/demo/CSVEXAMPLE/example2.gsql @@ -0,0 +1,6 @@ +DEFINE { +query_name 'example2'; +} +select systemTime,uintInPosition1,sum(Cnt) +from CSVEXAMPLELIB/ex2_src +group by systemTime,uintInPosition1 diff --git a/demo/CSVEXAMPLE/gen_feed.py b/demo/CSVEXAMPLE/gen_feed.py new file mode 100644 index 0000000..6807104 --- /dev/null +++ b/demo/CSVEXAMPLE/gen_feed.py @@ -0,0 +1,9 @@ +import os +import time + +while True: + while os.path.isfile("exampleCsv"): + time.sleep(1) + os.system("cp example.data exampleCsvX ; mv exampleCsvX exampleCsv") + time.sleep(1) + diff --git a/demo/CSVEXAMPLE/killexample b/demo/CSVEXAMPLE/killexample new file mode 100644 index 0000000..fd2301f --- /dev/null +++ b/demo/CSVEXAMPLE/killexample @@ -0,0 +1,6 @@ +#!/bin/sh +./stopit +killall gsprintconsole +killall gen_feed.py +killall -9 gsprintconsole +killall -9 gen_feed.py diff --git a/demo/CSVEXAMPLE/output_spec.cfg b/demo/CSVEXAMPLE/output_spec.cfg new file mode 100644 index 0000000..90606a9 --- /dev/null +++ 
b/demo/CSVEXAMPLE/output_spec.cfg @@ -0,0 +1,2 @@ +example,stream,,,,, +example2,file,,output_dir,1,,1 diff --git a/demo/CSVEXAMPLE/runall b/demo/CSVEXAMPLE/runall new file mode 100644 index 0000000..8a4422c --- /dev/null +++ b/demo/CSVEXAMPLE/runall @@ -0,0 +1,11 @@ +#!/bin/sh +./killexample +sleep 1 +./runit +python ./gen_feed.py & +sleep 10 + +../../bin/gsprintconsole `cat gshub.log` default example & +../../bin/gsprintconsole `cat gshub.log` default example2 & +sleep 1 +../../bin/start_processing diff --git a/demo/GDATEXAMPLE/README.md b/demo/GDATEXAMPLE/README.md new file mode 100644 index 0000000..1bb4891 --- /dev/null +++ b/demo/GDATEXAMPLE/README.md @@ -0,0 +1,15 @@ +This example demonstrates a query system reading from a file stream. + +gen_feed.py creates the file stream by replacing its contents +every second, checking first if the GS-lite instance has finished +processing the file. + +The source file is in gdat format; use + ../../bin/gdat2ascii example.gdat to see its contents + +Additional features demonstrated: + - a reference to an interface set in example.gsql + - the runall script starts up the query system; note the use of gsprintconsole + to start processing + - the killexample script ensures that gsprintconsole and gen_feed.py instances + are stopped. 
diff --git a/demo/GDATEXAMPLE/example.gdat b/demo/GDATEXAMPLE/example.gdat new file mode 100644 index 0000000000000000000000000000000000000000..7537b5216b9663f688d6391c2cffa1022dd88ba4 GIT binary patch literal 6300 zcmb`KNpI6Y6vrDl)e;x(NCTI&QdD(G_XD_Tl1576Do%4j$a0%WWyy6?$7vfvDTPAW z_qBk4vX=e8fdk)w55SEp2M+WIr}D;0BmcG#$^54RwVFdZbuZu1S)b%|zuCp$qy<$)uzvwN50`SyHZ+9XgXWW+<7a zj#e!$gt~z9rkThKPUGmKwaM^VJ8MShnNCSEUCy@QZS1;SB^o>Bc{OP7CvG*oC-$6 z(xNmPjV032n$$X5p>|cXbZCoUqh*5N)NERyHnpZ`UNZ{%6fKkB;XEzq71KGw1an5& zm^4hosWP8KZA&*Hz9>oKaz;+o;7p|^opSiblT~gMB0>WZNDCp*2wTzZs>EPOXdoc@ zI)~+boxvzVDD;mmVHYg6PN=CdKqYhp%*X$0`rYY#kq{BqFu*ioEYw_ZP>Gce5p^Qf zra7G*P~CRMfnOPH-s4=&%WbV|y zNL?i?c!xr=^@@dKL@_3@|4<7Y%KYr-OaM-Hu@M}BKw$UYaAbdOVU@0Z`MU9abI&i9 zOS#$l*PpGet>5rrM~{0PIK&t& example1.csv & +../../bin/gsprintconsole `cat gshub.log` default example2 -s > example2.csv & +sleep 1 +../../bin/start_processing diff --git a/src/ftacmp/generate_lfta_code.cc b/src/ftacmp/generate_lfta_code.cc index 61872c4..5ec4167 100644 --- a/src/ftacmp/generate_lfta_code.cc +++ b/src/ftacmp/generate_lfta_code.cc @@ -464,6 +464,7 @@ string generate_fta_struct(string node_name, gb_table *gb_tbl, ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n"; // ret+="\tint bitmap_size;\n"; ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n"; + ret += "\tgs_int32_t n_ticks; // for limiting slow flush\n"; ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n"; ret += "\tint max_windows; // max number of open windows.\n"; ret += "\tunsigned int generation; // initially zero, increment on\n"; @@ -2139,7 +2140,16 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr; } } + ret += "\t\tt->n_ticks = 0; // reset clock tick counter, limit slow flush\n"; + ret += "\t}else{\n"; + ret += "//\tLimit slow flush, do a full flush at two clock ticks past the change in generation.\n"; + ret += 
"\t\tt->n_ticks++;\n"; + ret += "\t\tif(t->n_ticks == 2){\n"; + ret += "\t\t\tif(t->flush_posmax_aggrs) \n"; + ret += "\t\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n"; + ret += "\t\t}\n"; ret += "\t}\n\n"; + } @@ -2299,6 +2309,7 @@ string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *sc temporal_flush+="\t\t/* \t\tmark all groups as old */\n"; temporal_flush+="\t\tt->generation++;\n"; temporal_flush+="\t\tt->flush_pos = 0;\n"; + temporal_flush+="\t\tt->n_ticks = 0; // reset clock tick counter, to limit slow flush\n"; // Now set the saved temporal value of the gb to the @@ -4161,6 +4172,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo if(is_aggr_query){ ret += "\tf->n_aggrs = 0;\n"; + ret += "\tf->n_ticks = 0; // for limiting slow flush\n"; ret += "\tf->max_aggrs = "; diff --git a/src/ftacmp/translate_fta.cc b/src/ftacmp/translate_fta.cc index 4e5531e..603a3cf 100644 --- a/src/ftacmp/translate_fta.cc +++ b/src/ftacmp/translate_fta.cc @@ -3275,13 +3275,13 @@ void generate_makefile(vector &input_file_names, int nfiles, "lfta.o: %s_lfta.cc\n" "\t$(CC) -o lfta.o -c %s_lfta.cc\n" "\n" -"%s_lfta.cc: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str()); +"%s_lfta.cc: %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str()); for(i=0;i &input_file_names, int nfiles, ); fprintf(outfl, -("\n" -"packet_schema.txt:\n" -"\tln -s "+root_path+"/cfg/packet_schema.txt .\n" -"\n" -"external_fcns.def:\n" -"\tln -s "+root_path+"/cfg/external_fcns.def .\n" "\n" "clean:\n" -"\trm -rf core rts *.o %s_lfta.cc external_fcns.def packet_schema.txt").c_str(),hostname.c_str()); +"\trm -rf core rts *.o %s_lfta.cc",hostname.c_str()); for(i=0;i