Updates to examples queries. Improve real-timeness of lfta aggregation 30/5630/1
authorvlad shkapenyuk <vshkap@research.att.com>
Wed, 10 Feb 2021 21:15:37 +0000 (16:15 -0500)
committervlad shkapenyuk <vshkap@research.att.com>
Wed, 10 Feb 2021 21:15:37 +0000 (16:15 -0500)
Signed-off-by: vlad shkapenyuk <vshkap@research.att.com>
Change-Id: I0f6ac2c830677facf2fe068ddb1a42d6a5cd0701

23 files changed:
demo/CSVEXAMPLE/README.md [new file with mode: 0644]
demo/CSVEXAMPLE/example.data [new file with mode: 0644]
demo/CSVEXAMPLE/example.gsql [new file with mode: 0644]
demo/CSVEXAMPLE/example2.gsql [new file with mode: 0644]
demo/CSVEXAMPLE/gen_feed.py [new file with mode: 0644]
demo/CSVEXAMPLE/killexample [new file with mode: 0644]
demo/CSVEXAMPLE/output_spec.cfg [new file with mode: 0644]
demo/CSVEXAMPLE/runall [new file with mode: 0644]
demo/GDATEXAMPLE/README.md [new file with mode: 0644]
demo/GDATEXAMPLE/example.gdat [new file with mode: 0644]
demo/GDATEXAMPLE/example.gsql [new file with mode: 0644]
demo/GDATEXAMPLE/gen_feed.py [new file with mode: 0644]
demo/GDATEXAMPLE/killexample [new file with mode: 0644]
demo/GDATEXAMPLE/output_spec.cfg [new file with mode: 0644]
demo/GDATEXAMPLE/runall [new file with mode: 0644]
demo/PIPE/README.md [new file with mode: 0644]
demo/PIPE/gen_data.py [new file with mode: 0644]
demo/PIPE/killexample [new file with mode: 0644]
demo/PIPE/output_spec.cfg [new file with mode: 0644]
demo/PIPE/queries.gsql [new file with mode: 0644]
demo/PIPE/runall [new file with mode: 0644]
src/ftacmp/generate_lfta_code.cc
src/ftacmp/translate_fta.cc

diff --git a/demo/CSVEXAMPLE/README.md b/demo/CSVEXAMPLE/README.md
new file mode 100644 (file)
index 0000000..f39cd6f
--- /dev/null
@@ -0,0 +1,17 @@
+This example demonstrates a query system reading from a file stream.
+
+gen_feed.py creates the file stream by replacing its contents
+every second, checking first if the GS-lite instance has finished
+processing the file.
+
+Additional features demonstrated:
+  - a reference to an interface set in example.gsql
+  - a reference to a library query in example2.gsql
+  - the output_spec.cfg has query example putting output on a stream,
+    while example2 periodically dumps output in files in directory output_dir
+  - The files in output_dir are in gdat format.  Use gs-lite/bin/gdat2ascii
+    to extract their contents to ascii
+  - the runall script starts up query system, note the use of gsprintconsole
+    for both example and example2
+  - the killall script ensures that gsprintconsole and gen_feed.py instances
+    are stopped.
diff --git a/demo/CSVEXAMPLE/example.data b/demo/CSVEXAMPLE/example.data
new file mode 100644 (file)
index 0000000..1634b13
--- /dev/null
@@ -0,0 +1 @@
+1|2|3.1.1.1|1234:4321:5678:8765:9abc:cba9:def0:0def|s5string|TRUE|-7|-8|9.1
diff --git a/demo/CSVEXAMPLE/example.gsql b/demo/CSVEXAMPLE/example.gsql
new file mode 100644 (file)
index 0000000..9a019cb
--- /dev/null
@@ -0,0 +1,10 @@
+DEFINE {
+query_name 'example';
+}
+select systemTime,uintInPosition1,ullongInPosition2,ipInPosition3,
+       ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7,
+       llongInPosition8,floatInPosition9,count(*) as Cnt
+from [csv].CSV_EXAMPLE
+group by systemTime,uintInPosition1,ullongInPosition2,ipInPosition3,
+       ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7,
+       llongInPosition8,floatInPosition9
diff --git a/demo/CSVEXAMPLE/example2.gsql b/demo/CSVEXAMPLE/example2.gsql
new file mode 100644 (file)
index 0000000..27a1056
--- /dev/null
@@ -0,0 +1,6 @@
+DEFINE {
+query_name 'example2';
+}
+select systemTime,uintInPosition1,sum(Cnt)
+from CSVEXAMPLELIB/ex2_src
+group by systemTime,uintInPosition1
diff --git a/demo/CSVEXAMPLE/gen_feed.py b/demo/CSVEXAMPLE/gen_feed.py
new file mode 100644 (file)
index 0000000..6807104
--- /dev/null
@@ -0,0 +1,9 @@
+import os
+import time
+
+while True:
+       while os.path.isfile("exampleCsv"):
+               time.sleep(1)
+       os.system("cp example.data exampleCsvX ; mv exampleCsvX exampleCsv")
+       time.sleep(1)
+       
diff --git a/demo/CSVEXAMPLE/killexample b/demo/CSVEXAMPLE/killexample
new file mode 100644 (file)
index 0000000..fd2301f
--- /dev/null
@@ -0,0 +1,6 @@
+#!/bin/sh
+./stopit
+killall gsprintconsole 
+killall gen_feed.py
+killall -9 gsprintconsole
+killall -9 gen_feed.py
diff --git a/demo/CSVEXAMPLE/output_spec.cfg b/demo/CSVEXAMPLE/output_spec.cfg
new file mode 100644 (file)
index 0000000..90606a9
--- /dev/null
@@ -0,0 +1,2 @@
+example,stream,,,,,
+example2,file,,output_dir,1,,1
diff --git a/demo/CSVEXAMPLE/runall b/demo/CSVEXAMPLE/runall
new file mode 100644 (file)
index 0000000..8a4422c
--- /dev/null
@@ -0,0 +1,11 @@
+#!/bin/sh
+./killexample
+sleep 1
+./runit
+python ./gen_feed.py &
+sleep 10
+
+../../bin/gsprintconsole `cat gshub.log` default example &
+../../bin/gsprintconsole `cat gshub.log` default example2 &
+sleep 1
+../../bin/start_processing
diff --git a/demo/GDATEXAMPLE/README.md b/demo/GDATEXAMPLE/README.md
new file mode 100644 (file)
index 0000000..1bb4891
--- /dev/null
@@ -0,0 +1,15 @@
+This example demonstrates a query system reading from a file stream.
+
+gen_feed.py creates the file stream by replacing its contents
+every second, checking first if the GS-lite instance has finished
+processing the file.
+
+The source file is in gdat format, use
+  ../../bin/gdat2ascii example.gdat to see its contents
+
+Additional features demonstrated:
+  - a reference to an interface set in example.gsql
+  - the runall script starts up query system, note the use of gsprintconsole
+    to start processing
+  - the killall script ensures that gsprintconsole and gen_feed.py instances
+    are stopped.
diff --git a/demo/GDATEXAMPLE/example.gdat b/demo/GDATEXAMPLE/example.gdat
new file mode 100644 (file)
index 0000000..7537b52
Binary files /dev/null and b/demo/GDATEXAMPLE/example.gdat differ
diff --git a/demo/GDATEXAMPLE/example.gsql b/demo/GDATEXAMPLE/example.gsql
new file mode 100644 (file)
index 0000000..4ebc9f0
--- /dev/null
@@ -0,0 +1,11 @@
+DEFINE {
+query_name 'example';
+visibility 'external';
+}
+select systemTime,uintInPosition1,ullongInPosition2,ipInPosition3,
+       ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7,
+       llongInPosition8,floatInPosition9,count(*) as cnt
+from [gdat].GDAT_EXAMPLE
+group by systemTime,uintInPosition1,ullongInPosition2,ipInPosition3,
+       ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7,
+       llongInPosition8,floatInPosition9
diff --git a/demo/GDATEXAMPLE/gen_feed.py b/demo/GDATEXAMPLE/gen_feed.py
new file mode 100644 (file)
index 0000000..0e0213d
--- /dev/null
@@ -0,0 +1,9 @@
+import os
+import time
+
+while True:
+       while os.path.isfile("exampleGdat"):
+               time.sleep(1)
+       os.system("cp example.gdat exampleGdatX ; mv exampleGdatX exampleGdat")
+       time.sleep(1)
+       
diff --git a/demo/GDATEXAMPLE/killexample b/demo/GDATEXAMPLE/killexample
new file mode 100644 (file)
index 0000000..022375b
--- /dev/null
@@ -0,0 +1,6 @@
+#!/bin/sh
+./stopit
+killall gsprintconsole 
+killall gen_feed.py 
+killall -9 gsprintconsole
+killall -9 gen_feed.py
diff --git a/demo/GDATEXAMPLE/output_spec.cfg b/demo/GDATEXAMPLE/output_spec.cfg
new file mode 100644 (file)
index 0000000..8ffaaa9
--- /dev/null
@@ -0,0 +1 @@
+example,stream,,,,,
diff --git a/demo/GDATEXAMPLE/runall b/demo/GDATEXAMPLE/runall
new file mode 100644 (file)
index 0000000..1fce2d2
--- /dev/null
@@ -0,0 +1,11 @@
+#!/bin/sh
+./killexample
+sleep 1
+./runit
+sleep 10
+python ./gen_feed.py &
+
+../../bin/gsprintconsole `cat gshub.log` default example &
+sleep 10
+../../bin/start_processing
+
diff --git a/demo/PIPE/README.md b/demo/PIPE/README.md
new file mode 100644 (file)
index 0000000..79891c8
--- /dev/null
@@ -0,0 +1,8 @@
+This example demonstrates a query system reading from a single file.
+
+The input data file is csvsingle.dat, and it remains after processing.
+
+Additional features demonstrated:
+  - a reference to a specific interface
+  - The output is redirected into example1.csv and example2.csv
+  - THe last line is an end-of-data marker.
diff --git a/demo/PIPE/gen_data.py b/demo/PIPE/gen_data.py
new file mode 100644 (file)
index 0000000..63fd800
--- /dev/null
@@ -0,0 +1,29 @@
+import random
+import time
+
+ipv4s = ["1.2.3.4", "3.4.5.6", "112.221.54.34", "66.77.88.99"]
+ipv6s = ["1234:4321:5678:8765:9abc:cba9:def0:0def",
+        "2234:4321:5678:8765:9abc:cba9:def0:0de2",
+       "3234:4321:5678:8765:9abc:cba9:def0:0de3",
+       "4234:4321:5678:8765:9abc:cba9:def0:0de4" ]
+
+strings = ["foo", "bar", "zed", "flip", "flood"]
+bools = ["TRUE", "FALSE"]
+
+tstart = int(time.time())
+tend = tstart+50
+rec_per_ts = 5
+
+for ts in xrange(tstart, tend+1):
+       for rno in xrange(0, rec_per_ts):
+               v2 = str(random.randint(1,100000))
+               v3 = ipv4s[random.randrange(0,len(ipv4s))]
+               v4 = ipv6s[random.randrange(0,len(ipv6s))]
+               v5 = strings[random.randrange(0,len(strings))]
+               v6 = bools[random.randrange(0,len(bools))]
+               v7 = str(random.randint(1,100000))
+               v8 = str(random.randint(1,100000))
+               v9 = str(random.uniform(1,100000))
+               rec = [str(ts), v2, v3, v4, v5, v6, v7, v8, v9]
+               print ",".join(rec)
+
diff --git a/demo/PIPE/killexample b/demo/PIPE/killexample
new file mode 100644 (file)
index 0000000..ba040e4
--- /dev/null
@@ -0,0 +1,4 @@
+#!/bin/sh
+./stopit
+killall gsprintconsole 
+killall -9 gsprintconsole
diff --git a/demo/PIPE/output_spec.cfg b/demo/PIPE/output_spec.cfg
new file mode 100644 (file)
index 0000000..ac3adee
--- /dev/null
@@ -0,0 +1,2 @@
+example1,stream,,,,,
+example2,stream,,,,,
diff --git a/demo/PIPE/queries.gsql b/demo/PIPE/queries.gsql
new file mode 100644 (file)
index 0000000..2067042
--- /dev/null
@@ -0,0 +1,14 @@
+DEFINE{ query_name 'example1';
+}
+select systemTime,uintInPosition1,ullongInPosition2,
+       count(*)
+from CSV_SINGLE.SingleFile
+group by systemTime,uintInPosition1,ullongInPosition2
+;
+
+DEFINE{ query_name 'example2';
+}
+select systemTime,uintInPosition1,ullongInPosition2,ipInPosition3,
+       ipv6InPosition4,stringInPosition5,boolInPosition6,intInPosition7,
+       llongInPosition8,floatInPosition9
+from CSV_SINGLE.SingleFile
diff --git a/demo/PIPE/runall b/demo/PIPE/runall
new file mode 100644 (file)
index 0000000..1d84261
--- /dev/null
@@ -0,0 +1,10 @@
+#!/bin/sh
+./killexample
+sleep 1
+./runit
+sleep 10
+
+../../bin/gsprintconsole `cat gshub.log` default example1 -s > example1.csv &
+../../bin/gsprintconsole `cat gshub.log` default example2 -s > example2.csv &
+sleep 1
+../../bin/start_processing
index 61872c4..5ec4167 100644 (file)
@@ -464,6 +464,7 @@ string generate_fta_struct(string node_name, gb_table *gb_tbl,
                ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
 //             ret+="\tint bitmap_size;\n";
                ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
+               ret += "\tgs_int32_t n_ticks; // for limiting slow flush\n";
                ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
                ret += "\tint max_windows; // max number of open windows.\n";
                ret += "\tunsigned int generation; // initially zero, increment on\n";
@@ -2139,7 +2140,16 @@ string generate_fta_clock(string node_name, table_list *schema, unsigned time_co
                        sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);                        ret += tmpstr;
                      }
                 }
+               ret += "\t\tt->n_ticks = 0; // reset clock tick counter, limit slow flush\n";
+               ret += "\t}else{\n";
+               ret += "//\tLimit slow flush, do a full flush at two clock ticks past the change in generation.\n";
+               ret += "\t\tt->n_ticks++;\n";
+               ret += "\t\tif(t->n_ticks == 2){\n";
+               ret += "\t\t\tif(t->flush_pos<t->max_aggrs) \n";
+               ret += "\t\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
+               ret += "\t\t}\n";
                ret += "\t}\n\n";
+               
 
        }
 
@@ -2299,6 +2309,7 @@ string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *sc
                  temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
                  temporal_flush+="\t\tt->generation++;\n";
                  temporal_flush+="\t\tt->flush_pos = 0;\n";
+                 temporal_flush+="\t\tt->n_ticks = 0; // reset clock tick counter, to limit slow flush\n";
 
 
 //                             Now set the saved temporal value of the gb to the
@@ -4161,6 +4172,7 @@ string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, boo
 
        if(is_aggr_query){
                ret += "\tf->n_aggrs = 0;\n";
+               ret += "\tf->n_ticks = 0; // for limiting slow flush\n";
 
                ret += "\tf->max_aggrs = ";
 
index 4e5531e..603a3cf 100644 (file)
@@ -3275,13 +3275,13 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
 "lfta.o: %s_lfta.cc\n"
 "\t$(CC) -o lfta.o -c %s_lfta.cc\n"
 "\n"
-"%s_lfta.cc: external_fcns.def %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
+"%s_lfta.cc: %s ",hostname.c_str(), hostname.c_str(), hostname.c_str(),schema_file_name.c_str());
        for(i=0;i<nfiles;++i)
                fprintf(outfl," %s",input_file_names[i].c_str());
        if(hostname == ""){
-               fprintf(outfl,"\n\t%s/bin/translate_fta %s %s ",root_path.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
+               fprintf(outfl,"\n\t%s/bin/translate_fta -f -N -c -M -R %s %s -l %s/qlib packet_schema.txt ",root_path.c_str(), root_path.c_str(), config_dir_path.c_str(), root_path.c_str());
        }else{
-               fprintf(outfl,"\n\t%s/bin/translate_fta -h %s %s %s ", root_path.c_str(), hostname.c_str(), config_dir_path.c_str(),schema_file_name.c_str());
+               fprintf(outfl,"\n\t%s/bin/translate_fta -f -N -h %s -c -M -R %s %s -l %s/qlib packet_schema.txt ", root_path.c_str(), hostname.c_str(), root_path.c_str(), config_dir_path.c_str(), root_path.c_str());
        }
        for(i=0;i<nfiles;++i)
                fprintf(outfl," %s",input_file_names[i].c_str());
@@ -3303,15 +3303,9 @@ void generate_makefile(vector<string> &input_file_names, int nfiles,
                );
 
        fprintf(outfl,
-("\n"
-"packet_schema.txt:\n"
-"\tln -s "+root_path+"/cfg/packet_schema.txt .\n"
-"\n"
-"external_fcns.def:\n"
-"\tln -s "+root_path+"/cfg/external_fcns.def .\n"
 "\n"
 "clean:\n"
-"\trm -rf core rts *.o  %s_lfta.cc  external_fcns.def packet_schema.txt").c_str(),hostname.c_str());
+"\trm -rf core rts *.o  %s_lfta.cc",hostname.c_str());
        for(i=0;i<hfta_names.size();++i)
                fprintf(outfl," %s %s.cc",hfta_names[i].c_str(),hfta_names[i].c_str());
        fprintf(outfl,"\n");