1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
32 extern int DEFAULT_LFTA_HASH_TABLE_SIZE;
34 // default value for correlation between the interface card and
36 #define DEFAULT_TIME_CORR 16
41 extern string hash_nums[NRANDS];
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
98 // ----------------------------------------------
99 // Data extracted from the query plan node
100 // for use by code generation.
// These are file-scope variables. Several generators below also take
// parameters with the same names (gb_tbl, aggr_tbl, param_tbl, ...); inside
// those functions the parameter shadows the variable declared here.
102 static cplx_lit_table *complex_literals; //Table of literals with constructors.
103 static vector<handle_param_tbl_entry *> param_handle_table; // presumably the pass-by-handle parameters -- confirm against the code that fills it
104 static param_table *param_tbl; // Table of all referenced parameters.
106 static vector<scalarexp_t *> sl_list; // select-list expressions; generate_fta_struct scans these for temporal types
107 static vector<cnf_elem *> where; // presumably the WHERE clause as CNF elements -- not read in the visible part of this file
109 static gb_table *gb_tbl; // Table of all group-by attributes.
110 static aggregate_table *aggr_tbl; // Table of all referenced aggregates.
112 static bool packed_return; // unpack using structure, not fcns
113 static nic_property *nicprop; // nic properties for this interface.
114 static int global_id;
117 // The partial_fcns vector can now refer to
118 // partial functions, or expensive functions
119 // which can be cached (if there are multiple refs). A couple
120 // of int vectors distinguish the cases.
121 static vector<scalarexp_t *> partial_fcns;
122 static vector<int> fcn_ref_cnt;
123 static vector<bool> is_partial_fcn;
// NOTE(review): unlike the neighbouring declarations, the eight ints below are
// not static and therefore have external linkage -- confirm that is intended.
// Presumably they delimit per-clause index ranges (select / where / group-by /
// aggregate) within partial_fcns; verify against the code that sets them.
124 int sl_fcns_start = 0, sl_fcns_end = 0;
125 int wh_fcns_start = 0, wh_fcns_end = 0;
126 int gb_fcns_start = 0, gb_fcns_end = 0;
127 int ag_fcns_start = 0, ag_fcns_end = 0;
130 // These vectors are for combinable predicates.
131 static vector<int> pred_class; // identifies the group
132 static vector<int> pred_pos; // position in the group.
// Scratch buffer shared by the sprintf() calls throughout this file.
// NOTE(review): every write is an unbounded sprintf into 1000 bytes, so a long
// field / parameter / function name could overflow it; snprintf would bound it.
136 static char tmpstr[1000];
138 //////////////////////////////////////////////////////////////////////
139 /// Various utilities
// Builds the name of the generated FTA struct for a query node, starting from
// normalize_name(node_name). generate_fta_struct uses the result as
// "struct <name>".
// NOTE(review): any suffix and the return statement are not visible in this listing.
141 string generate_fta_name(string node_name){
142 string ret = normalize_name(node_name);
// Builds the name of the generated aggregate struct for a query node:
// the normalized node name with "_aggr_struct" appended.
152 string generate_aggr_struct_name(string node_name){
153 string ret = normalize_name(node_name);
157 ret += "_aggr_struct";
// Builds the name of the generated filter-join hash-table entry struct,
// starting from normalize_name(node_name).
// NOTE(review): the suffix appended to the name is not visible in this listing.
162 string generate_fj_struct_name(string node_name){
163 string ret = normalize_name(node_name);
// Builds the name of the generated per-record watchlist struct (one hash-chain
// element), starting from normalize_name(node_name).
// NOTE(review): the suffix appended to the name is not visible in this listing.
172 string generate_watchlist_element_name(string node_name){
173 string ret = normalize_name(node_name);
// Builds the name of the generated watchlist container struct type:
// the normalized node name with "__wl_struct" appended.
182 string generate_watchlist_struct_name(string node_name){
183 string ret = normalize_name(node_name);
187 ret += "__wl_struct";
// Builds the name of the generated global watchlist variable (the instance of
// the watchlist container struct), starting from normalize_name(node_name).
// NOTE(review): the suffix appended to the name is not visible in this listing.
192 string generate_watchlist_name(string node_name){
193 string ret = normalize_name(node_name);
// Emits the C statements that unpack one field of the input tuple.
//   tblref, schref, field : identify the column; the target variable is
//                           unpack_var_<field>_<tblref>.
//   schema                : supplies the unpack function name, type name and
//                           modifier list for the field.
//   node_name             : prefix of the <node_name>_input_struct_var accesses.
//   end_goto              : label the emitted code jumps to on failure
//                           (defaults to "end").
// Returns the generated C text.
202 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
// Call the schema's unpack function into the per-table unpack variable.
205 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
206 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
// The retval check is emitted only when the field lacks the "required" modifier.
208 if(!schema->get_modifier_list(schref,field)->contains_key("required"))
209 ret += "\tif(retval) goto "+end_goto+";\n";
212 // TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
213 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
214 if(dt.is_buffer_type()){
215 if(dt.get_type() != v_str_t){
// Emitted bounds check: struct size + buffer length + data offset must not exceed sz.
// NOTE(review): the emitted code casts the .data member with int(...); if
// .data is a pointer this truncates on 64-bit targets -- confirm its type.
216 ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
217 ret += "\t\tgoto "+end_goto+";\n";
218 ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
// NOTE(review): the emitted statement below ends with ";+" -- the '+' inside
// the literal looks like a typo (the non-buffer variant further down ends with ";").
219 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
220 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";
222 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
// Plain copy of the field out of the input struct.
226 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
227 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
// Emits the C declaration of the per-group aggregate struct for a node:
// one gb_var<g> member per group-by attribute, one aggr_var<a> member per
// aggregate, and a self-referential "next" pointer.
// Returns the generated C text.
233 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
234 string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
// Group-by members, typed from the group-by table.
237 for(g=0;g<gb_tbl->size();g++){
238 sprintf(tmpstr,"gb_var%d",g);
239 ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
// Aggregate members: built-in aggregates use the aggregate's data type; the
// second make_cvar line uses the storage type (presumably the else branch for
// user-defined aggregates -- the else line itself is not visible here).
243 for(a=0;a<aggr_tbl->size();a++){
245 sprintf(tmpstr,"aggr_var%d",a);
246 if(aggr_tbl->is_builtin(a))
247 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
249 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
253 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
// Emits the C declaration of the filter-join hash-table entry struct.
// Only the non-bloom case is visible here: one key_var<k> member per
// hash-equality predicate (typed from the predicate's right-hand expression)
// followed by a "long long int ts" member.
262 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
265 if(fs->use_bloom == false){ // uses hash table instead
266 ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
268 for(k=0;k<fs->hash_eq.size();++k){
269 sprintf(tmpstr,"key_var%d",k);
270 ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
272 ret += "\tlong long int ts;\n";
// Emits the C declarations backing a watchlist table:
//   * the element struct: one member per field of tbl, plus a 64-bit hashval
//     and a "next" chain pointer;
//   * a global "char *<watchlist>__fstr" initialised to filename;
//   * the watchlist struct (hash table pointer, sizes, refresh bookkeeping,
//     filename) together with its global instance, initialised to
//     { NULL, 0, 0, refresh_interval, 0, 0, NULL }.
// Returns the generated C text.
279 string generate_watchlist_structs(string node_name, table_def *tbl,
280 std::string filename, int refresh_interval){
283 ret += "struct "+generate_watchlist_element_name(node_name)+"{\n";
284 vector<field_entry *> fields = tbl->get_fields();
285 for(int f=0;f<fields.size();++f){
286 data_type dt(fields[f]->get_type());
287 ret += "\t"+dt.make_cvar(fields[f]->get_name())+";\n";
289 ret += "\tgs_uint64_t hashval;\n";
290 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next;\n";
// NOTE(review): filename is spliced verbatim into a C string literal; a path
// containing '"' or '\' would produce invalid or altered generated code.
293 ret += "char *"+generate_watchlist_name(node_name)+"__fstr = \""+filename+"\";\n";
294 ret += "struct "+generate_watchlist_struct_name(node_name)+"{\n";
295 ret += "\tstruct "+ generate_watchlist_element_name(node_name)+" **ht;\n";
296 ret += "\tgs_uint32_t ht_size;\n";
297 ret += "\tgs_uint32_t n_elem;\n";
298 ret += "\tgs_uint32_t refresh_interval;\n";
299 ret += "\ttime_t next_refresh;\n";
300 ret += "\ttime_t last_mtime;\n";
301 ret += "\tchar *filename;\n";
// The initialiser order matches the member order above; the filename member
// starts as NULL (where it is pointed at __fstr is not visible in this function).
302 ret += "} "+generate_watchlist_name(node_name)+" = { NULL, 0, 0, "+std::to_string(refresh_interval)+", 0, 0, NULL};\n\n";
// Emits the C function reload_watchlist__<node_name>(), which (re)loads the
// watchlist hash table from its CSV-like file. The emitted function:
//   1. stat()s the file and returns early if neither mtime nor ctime is newer
//      than the recorded last_mtime;
//   2. frees every element of the current table (destroying buffer-typed
//      fields first);
//   3. (re)allocates the bucket array when it is unallocated or smaller than
//      the previous element count, then clears all buckets;
//   4. reads the file line by line, splits each line on ',' into one slot per
//      field, converts the fields, hashes the key fields and inserts the
//      record at the head of its bucket chain;
//   5. logs a summary if malformed / too-long lines were seen.
// Parameters:
//   tbl  : table definition supplying the field list (names and types).
//   keys : names of the fields that form the hash key.
// Returns the generated C text.
307 string generate_watchlist_load(string node_name, table_def *tbl, vector<string> keys){
309 string tgt = generate_watchlist_name(node_name);
310 vector<field_entry *> fields = tbl->get_fields();
312 ret += "void reload_watchlist__"+node_name+"(){\n";
313 ret += "\tgs_uint32_t i;\n";
314 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *ptr = NULL;\n";
315 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *next = NULL;\n";
316 ret += "\tFILE *fl;\n";
317 ret += "\tchar buf[10000];\n";
318 ret += "\tgs_uint32_t buflen = 10000;\n";
319 ret += "\tchar *flds["+std::to_string(fields.size())+"];\n";
320 ret += "\tgs_uint32_t pos, f, linelen, malformed;\n";
// NOTE(review): n_malformed is declared here without an initialiser and none of
// the visible emitted statements ever sets it to 0 (see the note further down).
321 ret += "\tgs_uint32_t n_malformed, short_lines, toolong_lines, ok;\n";
322 ret += "\tgs_uint64_t hash, bucket;\n";
323 ret += "\tstruct "+generate_watchlist_element_name(node_name)+" *rec;\n\n";
325 ret += "// make sure watchlist file has changed since the last time we loaded it\n";
326 ret += "\tstruct stat file_stat;\n";
327 ret += "\tint err = stat(" + tgt + ".filename, &file_stat);\n";
328 ret += "\tif (err) {\n";
329 ret += "\t\tgslog(LOG_INFO,\"Warning, unable to stat() watchlist file %s to reload " + node_name + ", continue using old version\\n\", " + tgt + ".filename);\n";
330 ret += "\t\treturn;\n";
332 ret += "\tif (file_stat.st_mtime <= " + tgt + ".last_mtime && file_stat.st_ctime <= " + tgt + ".last_mtime) // watchlist file hasn't changed since last time\n";
333 ret += "\t\treturn;\n";
// last_mtime is advanced before the file is opened. NOTE(review): if the later
// fopen() fails, the emitted function returns with the table already emptied
// and will not retry until the file's timestamps change again.
334 ret += "\t" + tgt + ".last_mtime = (file_stat.st_mtime>file_stat.st_ctime)?file_stat.st_mtime:file_stat.st_ctime;\n\n";
336 ret += "// Delete old entries.\n";
337 ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
338 ret += "\t\tptr="+tgt+".ht[i];\n";
339 ret += "\t\twhile(ptr!=NULL){\n";
// One destroy call is emitted per buffer-typed field of the record.
340 for(int f=0;f<fields.size();++f){
341 data_type dt(fields[f]->get_type());
342 if(dt.is_buffer_type()){
343 ret += "\t\t\t"+dt.get_buffer_destroy()+"(&(ptr->"+fields[f]->get_name()+"));\n";
346 ret += "\t\t\tnext = ptr->next;\n";
347 ret += "\t\t\tfree(ptr);\n";
348 ret += "\t\t\tptr = next;\n";
351 ret += "\n// prepare new table. \n";
// The bucket array is resized to the previous element count when that count
// exceeded the bucket count; the first allocation uses 100000 buckets.
352 ret += "\tif("+tgt+".n_elem > "+tgt+".ht_size || "+tgt+".ht_size==0){\n";
353 ret += "\t\tif("+tgt+".ht)\n";
354 ret += "\t\t\tfree("+tgt+".ht);\n";
355 ret += "\t\tif("+tgt+".ht_size == 0)\n";
356 ret += "\t\t\t"+tgt+".ht_size = 100000;\n";
358 ret += "\t\t\t"+tgt+".ht_size = "+tgt+".n_elem;\n";
// NOTE(review): the emitted malloc() result is used without a NULL check.
359 ret += "\t\t"+tgt+".ht = (struct "+generate_watchlist_element_name(node_name)+" **)malloc("+tgt+".ht_size * sizeof(struct "+generate_watchlist_element_name(node_name)+" *));\n";
361 ret += "\tfor(i=0;i<"+tgt+".ht_size;++i){\n";
362 ret += "\t\t"+tgt+".ht[i] = NULL;\n";
364 ret += "\n// load new table\n";
365 ret += "\t"+tgt+".n_elem = 0;\n";
366 ret += "\tfl = fopen("+tgt+".filename, \"r\");\n";
367 ret += "\tif(fl==NULL){\n";
368 ret += "\t\tgslog(LOG_INFO,\"Warning, can't open file %s for watchlist "+node_name+"\\n\","+tgt+".filename);\n";
369 ret += "\t\treturn;\n";
// NOTE(review): this zeroes the per-line flag "malformed", but the counter
// that is incremented and tested later is "n_malformed"; it looks like
// n_malformed = 0 was intended here.
371 ret += "\tmalformed = 0;\n";
372 ret += "\tshort_lines = 0;\n";
373 ret += "\ttoolong_lines = 0;\n";
374 ret += "\twhile(fgets(buf, buflen, fl) != NULL){\n";
375 ret += "\t\tlinelen = strlen(buf);\n";
// NOTE(review): the last character is dropped unconditionally; a final line
// without a trailing newline (or a line longer than the buffer) loses a data
// character instead.
376 ret += "\t\tbuf[linelen-1]='\\0'; // strip off trailing newline\n";
377 ret += "\t\tlinelen--;\n";
379 ret += "\t\tpos=0;\n";
380 ret += "\t\tmalformed=0;\n";
381 ret += "\t\tok=1;\n";
// Split the line in place: each ',' is overwritten with NUL and flds[f] points
// at the start of field f.
382 ret += "\t\tflds[0] = buf;\n";
383 ret += "\t\tfor(f=1;pos < linelen && f<"+std::to_string(fields.size())+";++f){\n";
384 ret += "\t\t\tfor(;pos < linelen && buf[pos]!=',' && buf[pos]!='\\n';++pos);\n";
385 ret += "\t\t\tif(pos >= linelen){\n";
386 ret += "\t\t\t\tmalformed = 1;\n";
387 ret += "\t\t\t\tbreak;\n";
389 ret += "\t\t\tbuf[pos]='\\0';\n";
390 ret += "\t\t\tpos++;\n";
391 ret += "\t\t\tflds[f]=buf+pos;\n";
393 ret += "\t\tif(malformed){\n";
394 ret += "\t\t\tok=0;\n";
395 ret += "\t\t\tn_malformed++;\n";
397 ret += "\t\tif(f<"+std::to_string(fields.size())+"){\n";
398 ret += "\t\t\tok=0;\n";
399 ret += "\t\t\tshort_lines++;\n";
401 ret += "\t\tif(pos && (pos<linelen)){\n";
402 ret += "\t\t\tok=0;\n";
403 ret += "\t\t\ttoolong_lines++;\n";
// Records are built whenever all field slots were found; the "ok" flag set
// above is not consulted in the visible condition.
405 ret += "\t\tif(f>="+std::to_string(fields.size())+"){\n";
// NOTE(review): the emitted malloc() result is used without a NULL check.
406 ret += "\t\t\trec = (struct "+generate_watchlist_element_name(node_name)+" *)malloc(sizeof(struct "+generate_watchlist_element_name(node_name)+"));\n";
// One type-specific extract call per field converts the text into the record.
408 for(int f=0;f<fields.size();++f){
409 data_type dt(fields[f]->get_type());
410 ret += "\t\t\t"+dt.get_wl_extract_fcn()+"(flds["+std::to_string(f)+"], &(rec->"+fields[f]->get_name()+"));\n";
412 // Compute the hash value
413 ret += "\t\t\thash=0;\n";
// For each key name, find the field index f with that name, then XOR in the
// field's hash multiplied by the f-th constant from hash_nums.
// NOTE(review): if a key matches no field, f equals fields.size() after the
// search and fields[f] below is out of range -- unless a line dropped from
// this listing guards against it.
414 for(int k=0;k<keys.size();++k){
415 string key_fld = keys[k];
417 for(f=0;f<fields.size();++f){
418 if(fields[f]->get_name() == key_fld)
421 data_type dt(fields[f]->get_type());
424 "\t\t\thash ^= (("+hash_nums[f%NRANDS]+" * lfta_"+
425 dt.get_type_str()+"_to_hash(rec->"+fields[f]->get_name()+")));\n";
// NOTE(review): "\%" is not a standard C++ escape sequence; compilers
// typically warn and emit '%'. A plain "%" would be portable.
427 ret += "\t\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
428 ret += "\t\t\trec->hashval = hash;\n";
429 ret += "\t\t\trec->next = "+tgt+".ht[bucket];\n";
430 ret += "\t\t\t"+tgt+".ht[bucket] = rec;\n";
431 ret += "\t\t\t"+tgt+".n_elem++;\n";
// NOTE(review): the condition ignores short_lines although the message reports
// it, and the message passes the per-line flag "malformed" where the counter
// "n_malformed" appears to be intended.
435 ret += "\tif(n_malformed+toolong_lines > 0){\n";
436 ret += "\t\tgslog(LOG_INFO,\"Errors reading data for watchlist "+node_name+" from file %s: malformed=%d, too short=%d, too long=%d\\n\","+tgt+".filename, malformed, short_lines, toolong_lines);\n";
// Emits the C declaration of the per-query FTA state struct
// ("struct <fta name> { struct FTA f; ... }"). The visible sections are:
//   * aggregate bookkeeping (group table, hash map, generation / flush state);
//   * last / flush-start / last-flushed values for temporal group-by attributes;
//   * filter-join state (bloom filter or limited hash table);
//   * watchlist-join state;
//   * one member per query parameter, complex literal and pass-by-handle
//     parameter;
//   * last values of temporal attributes referenced in the select list;
//   * trace id and runtime statistics counters.
// Diagnostics about temporal group-by attributes are written to stderr.
// The gb_tbl / aggr_tbl / param_tbl / complex_literals / param_handle_table
// parameters shadow the file-scope variables of the same names; sl_list is
// read from file scope.
// NOTE(review): the tail of the parameter list and the conditions guarding
// each section are not visible in this listing.
446 string generate_fta_struct(string node_name, gb_table *gb_tbl,
447 aggregate_table *aggr_tbl, param_table *param_tbl,
448 cplx_lit_table *complex_literals,
// NOTE(review): the token "¶m_handle_table" below looks like an encoding
// artefact of "&param_handle_table" (a reference parameter) -- confirm
// against the real source file.
449 vector<handle_param_tbl_entry *> ¶m_handle_table,
450 bool is_aggr_query, bool is_fj, bool is_wj, bool uses_bloom,
453 string ret = "struct " + generate_fta_name(node_name) + "{\n";
454 ret += "\tstruct FTA f;\n";
456 //-------------------------------------------------------------
457 // Aggregate-specific fields
461 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
463 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
464 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
465 // ret+="\tint bitmap_size;\n";
466 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
467 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
468 ret += "\tint max_windows; // max number of open windows.\n";
469 ret += "\tunsigned int generation; // initially zero, increment on\n";
470 ret += "\t // every hash table flush - whether regular or induced.\n";
471 ret += "\t // Old groups are identified by a generation mismatch.\n";
472 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
473 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
// For each temporal, non-buffer group-by attribute, declare the members that
// track its last / flush-start / last-flushed value.
478 bool uses_temporal_flush = false;
479 for(g=0;g<gb_tbl->size();g++){
480 data_type *dt = gb_tbl->get_data_type(g);
481 if(dt->is_temporal()){
483 fprintf(stderr,"group by attribute %s is temporal, ",
484 gb_tbl->get_name(g).c_str());
485 if(dt->is_increasing()){
486 fprintf(stderr,"increasing.\n");
488 fprintf(stderr,"decreasing.\n");
491 data_type *gdt = gb_tbl->get_data_type(g);
492 if(gdt->is_buffer_type()){
493 fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
495 sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
497 sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
499 sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
501 uses_temporal_flush = true;
507 if(! uses_temporal_flush){
508 fprintf(stderr,"Warning: no temporal flush.\n");
512 // ---------------------------------------------------------
513 // Filter-join specific fields
518 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
519 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
520 "\tint first_exec;\n"
521 "\tlong long int last_bin;\n"
522 "\tint last_bloom_pos;\n"
525 }else{ // limited hash table
527 " struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
534 // --------------------------------------------
535 // watchlist-join specific
537 ret += "\ttime_t ux_time;\n";
540 //--------------------------------------------------------
543 // Create places to hold the parameters.
545 vector<string> param_vec = param_tbl->get_param_names();
546 for(p=0;p<param_vec.size();p++){
547 data_type *dt = param_tbl->get_data_type(param_vec[p]);
548 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
549 param_vec[p].c_str());
// Parameters accessed by handle additionally get a search_handle pointer.
551 if(param_tbl->handle_access(param_vec[p])){
552 ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
556 // Create places to hold complex literals.
558 for(cl=0;cl<complex_literals->size();cl++){
559 literal_t *l = complex_literals->get_literal(cl);
// NOTE(review): this data_type is heap-allocated and no matching delete is
// visible; a stack object (as used for "dt" in the temporal loop below) would
// avoid a leak per literal -- confirm nothing in the dropped lines frees it.
560 data_type *dtl = new data_type( l->get_type() );
561 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
565 // Create places to hold the pass-by-handle parameters.
566 for(p=0;p<param_handle_table.size();++p){
567 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
571 // Create places to hold the last values of temporal
572 // attributes referenced in select clause
573 // we also need to store values of the temporal attributes
574 // of last flushed tuple in aggr queries
575 // to make sure we generate the correct temporal tuple
576 // in the presence of slow flushes
579 col_id_set temp_cids; // col ids of temp attributes in select clause
582 col_id_set::iterator csi;
// Collect the column ids used by every temporal select-list expression.
584 for(s=0;s<sl_list.size();s++){
585 data_type *sdt = sl_list[s]->get_data_type();
586 if (sdt->is_temporal()) {
587 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// Declare one last_<field>_<tblref> member per collected column.
591 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
592 int tblref = (*csi).tblvar_ref;
593 int schref = (*csi).schema_ref;
594 string field = (*csi).field;
595 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
596 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
600 ret += "\tgs_uint64_t trace_id;\n\n";
602 // Fields to store the runtime stats
604 ret += "\tgs_uint32_t in_tuple_cnt;\n";
605 ret += "\tgs_uint32_t out_tuple_cnt;\n";
606 ret += "\tgs_uint32_t out_tuple_sz;\n";
607 ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
608 ret += "\tgs_uint64_t cycle_cnt;\n";
609 ret += "\tgs_uint32_t collision_cnt;\n";
610 ret += "\tgs_uint32_t eviction_cnt;\n";
611 ret += "\tgs_float_t sampling_rate;\n";
620 //------------------------------------------------------------
621 // Set colref tblvars to 0..
622 // (special processing for join-like operators in an lfta).
// Walks the scalar expression tree rooted at se and sets the table-variable
// reference of every column reference it reaches to 0. References to group-by
// variables are followed into their defining expressions via gtbl. An
// unrecognised operator type is reported on stderr.
624 void reset_se_col_ids_tblvars(scalarexp_t *se, gb_table *gtbl){
625 vector<scalarexp_t *> operands;
631 switch(se->get_operator_type()){
// Single-operand expressions: recurse into the left operand.
637 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
// Two-operand expressions: recurse into both sides.
640 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
641 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
// Column reference: this is the actual reset.
645 se->get_colref()->set_tablevar_ref(0);
// NOTE(review): this message names gather_se_col_ids although it is printed
// from reset_se_col_ids_tblvars -- it looks copied from that function.
648 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
651 reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
657 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
// Multi-operand expressions: recurse into every operand.
660 operands = se->get_operands();
661 for(o=0;o<operands.size();o++){
662 reset_se_col_ids_tblvars(operands[o], gtbl);
666 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
667 se->get_lineno(), se->get_charno(),se->get_operator_type());
673 // reset column tblvars accessed in this pr.
// Walks the predicate tree rooted at pr and applies reset_se_col_ids_tblvars
// to every scalar expression it contains, recursing through nested
// predicates. An unrecognised predicate operator type is reported on stderr.
675 void reset_pr_col_ids_tblvars(predicate_t *pr, gb_table *gtbl){
676 vector<scalarexp_t *> op_list;
679 switch(pr->get_operator_type()){
// Predicate with a single scalar expression on the left.
681 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
// Predicate over two scalar expressions.
684 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
685 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
// Predicate with one sub-predicate.
688 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
// Predicate with two sub-predicates.
691 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
692 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
// Predicate with an operand list: reset each operand expression.
695 op_list = pr->get_op_list();
696 for(o=0;o<op_list.size();++o){
697 reset_se_col_ids_tblvars(op_list[o],gtbl) ;
701 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
702 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
709 // Generate code that makes reference
710 // to the tuple, and not to any aggregates.
// Returns the C expression text for scalar expression se, evaluated against
// the unpacked input tuple:
//   * handle references         -> t->handle_param_<n>
//   * complex literals          -> t->complex_literal_<n>
//   * plain literals            -> the literal's own C text
//   * column references         -> unpack_var_<field>_<tblvar>
//   * group-by references       -> the code of the defining expression, taken
//                                  from the file-scope gb_tbl
//   * partial-function results  -> partial_fcn_result_<n>
// Operators whose operand type declares a complex operator are emitted through
// that operator function. A UDAF reference or an unknown operator type is
// reported on stderr and yields an "ERROR in generate_se_code" string.
// Uses the shared tmpstr scratch buffer.
711 static string generate_se_code(scalarexp_t *se,table_list *schema){
713 data_type *ldt, *rdt;
715 vector<scalarexp_t *> operands;
718 switch(se->get_operator_type()){
720 if(se->is_handle_ref()){
721 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
725 if(se->get_literal()->is_cpx_lit()){
726 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
730 return(se->get_literal()->to_C_code("")); // not complex, no constructor
732 if(se->is_handle_ref()){
733 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
738 ret += se->get_param_name();
// Unary operator: use the type's operator function when it declares one.
741 ldt = se->get_left_se()->get_data_type();
742 if(ldt->complex_operator(se->get_op()) ){
743 ret += ldt->get_complex_operator(se->get_op());
745 ret += generate_se_code(se->get_left_se(),schema);
750 ret += generate_se_code(se->get_left_se(),schema);
// Binary operator: same choice, keyed on both operand types.
755 ldt = se->get_left_se()->get_data_type();
756 rdt = se->get_right_se()->get_data_type();
758 if(ldt->complex_operator(rdt, se->get_op()) ){
759 ret += ldt->get_complex_operator(rdt, se->get_op());
761 ret += generate_se_code(se->get_left_se(),schema);
763 ret += generate_se_code(se->get_right_se(),schema);
767 ret += generate_se_code(se->get_left_se(),schema);
769 ret += generate_se_code(se->get_right_se(),schema);
774 if(se->is_gb()){ // OK to ref gb attrs, but they're not yet unpacked ...
775 // so return the defining code.
776 ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
779 sprintf(tmpstr,"unpack_var_%s_%d",
780 se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
785 // Should not be ref'ing any aggr here.
786 if(se->get_aggr_ref() >= 0){
787 fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
788 return("ERROR in generate_se_code");
791 if(se->is_partial()){
792 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
// Function call: emit each operand; buffer-typed operands that are not handle
// references get special treatment (presumably an address-of prefix -- the
// statement controlled by the if below is not visible in this listing).
796 operands = se->get_operands();
797 for(o=0;o<operands.size();o++){
799 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
801 ret += generate_se_code(operands[o], schema);
807 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
808 se->get_lineno(), se->get_charno(),se->get_operator_type());
809 return("ERROR in generate_se_code");
813 // generate code that refers only to aggregate data and constants.
// Returns the C expression text for scalar expression se, evaluated against a
// stored group rather than the input tuple. var is the textual prefix placed
// in front of gb_var<n> / aggr_var<n> (so it selects the aggregate struct to
// read from). Column references that are not group-by attributes are reported
// as errors on stderr; UDAF references become udaf_ret<n>.
// Uses the shared tmpstr scratch buffer.
814 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
817 data_type *ldt, *rdt;
819 vector<scalarexp_t *> operands;
822 switch(se->get_operator_type()){
824 if(se->is_handle_ref()){
825 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
829 if(se->get_literal()->is_cpx_lit()){
830 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
834 return(se->get_literal()->to_C_code("")); // not complex no constructor
836 if(se->is_handle_ref()){
837 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
842 ret += se->get_param_name();
// Unary operator: use the type's operator function when it declares one.
845 ldt = se->get_left_se()->get_data_type();
846 if(ldt->complex_operator(se->get_op()) ){
847 ret += ldt->get_complex_operator(se->get_op());
849 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
854 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
// Binary operator: same choice, keyed on both operand types.
859 ldt = se->get_left_se()->get_data_type();
860 rdt = se->get_right_se()->get_data_type();
862 if(ldt->complex_operator(rdt, se->get_op()) ){
863 ret += ldt->get_complex_operator(rdt, se->get_op());
865 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
867 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
871 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
873 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
// NOTE(review): the two-line comment below appears copied from
// generate_se_code; here the group-by value is read from <var>gb_var<n>, not
// regenerated from its defining expression.
878 if(se->is_gb()){ // OK to ref gb attrs, but they're not yet
879 // unpacked ... so return the defining code.
880 sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
884 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
885 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
886 se->get_lineno(), se->get_charno());
// Built-in aggregate reference: read the stored aggregate value.
892 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
// Function node that is really a UDAF: use its precomputed return variable.
897 if(se->get_aggr_ref() >= 0){
898 sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
903 if(se->is_partial()){
904 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
908 operands = se->get_operands();
909 for(o=0;o<operands.size();o++){
911 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
913 ret += generate_se_code_fm_aggr(operands[o], var, schema);
// NOTE(review): the returned error text says generate_se_code, not
// generate_se_code_fm_aggr.
919 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
920 se->get_lineno(), se->get_charno(),se->get_operator_type());
921 return("ERROR in generate_se_code");
// Emits the statement that evaluates a partial function over aggregate data:
//   retval = <fcn>( &partial_fcn_result_<pfn_id>, <operands...> ...
// with every operand generated by generate_se_code_fm_aggr (prefix var).
// se must be a non-aggregate function node; otherwise an error is printed to
// stderr and an error string is returned.
927 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
930 vector<scalarexp_t *> operands;
933 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
934 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
935 se->get_lineno(), se->get_charno());
// NOTE(review): the returned error text names generate_se_code.
936 return("ERROR in generate_se_code");
939 ret = "\tretval = " + se->get_op() + "( ";
// The result slot is passed by address as the first argument.
940 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
943 operands = se->get_operands();
944 for(o=0;o<operands.size();o++){
946 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
948 ret += generate_se_code_fm_aggr(operands[o], var, schema);
// Emits the call expression "<fcn>( <operands...>" for a cacheable function,
// with operands generated by generate_se_code against the input tuple.
// se must be a non-aggregate function node; otherwise an error is printed to
// stderr and an error string is returned.
955 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
958 vector<scalarexp_t *> operands;
960 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
961 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
962 se->get_lineno(), se->get_charno());
// NOTE(review): the returned error text names generate_se_code.
963 return("ERROR in generate_se_code");
966 ret = se->get_op() + "( ";
968 operands = se->get_operands();
969 for(o=0;o<operands.size();o++){
971 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
973 ret += generate_se_code(operands[o], schema);
// Emits the statement that evaluates a partial function over the input tuple:
//   retval = <fcn>( &partial_fcn_result_<pfn_id>, <operands...> ...
// with every operand generated by generate_se_code.
// se must be a non-aggregate function node; otherwise an error is printed to
// stderr and an error string is returned.
982 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
985 vector<scalarexp_t *> operands;
988 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
989 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
990 se->get_lineno(), se->get_charno());
// NOTE(review): the returned error text names generate_se_code.
991 return("ERROR in generate_se_code");
// NOTE(review): the next line ends with ',' rather than ';'. The comma
// operator still performs the assignment and then the sprintf, so behaviour is
// as intended, but it is almost certainly a typo (compare
// unpack_partial_fcn_fm_aggr, which uses ';').
994 ret = "\tretval = " + se->get_op() + "( ",
995 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
998 operands = se->get_operands();
999 for(o=0;o<operands.size();o++){
1001 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
1003 ret += generate_se_code(operands[o], schema);
// Translates a query-language comparison operator into its C spelling:
// "=" becomes "==" and "<>" becomes "!=".
// NOTE(review): the fall-through return for other operators is not visible in
// this listing (presumably the operator is returned unchanged -- confirm).
1014 static string generate_C_comparison_op(string op){
1015 if(op == "=") return("==");
1016 if(op == "<>") return("!=");
// Translates a query-language boolean operator (AND / OR / NOT, accepted in
// upper, capitalised or lower case only) into C. The C spellings returned by
// each branch are not visible in this listing. Any other operator is reported
// on stderr and an "ERROR UNKNOWN BOOLEAN OPERATOR :<op>" string is returned.
1020 static string generate_C_boolean_op(string op){
1021 if( (op == "AND") || (op == "And") || (op == "and") ){
1024 if( (op == "OR") || (op == "Or") || (op == "or") ){
1027 if( (op == "NOT") || (op == "Not") || (op == "not") ){
1031 fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
1032 return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
// Returns the C boolean expression text for predicate pr, evaluated against
// the input tuple (scalar operands go through generate_se_code). Visible cases:
//   * literal-list membership: an " || "-joined chain of equality tests against
//     each literal, using the type's equals function for complex types;
//   * comparison of two scalar expressions, using the type's comparison
//     function for complex types and a plain C operator otherwise;
//   * unary and binary boolean combinations of sub-predicates;
//   * predicate function calls, including references to precomputed
//     "combinable" predicate bitmaps and sampling functions, which receive
//     t->sampling_rate as their first argument.
// An unknown predicate type is reported on stderr and yields an error string.
// Reads the file-scope pred_class / pred_pos vectors and the tmpstr buffer.
1036 static string generate_predicate_code(predicate_t *pr,table_list *schema){
1038 vector<literal_t *> litv;
1040 data_type *ldt, *rdt;
1041 vector<scalarexp_t *> op_list;
1043 unsigned int bitmask;
1045 switch(pr->get_operator_type()){
1047 ldt = pr->get_left_se()->get_data_type();
// One equality test per literal, OR-ed together.
1050 litv = pr->get_lit_vec();
1051 for(i=0;i<litv.size();i++){
1052 if(i>0) ret += " || ";
1055 if(ldt->complex_comparison(ldt) ){
1056 ret += ldt->get_equals_fcn(ldt) ;
// Buffer-typed operands are passed by address.
1058 if(ldt->is_buffer_type() ) ret += "&";
1059 ret += generate_se_code(pr->get_left_se(), schema);
1061 if(ldt->is_buffer_type() ) ret += "&";
1062 if(litv[i]->is_cpx_lit()){
1063 sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
1066 ret += litv[i]->to_C_code("");
1070 ret += generate_se_code(pr->get_left_se(), schema);
1072 ret += litv[i]->to_C_code("");
1081 ldt = pr->get_left_se()->get_data_type();
1082 rdt = pr->get_right_se()->get_data_type();
1085 if(ldt->complex_comparison(rdt) ){
1086 // TODO can use get_equals_fcn if op is "=" ?
1087 ret += ldt->get_comparison_fcn(rdt);
1089 if(ldt->is_buffer_type() ) ret += "&";
1090 ret += generate_se_code(pr->get_left_se(),schema);
1092 if(rdt->is_buffer_type() ) ret += "&";
1093 ret += generate_se_code(pr->get_right_se(),schema);
1095 ret += generate_C_comparison_op(pr->get_op());
1098 ret += generate_se_code(pr->get_left_se(),schema);
1099 ret += generate_C_comparison_op(pr->get_op());
1100 ret += generate_se_code(pr->get_right_se(),schema);
1106 ret += generate_C_boolean_op(pr->get_op());
1107 ret += generate_predicate_code(pr->get_left_pr(),schema);
1110 case PRED_BINARY_OP:
1112 ret += generate_predicate_code(pr->get_left_pr(),schema);
1113 ret += generate_C_boolean_op(pr->get_op());
1114 ret += generate_predicate_code(pr->get_right_pr(),schema);
1118 op_list = pr->get_op_list();
1119 cref = pr->get_combinable_ref();
1120 if(cref >= 0){ // predicate is a combinable pred reference
1121 // Trust, but verify
// NOTE(review): the bounds check uses ">=", so cref == pred_class.size()
// passes and pred_class[cref] then reads one past the end; ">" looks intended.
// pred_pos[cref] is indexed without its own size check.
1122 if(pred_class.size() >= cref && pred_class[cref] >= 0){
1123 ppos = pred_pos[cref];
// Parsed as 1 << (ppos % 32): the bit within a 32-bit word; ppos/32 below
// selects the word. NOTE(review): the literal 1 is a signed int, so position
// 31 shifts into the sign bit -- 1u would avoid that.
1124 bitmask = 1 << ppos % 32;
1125 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
1131 ret = pr->get_op() + "(";
1132 if (pr->is_sampling_fcn) {
1133 ret += "t->sampling_rate";
1134 if (!op_list.empty())
1137 for(o=0;o<op_list.size();++o){
1138 if(o>0) ret += ", ";
1139 if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
1141 ret += generate_se_code(op_list[o],schema);
1146 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
1147 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
1148 return("ERROR in generate_predicate_code");
// Builds the text of a C equality test between two operand expressions that
// share the data type dt.
//   lhs_op, rhs_op : operand expressions, already rendered as C text.
//   dt             : data type of both operands.
// When dt->complex_comparison(dt) holds, the test is routed through the
// function named by dt->get_equals_fcn(dt), and an ampersand is emitted at two
// points for buffer types (presumably once in front of each operand).
// NOTE(review): the branch for simple types and the return statement are not
// shown here; presumably a plain comparison of the two operands. Confirm.
1153 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
1156 if(dt->complex_comparison(dt) ){
1157 ret += dt->get_equals_fcn(dt);
// Buffer types are handed to the equality function by address.
1159 if(dt->is_buffer_type() ) ret += "&";
1162 if(dt->is_buffer_type() ) ret += "&";
1174 //static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
1177 // if(dt->complex_comparison(dt) ){
1178 // ret += dt->get_equals_fcn(dt);
1180 // if(dt->is_buffer_type() ) ret += "&";
1183 // if(dt->is_buffer_type() ) ret += "&";
1195 // Here I assume that only MIN and MAX aggregates can be computed
1196 // over BUFFER data types.
// Emits the C statement(s) that fold the current input into one aggregate.
//   var    : C lvalue text naming the aggregate storage to update.
//   atbl   : aggregate table describing the aggregates of the query.
//   aidx   : index into atbl of the aggregate being updated.
//   schema : forwarded to generate_se_code when rendering expressions.
// Returns the generated text. For an operator that is not recognized, a
// message is printed to stderr and an ERROR string is returned instead.
1198 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
1199 string retval = "\t\t";
1200 string op = atbl->get_op(aidx);
// Aggregates that are not built in: emit a call to <op>_LFTA_AGGR_UPDATE_ on
// the storage (passed by address unless its storage type is fstring_t),
// followed by the rendered operand expressions.
1203 if(! atbl->is_builtin(aidx)) {
1205 retval += op+"_LFTA_AGGR_UPDATE_(";
1206 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1207 retval+="("+var+")";
1208 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1209 for(o=0;o<opl.size();++o){
1211 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1213 retval += generate_se_code(opl[o], schema);
1220 // Built-in aggregate processing.
1222 data_type *dt = atbl->get_data_type(aidx);
// Increment form (presumably COUNT; the operator test is not shown here).
1226 retval.append("++;\n");
// Accumulate form (presumably SUM): var += <aggregate expression>.
1231 retval.append(" += ");
1232 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1233 retval.append(";\n");
// Keep-the-smaller form (presumably MIN). The aggregate expression is first
// evaluated into aggr_tmp_<aidx>; it replaces var when the type comparison
// function returns a negative value, or when a direct < holds for types that
// do not need a complex comparison. Buffer types are overwritten through
// get_buffer_replace() instead of plain assignment.
1237 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1238 retval.append(tmpstr);
1239 if(dt->complex_comparison(dt)){
1240 if(dt->is_buffer_type())
1241 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1243 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1245 sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1247 retval.append(tmpstr);
1248 if(dt->is_buffer_type()){
1249 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1251 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1253 retval.append(tmpstr);
// Keep-the-larger form (presumably MAX): same shape as above with the
// comparison direction reversed.
1258 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1259 retval.append(tmpstr);
1260 if(dt->complex_comparison(dt)){
1261 if(dt->is_buffer_type())
1262 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1264 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1266 sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1268 retval.append(tmpstr);
1269 if(dt->is_buffer_type()){
1270 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1272 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1274 retval.append(tmpstr);
// Bitwise aggregates: fold the expression into var with the matching
// compound assignment operator.
1279 if(op == "AND_AGGR"){
1281 retval.append(" &= ");
1282 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1283 retval.append(";\n");
1286 if(op == "OR_AGGR"){
1288 retval.append(" |= ");
1289 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1290 retval.append(";\n");
1293 if(op == "XOR_AGGR"){
1295 retval.append(" ^= ");
1296 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1297 retval.append(";\n");
// No known operator matched: report it and hand back an error marker string.
1300 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1301 return("ERROR: aggregate not recognized: "+op);
// Emits the C statement(s) that initialize one aggregate when a new group is
// created.
//   var    : C lvalue text naming the aggregate storage to initialize.
//   atbl   : aggregate table describing the aggregates of the query.
//   aidx   : index into atbl of the aggregate being initialized.
//   schema : forwarded to generate_se_code when rendering expressions.
// Returns the generated text. For an operator that is not recognized, a
// message is printed to stderr and an ERROR string is returned instead.
1307 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1309 string op = atbl->get_op(aidx);
// Aggregates that are not built in: emit <op>_LFTA_AGGR_INIT_ on the storage,
// then an <op>_LFTA_AGGR_UPDATE_ call with the rendered operands so that the
// first input is folded in. The storage is passed by address unless its
// storage type is fstring_t.
1312 if(! atbl->is_builtin(aidx)) {
1314 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1315 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1316 retval+="("+var+"));\n";
1318 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1319 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1320 retval+="("+var+")";
1321 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1322 for(o=0;o<opl.size();++o){
1324 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1326 retval += generate_se_code(opl[o],schema);
1332 // Built-in aggregate processing.
1335 data_type *dt = atbl->get_data_type(aidx);
// Start-at-one form (presumably COUNT; the operator test is not shown here).
1338 retval = "\t\t"+var;
1339 retval.append(" = 1;\n");
// Every other built-in starts out holding the value of the aggregate
// expression itself. Buffer types go through aggr_tmp_<aidx> and
// get_buffer_assign_copy(); other types use a direct assignment.
1343 if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1344 op == "OR_AGGR" || op == "XOR_AGGR"){
1345 if(dt->is_buffer_type()){
1346 sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1347 retval.append(tmpstr);
1348 sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1349 retval.append(tmpstr);
1351 retval = "\t\t"+var;
1353 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1354 retval.append(";\n");
// No known operator matched: report it and hand back an error marker string.
1359 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1360 return("ERROR: aggregate not recognized: "+op);
1364 ////////////////////////////////////////////////////////////
// Starts the text that precedes the generated code of one lfta.
//   schema           : not referenced in the lines shown here.
//   node_name        : used to derive the name of the schema string variable.
//   schema_embed_str : C initializer text assigned to that variable.
// The visible part opens an ifndef on LFTA_IN_NIC, defines the schema string
// and adds a set of include directives.
// NOTE(review): the rest of the function, including the matching endif and
// the return, is not shown here.
1367 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1368 std::string &node_name, std::string &schema_embed_str){
1369 // Include these only once, not once per lfta
1370 // string ret = "#include \"rts.h\"\n";
1371 // ret += "#include \"fta.h\"\n\n");
1373 string ret = "#ifndef LFTA_IN_NIC\n";
// char * variable holding the embedded schema, named after the node.
1374 ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1375 ret += "#include<stdio.h>\n";
1376 ret += "#include <limits.h>\n";
1377 ret += "#include <float.h>\n";
1378 ret += "#include <sys/stat.h>\n";
1379 ret += "#include \"rdtsc.h\"\n";
// Emits the C text that converts one slot of t->aggr_table into an output
// tuple and then releases the state held by that slot.
//   node_name : used to derive the name of the output tuple struct.
//   schema    : forwarded to the expression generators.
//   idx       : C expression text selecting the slot in t->aggr_table.
// The function reads aggr_tbl, gb_tbl, sl_list, partial_fcns, is_partial_fcn,
// sl_fcns_start and sl_fcns_end, none of which are declared in the lines
// shown here (presumably file-scope state of the generator).
1388 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1390 // need to create and output the tuple.
1391 string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1392 // Check for any UDAFs with LFTA_BAILOUT
// lfta_bailout accumulates the result of every bailout call; the tuple is
// only built when the sum stays zero.
1393 ret += "\tlfta_bailout = 0;\n";
1394 for(a=0;a<aggr_tbl->size();a++){
1395 if(aggr_tbl->has_bailout(a)){
1396 ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
1397 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1398 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1401 ret += "\tif(! lfta_bailout){\n";
1403 // First, compute the size of the tuple.
1405 // Unpack UDAF return values
// Each aggregate that is not built in writes its result into udaf_ret<a>.
1406 for(a=0;a<aggr_tbl->size();a++){
1407 if(! aggr_tbl->is_builtin(a)){
1408 ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1409 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1410 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1416 // Unpack partial fcns ref'd by the select clause.
// A nonzero retval from any partial function sets unpack_failed, which
// suppresses tuple allocation below.
1417 if(sl_fcns_start != sl_fcns_end){
1418 ret += "\t\tunpack_failed = 0;\n";
1419 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1420 if(is_partial_fcn[p]){
1421 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1422 "t->aggr_table["+idx+"].",schema);
1423 ret += "\t\tif(retval) unpack_failed = 1;\n";
1426 // BEGIN don't allocate tuple if
1427 ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1430 // Unpack any BUFFER type selections into temporaries
1431 // so that I can compute their size and not have
1432 // to recompute their value during tuple packing.
1433 // I can use regular assignment here because
1434 // these temporaries are non-persistent.
1436 for(s=0;s<sl_list.size();s++){
1437 data_type *sdt = sl_list[s]->get_data_type();
1438 if(sdt->is_buffer_type()){
1439 sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1441 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1447 // The size of the tuple is the size of the tuple struct plus the
1448 // size of the buffers to be copied in.
1450 ret += "\t\t\ttuple_size = sizeof( struct ";
1451 ret += generate_tuple_name(node_name);
1453 for(s=0;s<sl_list.size();s++){
1454 data_type *sdt = sl_list[s]->get_data_type();
1455 if(sdt->is_buffer_type()){
1456 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
// Allocation may fail; the emitted code only fills and posts the tuple when
// allocate_tuple returns a non-NULL pointer.
1463 ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
1464 ret += "\t\t\tif( tuple != NULL){\n";
1467 // Test passed, make assignments to the tuple.
// tuple_pos starts just past the fixed-size struct and advances by the size
// of every buffer copied behind it.
1469 ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1470 ret += generate_tuple_name(node_name) ;
1473 // Mark tuple as REGULAR_TUPLE
1474 ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
1476 for(s=0;s<sl_list.size();s++){
1477 data_type *sdt = sl_list[s]->get_data_type();
1478 if(sdt->is_buffer_type()){
1479 sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1481 sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1484 sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1486 // if(sdt->needs_hn_translation())
1487 // ret += sdt->hton_translation() +"( ";
1488 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1489 // if(sdt->needs_hn_translation())
// Hand the tuple over and, when LFTA_STATS is defined, account for it.
1496 ret += "\t\t\t\tpost_tuple(tuple);\n";
1497 ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1498 ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1499 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1500 ret += "\t\t\t\t#endif\n\n";
1503 if(sl_fcns_start != sl_fcns_end) // END don't allocate tuple if
1504 ret += "\t\t}\n"; // unpack failed.
1507 // Need to release memory held by BUFFER types.
// Group-by values of buffer type stored in the slot are destroyed first.
1510 for(g=0;g<gb_tbl->size();g++){
1511 data_type *gdt = gb_tbl->get_data_type(g);
1512 if(gdt->is_buffer_type()){
1513 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
// Then the aggregates: built-in ones of buffer type use the type destroy
// function, the others call <op>_LFTA_AGGR_DESTROY_.
1517 for(a=0;a<aggr_tbl->size();a++){
1518 if(aggr_tbl->is_builtin(a)){
1519 data_type *adt = aggr_tbl->get_data_type(a);
1520 if(adt->is_buffer_type()){
1521 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
1525 ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1526 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1527 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
// The slot no longer counts as a live aggregate.
1531 ret += "\t\tt->n_aggrs--;\n";
// Emits the opening of a C if-statement that tests whether slot idx of the
// aggregate table holds the group of the current input.
//   idx : C expression text selecting the slot.
// The condition starts with IS_FILLED and IS_NEW on t->aggr_table_bitmap and,
// when there are group-by attributes, adds one equality test per attribute
// between gb_attr_<g> and the gb_var<g> stored in the slot.
// NOTE(review): how the individual tests are joined and how the condition is
// closed is not shown here.
1537 string generate_gb_match_test(string idx){
1539 string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") && IS_NEW(t->aggr_table_bitmap,"+idx+")";
1540 if(gb_tbl->size()>0){
1541 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1544 // Next, scan list for a match on the group-by attributes.
1545 string rhs_op, lhs_op;
1546 for(g=0;g<gb_tbl->size();g++){
// lhs is the freshly computed attribute, rhs the value stored in the slot.
1549 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1550 sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1551 ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
// Emits the C text executed when the current input matches an existing group
// in slot idx: every aggregate of the slot is updated in place.
//   node_name : used to name the flush function and the tuple struct.
//   schema    : forwarded to the expression generators.
//   idx       : C expression text selecting the slot in t->aggr_table.
//   has_udaf  : passed by value; set to true locally as soon as one aggregate
//               is not built in.
// NOTE(review): the conditions guarding the flush section near the end are
// not shown here.
1561 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1565 ret += "/*\t\tMatch found : update in place.\t*/\n";
1568 for(a=0;a<aggr_tbl->size();a++){
1569 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1570 ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1571 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1574 // garbage collect copied buffer type gb attrs.
1575 for(g=0;g<gb_tbl->size();g++){
1576 data_type *gdt = gb_tbl->get_data_type(g);
1577 if(gdt->is_buffer_type()){
1578 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
// Build one expression that ORs together the <op>_LFTA_AGGR_FLUSHME_ result
// of every aggregate that is not built in.
1585 bool first_udaf = true;
1588 for(a=0;a<aggr_tbl->size();a++){
1589 if(! aggr_tbl->is_builtin(a)){
1590 if(! first_udaf)ret += " || ";
1591 else first_udaf = false;
1592 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1593 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1594 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
// Flush section: flush old groups, emit this group as a tuple, and clear the
// SLOT_FILLED bit of the slot in the hashmap.
1598 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1599 ret += generate_tuple_from_aggr(node_name,schema,idx);
1600 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
// Emits the C text that claims slot idx for a new group.
//   schema : forwarded to generate_aggr_init.
//   idx    : C expression text selecting the slot.
// The emitted code stores hash2, SLOT_FILLED and gen_val in the hashmap entry,
// copies every gb_attr_<g> into the slot, initializes every aggregate and
// increments t->n_aggrs.
1607 string generate_init_group( table_list *schema, string idx){
1609 string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1610 // Fill up the aggregate block.
1611 for(g=0;g<gb_tbl->size();g++){
1612 sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1615 for(a=0;a<aggr_tbl->size();a++){
1616 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1617 ret += generate_aggr_init(tmpstr, aggr_tbl,a, schema);
1619 ret+="\t\tt->n_aggrs++;\n";
// Emits the definition of the C function fta_aggr_flush_old_<node_name>,
// which walks the aggregate table from t->flush_pos and turns up to nflush
// old groups into output tuples.
//   node_name : used to name the function, the fta struct and the tuple struct.
//   schema    : forwarded to generate_tuple_from_aggr.
//   Ext_fcns  : consulted for the return type of aggregates that are not
//               built in.
1624 string generate_fta_flush(string node_name, table_list *schema,
1625 ext_fcn_list *Ext_fcns){
1628 string select_var_defs ;
1631 // Flush from previous epoch
1633 ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
// Locals of the emitted function.
1635 ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1636 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1637 ret += "\tint i, lfta_bailout;\n";
1638 ret += "\tunsigned int gen_val;\n";
1640 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1641 ret += generate_fta_name(node_name)+" *) f;\n";
1646 // Variables needed to store selected attributes of BUFFER type
1647 // temporarily, in order to compute their size for storage
1648 // in an output tuple.
1650 select_var_defs = "";
1651 for(s=0;s<sl_list.size();s++){
1652 data_type *sdt = sl_list[s]->get_data_type();
1653 if(sdt->is_buffer_type()){
1654 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1655 select_var_defs.append(tmpstr);
1658 if(select_var_defs != ""){
1659 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1660 ret += select_var_defs;
1664 // Variables to store results of partial functions.
1665 if(sl_fcns_start != sl_fcns_end){
1666 ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1667 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1668 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1669 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
// NOTE(review): the literal below ends in a newline followed by a lone
// semicolon, so the generated code carries an empty statement. Harmless, but
// likely unintended.
1672 ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1675 // Variables for udaf output temporaries
1676 bool no_udaf = true;
1678 for(a=0;a<aggr_tbl->size();a++){
1679 if(! aggr_tbl->is_builtin(a)){
1681 ret+="/*\t\tUDAF output vars.\t*/\n";
// udaf_ret<a> is declared with the return type registered for the function.
1684 int afcn_id = aggr_tbl->get_fcn_id(a);
1685 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1686 sprintf(tmpstr,"udaf_ret%d", a);
1687 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1692 // ret+="\tt->flush_finished=1; /* flush will be completed */\n";
// Scan loop of the emitted function: it stops at the end of the table, when
// no aggregates remain, or when the nflush budget is used up. A filled slot is
// flushed when its generation bits differ from the current generation or when
// a temporal group-by value stored in it is behind t->last_gb_<g>.
1694 ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1695 ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1696 ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1699 for(g=0;g<gb_tbl->size();g++){
1700 data_type *gdt = gb_tbl->get_data_type(g);
1701 if(gdt->is_temporal()){
1702 if(first_g) first_g=false; else ret+=" || ";
1703 ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
// The slot is released in the hashmap before its tuple is generated.
1707 ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1709 "#ifdef LFTA_STATS\n"
1710 "\t\t\tt->eviction_cnt++;\n"
1715 ret+=generate_tuple_from_aggr(node_name,schema,"i");
1717 // ret+="\t\t\tt->n_aggrs--;\n"; // done in generate_tuple_from_aggr
1718 ret+="\t\t\tnflush--;\n";
// Remember where the scan stopped; an empty table counts as fully flushed.
1721 ret+="\tt->flush_pos=i;\n";
1722 ret+="\tif(t->n_aggrs == 0) {\n";
1723 ret+="\t\tt->flush_pos = t->max_aggrs;\n";
// Once the whole table has been scanned, record the flushed epoch for every
// temporal group-by attribute that is not a buffer type.
1726 ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1728 for(int g=0;g<gb_tbl->size();g++){
1729 data_type *dt = gb_tbl->get_data_type(g);
1730 if(dt->is_temporal()){
1731 data_type *gdt = gb_tbl->get_data_type(g);
1732 if(!gdt->is_buffer_type()){
1733 sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1738 ret += "\t}\n}\n\n";
1743 // TODO Remove sprintf to perform string catenation
// Emits the definition of the C function load_params_<node_name>, which
// unpacks a parameter block (value, sz bytes) into the param_ fields of the
// fta struct and registers the pass-by-handle parameters.
//   node_name : used to name the function and the fta struct.
// The emitted function returns 1 when the block is too small and 0 otherwise.
// param_tbl and param_handle_table are not declared in the lines shown here
// (presumably file-scope state of the generator).
1744 string generate_fta_load_params(string node_name){
1746 vector<string> param_names = param_tbl->get_param_names();
1748 string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1749 ret += " *t, int sz, void *value, int initial_call){\n";
1750 ret += "\tint pos=0;\n";
1751 ret += "\tint data_pos;\n";
// Buffer-typed parameters get two temporaries each: tmp_var_ in the in-memory
// type and access_var_ in the tuple layout type.
1753 for(p=0;p<param_names.size();p++){
1754 data_type *dt = param_tbl->get_data_type(param_names[p]);
1755 if(dt->is_buffer_type()){
1756 sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1758 sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
// data_pos is the summed size of the fixed part of the block; a block shorter
// than that is rejected.
1765 ret += "\n\tdata_pos = ";
1766 for(p=0;p<param_names.size();p++){
1767 if(p>0) ret += " + ";
1768 data_type *dt = param_tbl->get_data_type(param_names[p]);
1770 ret += dt->get_tuple_cvar_type();
1774 ret += "\tif(data_pos > sz) return 1;\n\n";
// Unpack each parameter in turn from value+pos.
1777 for(p=0;p<param_names.size();p++){
1778 data_type *dt = param_tbl->get_data_type(param_names[p]);
1779 if(dt->is_buffer_type()){
1780 sprintf(tmpstr,"\taccess_var_%s = *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1782 switch( dt->get_type() ){
1784 // ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n"; // ntoh conversion
1785 // ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n"; // ntoh conversion
// The offset and length read from the block are bounds-checked against sz
// before the data pointer is derived from them.
1786 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1788 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1790 sprintf(tmpstr,"\ttmp_var_%s.length = access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1794 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1798 // First, destroy the old
1799 ret += "\tif(! initial_call)\n";
1800 sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1802 // Next, create the new.
1803 sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1806 // if(dt->needs_hn_translation()){
1807 // sprintf(tmpstr,"\tt->param_%s = %s( *( (%s *)( (char *)value+pos) ) );\n",
1808 // param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1810 sprintf(tmpstr,"\tt->param_%s = *( (%s *)( (char *)value+pos) );\n",
1811 param_names[p].c_str(), dt->get_cvar_type().c_str() );
// NOTE(review): data_pos above is summed from get_tuple_cvar_type() while pos
// advances by get_cvar_type(); confirm that the two sizes agree for
// buffer-typed parameters.
1815 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1819 // Register the pass-by-handle parameters
1821 ret += "/* register and de-register the pass-by-handle parameters */\n";
1824 for(ph=0;ph<param_handle_table.size();++ph){
1825 data_type pdt(param_handle_table[ph]->type_name);
1826 switch(param_handle_table[ph]->val_type){
// On a reload the old handle is deregistered before the new registration.
1832 ret += "\tif(! initial_call)\n";
1833 sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1834 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1836 sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
// Buffer-typed parameters are handed to the registration function by address.
1839 if(pdt.is_buffer_type()) ret += "&(";
1840 ret += "t->param_"+param_handle_table[ph]->param_name;
1841 if(pdt.is_buffer_type()) ret += ")";
1845 sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1846 fprintf(stderr,"%s\n",tmpstr);
1851 ret+="\treturn 0;\n";
// Emits the definition of the C function free_fta_<node_name>.
//   node_name     : used to name the function, the fta struct and the flush
//                   function.
//   is_aggr_query : not referenced in the lines shown here; presumably it
//                   guards the flush section. Confirm.
// The emitted function flushes the aggregate table, advances the generation
// so that every remaining group counts as old, flushes again from position 0,
// deregisters each pass-by-handle parameter and returns 0.
1860 string generate_fta_free(string node_name, bool is_aggr_query){
1862 string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1863 ret+= "\tstruct "+generate_fta_name(node_name)+
1864 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1865 ret += "\tint i;\n";
1868 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1869 ret+="\t/* \t\tmark all groups as old */\n";
1870 ret+="\tt->generation++;\n";
1871 ret+="\tt->flush_pos = 0;\n";
1872 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1875 // Deregister the pass-by-handle parameters
1876 ret += "/* de-register the pass-by-handle parameters */\n";
1878 for(ph=0;ph<param_handle_table.size();++ph){
1879 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1880 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1885 ret += "\treturn 0;\n}\n\n";
// Emits the definition of the C function control_fta_<node_name>, the command
// dispatcher of the lfta.
//   node_name     : used to name the function, the structs and the helpers.
//   schema        : not referenced in the lines shown here.
//   is_aggr_query : not referenced in the lines shown here; presumably it
//                   guards the aggregate flush sections. Confirm.
// Commands handled by the emitted function, as far as visible:
//   FTA_COMMAND_FLUSH             flush all groups, or post an empty tuple
//                                 when there are none.
//   FTA_COMMAND_LOAD_PARAMS       reload parameters (only when the query has
//                                 parameters).
//   FTA_COMMAND_SET_SAMPLING_RATE copy a gs_float_t into t->sampling_rate.
//   FTA_COMMAND_FILE_DONE         flush all groups and post an EOF tuple.
1890 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1891 string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f, gs_int32_t command, gs_int32_t sz, void *value){\n";
1892 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1893 ret += generate_fta_name(node_name)+" *) f;\n\n";
1897 ret += "\t/* temp status tuple */\n";
1898 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1899 ret += "\tgs_int32_t tuple_size;\n";
1903 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
// With no live aggregates a zero-size tuple is posted instead of flushing.
1905 ret+="\t\tif (!t->n_aggrs) {\n";
1906 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
1907 ret+="\t\t\tif( tuple != NULL)\n";
1908 ret+="\t\t\t\tpost_tuple(tuple);\n";
1910 ret+="\t\t}else{\n";
// Flush what is already old, advance the generation so the remaining groups
// become old, then flush again from the start of the table.
1912 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1913 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1914 ret +="\t\tt->generation++;\n";
1915 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1916 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1917 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1918 ret+="\t\t\tt->flush_pos = 0;\n";
1919 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
// Parameter reload is only emitted for queries that declare parameters.
1924 if(param_tbl->size() > 0){
1926 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1927 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1928 "#ifndef LFTA_IN_NIC\n"
1929 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1936 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1937 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1941 ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
// Same two-pass flush as for FTA_COMMAND_FLUSH, done only when groups exist.
1944 ret+="\t\tif (t->n_aggrs) {\n";
1945 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1946 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1947 ret +="\t\tt->generation++;\n";
1948 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1949 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1950 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1951 ret+="\t\t\tt->flush_pos = 0;\n";
1952 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
// The EOF tuple carries only the fixed-size struct; a failed allocation makes
// the emitted function return 1.
1956 ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1957 ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
1958 ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1960 /* mark tuple as EOF_TUPLE */
1961 ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1962 ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1963 ret += "\t\tpost_tuple(tuple);\n";
1966 ret += "\treturn 0;\n}\n\n";
// Emits the definition of the C function clock_fta_<node_name>, which builds
// and posts a temporal status tuple carrying the last seen values of the
// temporal attributes, a trace id and runtime statistics.
//   node_name      : used to name the function, the structs and the helpers.
//   schema         : used to look up field types and forwarded to the
//                    expression generators.
//   time_corr      : number of seconds subtracted from the current time when
//                    a time field is advanced from the wall clock.
//   is_aggr_query  : enables the declaration of group-by temporaries and the
//                    flush-on-epoch-change section.
//   advance_uxtime : not referenced in the lines shown here; presumably it
//                    guards the update of t->ux_time. Confirm.
// The emitted function returns 1 when the tuple cannot be allocated and 0
// otherwise.
1971 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query, bool advance_uxtime){
1972 string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1973 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1974 ret += generate_fta_name(node_name)+" *) f;\n\n";
1976 ret += "\t/* Create a temp status tuple */\n";
1977 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1978 ret += "\tgs_int32_t tuple_size;\n";
1979 ret += "\tunsigned int i;\n";
1980 ret += "\ttime_t cur_time;\n";
1981 ret += "\tint time_advanced;\n";
1982 ret += "\tstruct fta_stat stats;\n";
1986 /* copy the last seen values of temporal attributes */
1987 col_id_set temp_cids; // col ids of temp attributes in select clause
1990 /* HACK: in order to reuse the SE generation code, we need to copy
1991 * the last values of the temp attributes into new variables
1992 * which have names unpack_var_XXX_XXX
1996 col_id_set::iterator csi;
// Collect the columns referenced by temporal select-list entries.
1998 for(s=0;s<sl_list.size();s++){
1999 data_type *sdt = sl_list[s]->get_data_type();
2000 if (sdt->is_temporal()) {
2001 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// Declare one unpack_var_<field>_<tblref> local per collected column.
2005 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2006 int tblref = (*csi).tblvar_ref;
2007 int schref = (*csi).schema_ref;
2008 string field = (*csi).field;
2009 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2010 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
// Aggregate queries also need locals for the temporal group-by attributes,
// plus a staging temporary when the attribute is a buffer type.
2014 if (is_aggr_query) {
2015 for(g=0;g<gb_tbl->size();g++){
2016 data_type *gdt = gb_tbl->get_data_type(g);
2017 if(gdt->is_temporal()){
2018 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2020 data_type *gdt = gb_tbl->get_data_type(g);
2021 if(gdt->is_buffer_type()){
2022 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
2030 ret += "\ttime_advanced = 0;\n";
2032 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2033 int tblref = (*csi).tblvar_ref;
2034 int schref = (*csi).schema_ref;
2035 string field = (*csi).field;
2036 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2038 // update last seen value with the value seen
2039 ret += "\t#ifdef PREFILTER_DEFINED\n";
2040 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
2041 field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
2043 ret += "\t\ttime_advanced = 1;\n\t}\n";
2044 ret += "\t#endif\n";
2046 // we need to pay special attention to time fields
// For the three recognized time fields the last seen value is pulled forward
// to (current time - time_corr) when it lags behind and blocking mode is off.
// The three variants differ only in units: seconds for time, milliseconds for
// timestamp_ms, and seconds in the upper 32 bits for the remaining field.
2047 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
2048 ret += "\tcur_time = time(&cur_time);\n";
2050 if (field == "time") {
2051 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
2054 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
2055 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2056 } else if (field == "timestamp_ms") {
2057 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
2060 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
2061 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2063 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
2064 field.c_str(), tblref, time_corr);
2066 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
2067 field.c_str(), tblref, field.c_str(), tblref, time_corr);
2071 ret += "\t\ttime_advanced = 1;\n";
// Otherwise the local simply takes the last seen value.
2074 sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
2075 field.c_str(), tblref, field.c_str(), tblref);
2078 sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
2079 field.c_str(), tblref, field.c_str(), tblref);
2086 ret += "\tt->ux_time = time(&(t->ux_time));\n";
2089 // for aggregation lftas we need to check if the time was advanced beyond the current epoch
2090 if (is_aggr_query) {
2093 bool first_one = true;
2094 for(g=0;g<gb_tbl->size();g++){
2095 data_type *gdt = gb_tbl->get_data_type(g);
2096 if(gdt->is_temporal()){
2097 // To perform the test, first need to compute the value
2098 // of the temporal gb attrs.
2099 if(gdt->is_buffer_type()){
2100 // NOTE : if the SE defining the gb is anything
2101 // other than a ref to a variable, this will generate
2102 // illegal code. To be resolved with Spatch.
2103 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2104 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2106 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2107 gdt->get_buffer_assign_copy().c_str(), g, g);
2109 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
// change_test collects one equality test per temporal group-by attribute,
// comparing the stored t->last_gb_<g> with the freshly computed gb_attr_<g>.
2113 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
2114 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
2115 if(first_one){first_one = false;} else {change_test.append(") && (");}
2116 change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
// The flush below runs only when time advanced and the negated test holds
// (presumably change_test is inserted here; that line is not shown).
2120 ret += "\n\tif( time_advanced && !( (";
2124 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2125 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
2126 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2128 ret += "\t\t/* \t\tmark all groups as old */\n";
2129 ret +="\t\tt->generation++;\n";
2130 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
2131 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
2132 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
2133 ret += "\t\tt->flush_pos = 0;\n";
// Record the new epoch for every temporal group-by attribute.
2135 for(g=0;g<gb_tbl->size();g++){
2136 data_type *gdt = gb_tbl->get_data_type(g);
2137 if(gdt->is_temporal()){
2138 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
2139 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
// The status tuple is the fixed struct followed by a 64-bit trace id and a
// struct fta_stat.
2146 ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
2147 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2148 ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
// Only temporal select-list entries are filled in. Entries that are group-by
// references report t->last_flushed_gb_<ref>, which is refreshed from the
// current expression value when no aggregates are live.
2151 for(s=0;s<sl_list.size();s++){
2152 data_type *sdt = sl_list[s]->get_data_type();
2153 if(sdt->is_temporal()){
2155 if (sl_list[s]->is_gb()) {
2156 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
2160 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2162 // if(sdt->needs_hn_translation())
2163 // ret += sdt->hton_translation() +"( ";
2164 if (sl_list[s]->is_gb()) {
2165 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
2168 ret += generate_se_code(sl_list[s],schema);
2170 // if(sdt->needs_hn_translation())
2176 /* mark tuple as temporal */
2177 ret += "\n\t/* Mark tuple as temporal */\n";
2178 ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
2180 ret += "\n\t/* Copy trace id */\n";
2181 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
2183 ret += "\n\t/* Populate runtime stats */\n";
2184 ret += "\tstats.ftaid = f->ftaid;\n";
2185 ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
2186 ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
2187 ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
2188 ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
2189 ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
2190 ret += "\tstats.collision_cnt = t->collision_cnt;\n";
2191 ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
2192 ret += "\tstats.sampling_rate = t->sampling_rate;\n";
// Optional dump of the counters to stderr, compiled in under LFTA_PROFILE.
2194 ret += "\n#ifdef LFTA_PROFILE\n";
2195 ret += "\n\t/* Print stats */\n";
2196 ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
2197 ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
2198 ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
2199 ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
2200 ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
2201 ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
2202 ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
2203 ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
2204 ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
2205 ret += "\n#endif\n";
// The stats block is placed behind the trace id, then the tuple is posted.
2208 ret += "\n\t/* Copy stats */\n";
2209 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
2210 ret+="\tpost_tuple(tuple);\n";
2212 ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2213 ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
// Counters restart from zero after every clock tick.
2215 ret += "\n\t/* Reset runtime stats */\n";
2216 ret += "\tt->in_tuple_cnt = 0;\n";
2217 ret += "\tt->out_tuple_cnt = 0;\n";
2218 ret += "\tt->out_tuple_sz = 0;\n";
2219 ret += "\tt->accepted_tuple_cnt = 0;\n";
2220 ret += "\tt->cycle_cnt = 0;\n";
2221 ret += "\tt->collision_cnt = 0;\n";
2222 ret += "\tt->eviction_cnt = 0;\n";
2224 ret += "\treturn 0;\n}\n\n";
2230 // Accept processing before the where clause:
2231 // do flush processing.
//
// generate_aggr_accept_prelim builds, in `ret`, the generated code that runs
// at the start of an aggregation query's accept function, and fills
// `temporal_flush` with the generated "temporal group-by changed" flush test.
//   fs             - query node; read here for its "slow_flush" and
//                    "real_time" defines.
//   node_name      - used to name the emitted fta_aggr_flush_old_<node_name>
//                    calls and passed to generate_unpack_code.
//   schema         - passed through to the code generation helpers.
//   unpacked_cids  - in/out: column ids already unpacked; every column
//                    unpacked by the code emitted here is inserted.
//   temporal_flush - out: reset to "" and then rebuilt (stays "" when there
//                    is no temporal group-by attribute).
// Presumably `ret` is the return value -- confirm.
2232 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema, col_id_set &unpacked_cids, string &temporal_flush){
// ---- Incremental ("slow") flush ----
2236 string ret="\n/*\tslow flush\t*/\n";
2237 string slow_flush_str = fs->get_val_of_def("slow_flush");
// atoi gives 0 when the define is absent or not numeric; any value <= 0 is
// replaced by the default of 2.
2238 int n_slow_flush = atoi(slow_flush_str.c_str());
2239 if(n_slow_flush <= 0) n_slow_flush = 2;
// n_slow_flush > 1: the emitted code counts calls in t->flush_ctr and, once
// the counter reaches n_slow_flush, resets it and calls the flush routine
// with a second argument of 1 when t->flush_pos < t->max_aggrs.
2240 if(n_slow_flush > 1){
2241 ret += "\tt->flush_ctr++;\n";
2242 ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2243 ret += "\t\tt->flush_ctr = 0;\n";
2244 ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
// Presumably the alternative for n_slow_flush == 1: the same guarded flush
// call, emitted without the counter -- confirm.
2247 ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
// ---- Temporal flush test ----
// first_one stays true until a temporal group-by attribute has been seen.
2252 bool first_one = true;
2254 col_id_set flush_cids; // col ids accessed when computing flush variables.
2255 // unpack them at temporal flush test time.
2256 temporal_flush = "";
// For each temporal group-by attribute: collect the columns its defining
// expression reads, emit code computing gb_attr_<g>, and add an equality
// test of t->last_gb_<g> against gb_attr_<g> to change_test.
2259 for(g=0;g<gb_tbl->size();g++){
2260 data_type *gdt = gb_tbl->get_data_type(g);
2261 if(gdt->is_temporal()){
2262 gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2264 // To perform the test, first need to compute the value
2265 // of the temporal gb attrs.
2266 if(gdt->is_buffer_type()){
2267 // NOTE : if the SE defining the gb is anything
2268 // other than a ref to a variable, this will generate
2269 // illegal code. To be resolved with Spatch.
2270 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2271 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2272 temporal_flush += tmpstr;
2273 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2274 gdt->get_buffer_assign_copy().c_str(), g, g);
// Non-buffer case (presumably the else branch): plain assignment.
2276 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2278 temporal_flush += tmpstr;
2279 // END computing the value of the temporal GB attr.
// Individual tests are joined with ") && (" so that the whole of change_test
// can be wrapped in "( ... )" below.
2282 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
2283 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
2284 if(first_one){first_one = false;} else {change_test.append(") && (");}
2285 change_test += generate_equality_test(lhs_op, rhs_op, gdt);
// Emit the flush itself: when the combined equality test fails, flush with
// a second argument of t->max_aggrs (if t->flush_pos < t->max_aggrs), bump
// t->generation and reset t->flush_pos to 0.
2288 if(!first_one){ // will be false iff. there is a temporal GB attribute
2289 temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2290 temporal_flush += "\tif( !( (";
2291 temporal_flush += change_test;
2292 temporal_flush += ") ) ){\n";
2294 // temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2295 temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2296 temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2297 temporal_flush+="\t\t}\n";
2298 temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2299 temporal_flush+="\t\tt->generation++;\n";
2300 temporal_flush+="\t\tt->flush_pos = 0;\n";
2303 // Now set the saved temporal value of the gb to the
2304 // current value of the gb. Only for simple types,
2305 // not for buffer types -- but the strings are not
2306 // temporal in any case.
2308 for(g=0;g<gb_tbl->size();g++){
2309 data_type *gdt = gb_tbl->get_data_type(g);
2310 if(gdt->is_temporal()){
2311 if(gdt->is_buffer_type()){
// Reported while generating code; nothing is emitted for this attribute here.
2313 fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
// Simple types (presumably the else branch): save the current value in both
// t->flush_start_gb_<g> and t->last_gb_<g>.
2315 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2316 temporal_flush += tmpstr;
2317 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2318 temporal_flush += tmpstr;
2322 temporal_flush += "\t}\n\n";
2325 // Unpack all the temporal attributes referenced in select clause
2326 // and update the last value of the attribute
2327 col_id_set temp_cids; // col ids of temp attributes in select clause
2328 col_id_set::iterator csi;
2330 for(s=0;s<sl_list.size();s++){
2331 data_type *sdt = sl_list[s]->get_data_type();
2332 if (sdt->is_temporal()) {
2333 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// Emit unpack code only for columns not already in unpacked_cids, then
// record them as unpacked.
2337 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2338 if(unpacked_cids.count((*csi)) == 0){
2339 int tblref = (*csi).tblvar_ref;
2340 int schref = (*csi).schema_ref;
2341 string field = (*csi).field;
2342 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
// NOTE(review): dt does not appear to be used afterwards -- confirm and
// remove if so.
2344 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2345 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2346 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
// The emitted code returns 1 from the generated function when the access
// function reports a failure.
2348 ret += "\tif(retval) return 1;\n";
// Remember the unpacked value in t->last_<field>_<tblref>.
2350 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2353 unpacked_cids.insert( (*csi) );
2358 // Do the flush here if this is a real_time query
// Only when the "real_time" define is non-empty and a temporal flush test
// exists: unpack the columns the test needs, then append the test to ret.
2359 string rt_level = fs->get_val_of_def("real_time");
2360 if(rt_level != "" && temporal_flush != ""){
2361 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2362 if(unpacked_cids.count((*csi)) == 0){
2363 int tblref = (*csi).tblvar_ref;
2364 int schref = (*csi).schema_ref;
2365 string field = (*csi).field;
2366 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2368 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2369 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2371 ret += "\tif(retval) return 1;\n";
2373 unpacked_cids.insert( (*csi) );
2376 ret += temporal_flush;
// generate_sel_accept_body builds, in `ret`, the generated code that creates
// and posts the output tuple of a filter-only (selection) query once the
// tuple has passed its tests.
//   node_name - used, via generate_tuple_name, to name the output tuple struct.
//   schema    - passed to the expression / function code generators.
// The emitted code evaluates select-clause functions, computes the tuple
// size, allocates the tuple, assigns each select item and posts it.
2382 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2387 /////////////// Processing for filter-only query
2389 // test passed : create the tuple, then assign to it.
2390 ret += "/*\t\tCreate and post the tuple\t*/\n";
2392 // Unpack partial fcns ref'd by the select clause.
2393 // It's a kind of WHERE clause ...
// A function referenced more than once is wrapped in a
// "fcn_ref_cnt_<p>==0" guard so the emitted code evaluates it only once;
// a failing partial function makes the emitted code jump to `end`.
2394 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2395 if(fcn_ref_cnt[p] > 1){
2396 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2398 if(is_partial_fcn[p]){
2399 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2400 ret += "\tif(retval) goto end;\n";
2402 if(fcn_ref_cnt[p] > 1){
// Non-partial shared function: cache its result in partial_fcn_result_<p>.
2403 if(!is_partial_fcn[p]){
2404 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2406 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2411 // increment the counter of accepted tuples
2412 ret += "\n\t#ifdef LFTA_STATS\n";
2413 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2414 ret += "\t#endif\n\n";
2416 // First, compute the size of the tuple.
2418 // Unpack any BUFFER type selections into temporaries
2419 // so that I can compute their size and not have
2420 // to recompute their value during tuple packing.
2421 // I can use regular assignment here because
2422 // these temporaries are non-persistent.
2424 for(s=0;s<sl_list.size();s++){
2425 data_type *sdt = sl_list[s]->get_data_type();
2426 if(sdt->is_buffer_type()){
2427 sprintf(tmpstr,"\tselvar_%d = ",s);
2429 ret += generate_se_code(sl_list[s],schema);
2435 // The size of the tuple is the size of the tuple struct plus the
2436 // size of the buffers to be copied in.
2438 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2439 for(s=0;s<sl_list.size();s++){
2440 data_type *sdt = sl_list[s]->get_data_type();
2441 if(sdt->is_buffer_type()){
2442 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
// If allocation fails the emitted code jumps to `end` without posting.
2449 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2450 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2452 // Test passed, make assignments to the tuple.
// tuple_pos starts just past the fixed-size struct; buffer contents are
// copied there and tuple_pos is advanced by each buffer's size.
2454 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2456 // Mark tuple as REGULAR_TUPLE
2457 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2460 for(s=0;s<sl_list.size();s++){
2461 data_type *sdt = sl_list[s]->get_data_type();
2462 if(sdt->is_buffer_type()){
2463 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2465 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
// Non-buffer select item (presumably the else branch): direct assignment of
// the generated expression to tuple->tuple_var<s>.
2468 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2470 // if(sdt->needs_hn_translation())
2471 // ret += sdt->hton_translation() +"( ";
2472 ret += generate_se_code(sl_list[s],schema);
2473 // if(sdt->needs_hn_translation())
2481 ret += "\tpost_tuple(tuple);\n";
2483 // Increment the counter of posted tuples
2484 ret += "\n\t#ifdef LFTA_STATS\n";
2485 ret += "\tt->out_tuple_cnt++;\n";
2486 ret+="\tt->out_tuple_sz+=tuple_size;\n";
2487 ret += "\t#endif\n\n";
2494 // TODO Ensure that postfilter predicates are being generated
//
// generate_fj_accept_body builds, in `ret`, the generated code for the body
// of a filter join's accept function. In order, the emitted code:
//   1. reads the window timestamp into curr_fj_ts and clears bloom filters
//      for time bins that have passed;
//   2. tests the S (filtering stream) predicates, jumping to end_s on
//      failure, and records the S join keys (bloom-filter bits and/or a
//      join_table slot);
//   3. tests the cheap R (main stream) predicates;
//   4. probes for a matching S key using the R join keys;
//   5. tests the expensive R predicates;
//   6. sizes, allocates, fills and posts the output tuple.
// Both bloom-filter and hash-table code appear below; presumably a setting
// selects one of them -- confirm.
//   fs            - filter join node; temporal_range, temporal_var, pred_t0,
//                   pred_t1, hash_eq and several defines are read.
//   node_name     - used for generate_unpack_code and the tuple struct name.
//   unpacked_cids - in/out: column ids already unpacked; extended here.
//   Ext_fcns      - passed to compute_cnf_cost.
//   schema        - passed to the code generation helpers.
2495 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
// ---- Bloom filter geometry ----
// n_bloom defaults to 11; the "num_bloom" define, when applied, sets it to
// num_bloom+1. Each filter spans (window_len+1)/(n_bloom-1) time units.
2501 unsigned int window_len = fs->temporal_range;
2502 unsigned int n_bloom = 11;
2503 string n_bloom_str = fs->get_val_of_def("num_bloom");
2504 int tmp_n_bloom = atoi(n_bloom_str.c_str());
2506 n_bloom = tmp_n_bloom+1;
2507 float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2508 sprintf(tmpstr,"%f",bloom_width);
2509 string bloom_width_str = tmpstr;
// A window shorter than the filter count gets window_len+1 filters of
// width 1 instead.
2511 if(window_len < n_bloom){
2512 n_bloom = window_len+1;
2513 bloom_width_str = "1";
2517 // Grab the current window time
2518 scalarexp_t winvar(fs->temporal_var);
2519 ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
// "bloom_size" define: accepted only for values 4..31, otherwise the
// default of 12 is kept.
// NOTE(review): a value of 31 makes `1 << bf_exp_size` exceed the range of
// int -- confirm the intended upper bound.
2521 int bf_exp_size = 12; // base-2 log of number of bits
2522 string bloom_len_str = fs->get_val_of_def("bloom_size");
2523 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2524 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2525 bf_exp_size = tmp_bf_exp_size;
2527 int bf_bit_size = 1 << bf_exp_size;
2528 int bf_byte_size = bf_bit_size / (8*sizeof(char));
// Hash table size: default 4096; an "aggregate_slots" define above 1024 is
// used instead, adjusted to a power of two.
2530 unsigned int ht_size = 4096;
2531 string ht_size_s = fs->get_val_of_def("aggregate_slots");
2532 int tmp_ht_size = atoi(ht_size_s.c_str());
2533 if(tmp_ht_size > 1024){
2534 unsigned int hs = 1; // make it power of 2
2537 tmp_ht_size = tmp_ht_size >> 1;
// bf_mask is built as a run of low 1 bits: bf_exp_size bits in the first
// loop, one bit per halving of ht_size in the second. Presumably the two
// loops are alternatives (bloom filter vs. hash table) -- confirm.
2544 for(i=0;i<bf_exp_size;i++)
2545 bf_mask = (bf_mask << 1) | 1;
2547 for(i=ht_size;i>1;i=i>>1)
2548 bf_mask = (bf_mask << 1) | 1;
// Diagnostic output printed while generating code.
2552 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2555 bloom_width_str.c_str(),
2567 // If this is a bloom-filter fj, first test if the
2568 // bloom filter needs to be advanced.
2569 // SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2570 // t->bf_size : number of bits in bloom filter
2572 // TODO: Don't iterate more than n_bloom times!
2573 // As written, it's possible to wrap around many times.
// The emitted code initialises t->last_bin / t->last_bloom_pos on first
// execution; afterwards, for every bin between t->last_bin+1 and curr_bin it
// advances t->last_bloom_pos (wrapping at n_bloom) and clears all
// bf_bit_size bits of that filter.
2576 "// Clean out old bloom filters if needed.\n"
2577 "// TODO vectorize this ? \n"
2578 " if(t->first_exec){\n"
2579 " t->first_exec = 0;\n"
2580 " t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2581 " t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2583 " curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2584 " if(curr_bin != t->last_bin){\n"
2585 " for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2586 " t->last_bloom_pos++;\n"
2587 " if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2588 " t->last_bloom_pos = 0;\n"
2589 " tmp_i = t->last_bloom_pos;\n"
2590 " for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2591 " SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2595 " t->last_bin = curr_bin;\n"
2601 //-----------------------------------------------------------------
2602 // First, determine whether to do S (filter stream) processing.
2605 "// S (filtering stream) predicate, should it be processed?\n"
2608 // Sort S preds based on cost.
2609 vector<cnf_elem *> s_filt = fs->pred_t1;
2610 col_id_set::iterator csi;
2611 if(s_filt.size() > 0){
2613 // Unpack fields ref'd in the S pred
// "end_s" is passed to generate_unpack_code here, unlike the R-side calls
// further down.
2614 for(w=0;w<s_filt.size();++w){
2615 col_id_set this_pred_cids;
2616 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2617 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2618 if(unpacked_cids.count( (*csi) ) == 0){
2619 int tblref = (*csi).tblvar_ref;
2620 int schref = (*csi).schema_ref;
2621 string field = (*csi).field;
2622 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2623 unpacked_cids.insert( (*csi) );
2629 // Sort by evaluation cost.
2630 // First, estimate evaluation costs
2631 // Eliminate predicates covered by the prefilter (those in s_pids).
2632 // I need to do it before the sort because the indices refer
2633 // to the position in the unsorted list.
2634 vector<cnf_elem *> tmp_wh;
2635 for(w=0;w<s_filt.size();++w){
2636 compute_cnf_cost(s_filt[w],Ext_fcns);
2637 tmp_wh.push_back(s_filt[w]);
2641 sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2643 // Now generate the predicates.
2644 for(w=0;w<s_filt.size();++w){
2645 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2648 // Find partial fcns ref'd in this cnf element
2650 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2651 // Since set<..> is a "Sorted Associative Container",
2652 // we can walk through it in sorted order by walking from
2653 // begin() to end(). (and the partial fcns must be
2654 // evaluated in this order).
2655 set<int>::iterator si;
2657 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2658 if(fcn_ref_cnt[(*si)] > 1){
2659 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2661 if(is_partial_fcn[(*si)]){
2662 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2663 ret += "\t\tif(retval) goto end_s;\n";
2665 if(fcn_ref_cnt[(*si)] > 1){
2666 if(!is_partial_fcn[(*si)]){
2667 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2668 // Testing for S is a side branch.
2669 // I don't want a cacheable partial function to be
2670 // marked as evaluated. Therefore I mark the function
2671 // as evaluated ONLY IF it is not partial.
2672 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
// A failing S predicate skips the rest of the S processing (goto end_s).
2678 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2679 ") ) goto end_s;\n";
2682 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
// S join keys: the right-hand side of each hash_eq predicate is assigned to
// s_equijoin_<p>.
2685 for(p=0;p<fs->hash_eq.size();++p)
2686 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2689 // First, generate the S scalar expressions in the hash_eq
2691 // Iterate over the bloom filters
// bucket is the XOR over all keys of (hash_nums[...] * key hash) >> 32,
// masked with bf_mask. `i` is presumably the index of an enclosing loop
// over hash functions -- confirm.
2693 ret += "\t\tbucket=0;\n";
2694 for(p=0;p<fs->hash_eq.size();++p){
2696 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2697 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2698 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2700 // SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2702 " bucket &= "+int_to_string(bf_mask)+";\n"
2703 " SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
// Hash-table variant: two candidate slots are considered, `bucket` and
// `bucket1` = (bucket + 1) masked with bf_mask.
2708 ret += "// Add the S record to the hash table, choose a position\n";
2709 ret += "\t\tbucket=0;\n";
2710 for(p=0;p<fs->hash_eq.size();++p){
2712 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2713 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2714 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2717 " bucket &= "+int_to_string(bf_mask)+";\n"
2718 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2720 // Try the first bucket
2722 for(p=0;p<fs->hash_eq.size();++p){
2723 if(p>0) ret += " && ";
2724 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2725 // " == s_equijoin_"+int_to_string(p);
2726 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2727 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2728 string rhs_op = "s_equijoin_"+int_to_string(p);
2729 ret += generate_equality_test(lhs_op,rhs_op,hdt);
// Slot choice in the emitted code: `bucket` if its keys equal the S keys,
// else `bucket1` if its keys do, else whichever slot has the smaller ts
// (`bucket` on a tie).
2731 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2732 ret += "\t\t}else{\n\t\t\tif(";
2733 for(p=0;p<fs->hash_eq.size();++p){
2734 if(p>0) ret += " && ";
2735 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2736 // " == s_equijoin_"+int_to_string(p);
2737 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2738 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2739 string rhs_op = "s_equijoin_"+int_to_string(p);
2740 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2742 ret += "){\n\t\t\t\tthe_bucket = bucket1;\n";
2743 ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2744 ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
2745 ret += "\t\t\t}\n\t\t}\n";
// Store the S keys (buffer types through their assign-copy function) and
// the current timestamp in the chosen slot.
2746 for(p=0;p<fs->hash_eq.size();++p){
2747 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2748 if(hdt->is_buffer_type()){
2749 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2752 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2753 " = s_equijoin_"+int_to_string(p)+";\n";
2756 ret+="\t\tt->join_table[the_bucket].ts = curr_fj_ts;\n";
// Target label of the "goto end_s" statements emitted above.
2758 ret += "\tend_s:\n";
2760 // ------------------------------------------------------------
2761 // Next, determine if the R record should be processed.
2765 "// R (main stream) cheap predicate\n"
2769 // Unpack r_filt fields
2770 vector<cnf_elem *> r_filt = fs->pred_t0;
2771 for(w=0;w<r_filt.size();++w){
2772 col_id_set this_pred_cids;
2773 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2774 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2775 if(unpacked_cids.count( (*csi) ) == 0){
2776 int tblref = (*csi).tblvar_ref;
2777 int schref = (*csi).schema_ref;
2778 string field = (*csi).field;
2779 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2780 unpacked_cids.insert( (*csi) );
2785 // Sort R preds based on cost.
2787 vector<cnf_elem *> tmp_wh;
2788 for(w=0;w<r_filt.size();++w){
2789 compute_cnf_cost(r_filt[w],Ext_fcns);
2790 tmp_wh.push_back(r_filt[w]);
2794 sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2796 // WARNING! the constant 20 below is a wild-ass guess.
// After the sort, cheap_rpos is the count of leading predicates whose cost
// is <= 20; those are tested before the join, the rest after it.
2798 for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
2800 // Test the cheap filters on R.
2803 // Now generate the predicates.
2804 for(w=0;w<cheap_rpos;++w){
2805 sprintf(tmpstr,"//\t\tcheap R predicate clause %d. (cost %d)\n",w,r_filt[w]->cost);
2808 // Find partial fcns ref'd in this cnf element
2810 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2811 // Since set<..> is a "Sorted Associative Container",
2812 // we can walk through it in sorted order by walking from
2813 // begin() to end(). (and the partial fcns must be
2814 // evaluated in this order).
2815 set<int>::iterator si;
2816 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2817 if(fcn_ref_cnt[(*si)] > 1){
2818 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2820 if(is_partial_fcn[(*si)]){
2821 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2822 ret += "\t\tif(retval) goto end;\n";
2824 if(fcn_ref_cnt[(*si)] > 1){
2825 if(!is_partial_fcn[(*si)]){
2826 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2828 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2833 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2837 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
// R join keys: the left-hand side of each hash_eq predicate is assigned to
// r_equijoin_<p>.
2840 ret += "\n// Do the join\n\n";
2841 for(p=0;p<fs->hash_eq.size();++p)
2842 ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2845 // Passed the cheap pred, now test the join with S.
// Bloom-filter probe: bucket<i> is computed like the S-side bucket but from
// the R keys; the emitted test requires the bits for bucket0, bucket1 and
// bucket2 to all be set.
// NOTE(review): the emitted IS_BF_SET calls name t->last_bloom_pos, not the
// loop variable b -- confirm that every filter is actually probed.
2848 ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2849 for(p=0;p<fs->hash_eq.size();++p){
2851 " bucket"+int_to_string(i)+
2852 " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2853 fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2854 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2857 " bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
2859 ret += "\tfound = 0;\n";
2860 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2862 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2863 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2864 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
// Hash-table probe: the same two candidate slots as on the S side, computed
// from the R keys.
2873 ret += "\tfound = 0;\n";
2874 ret += "\t\tbucket=0;\n";
2875 for(p=0;p<fs->hash_eq.size();++p){
2877 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2878 fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2879 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2882 " bucket &= "+int_to_string(bf_mask)+";\n"
2883 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2885 // Try the first bucket
// NOTE(review): the stored keys are compared with s_equijoin_<p> here,
// although the bucket was hashed from r_equijoin_<p> and the commented-out
// lines use r_equijoin_ -- looks like it should be r_equijoin_; confirm.
2887 for(p=0;p<fs->hash_eq.size();++p){
2888 if(p>0) ret += " && ";
2889 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2890 // " == r_equijoin_"+int_to_string(p);
2891 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2892 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2893 string rhs_op = "s_equijoin_"+int_to_string(p);
2894 ret += generate_equality_test(lhs_op,rhs_op,hdt);
// p equals hash_eq.size() after the loop, so " && " is emitted whenever at
// least one key test was generated.
// NOTE(review): the emitted condition accepts entries with
// ts + temporal_range <= curr_fj_ts; confirm the direction of this
// comparison is intended.
2896 if(p>0) ret += " && ";
2897 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2898 ret += "){\n\t\t\tfound = 1;\n";
2899 ret += "\t\t}else {if(";
// Second candidate slot; the review notes on the first slot apply here too.
2900 for(p=0;p<fs->hash_eq.size();++p){
2901 if(p>0) ret += " && ";
2902 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2903 // " == r_equijoin_"+int_to_string(p);
2904 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2905 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2906 string rhs_op = "s_equijoin_"+int_to_string(p);
2907 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2909 if(p>0) ret += " && ";
2910 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2911 ret += ")\n\t\t\tfound=1;\n";
2920 // Test the expensive filters on R.
2921 if(cheap_rpos < r_filt.size()){
2923 // Now generate the predicates.
2924 for(w=cheap_rpos;w<r_filt.size();++w){
2925 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2928 // Find partial fcns ref'd in this cnf element
2930 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2931 // Since set<..> is a "Sorted Associative Container",
2932 // we can walk through it in sorted order by walking from
2933 // begin() to end(). (and the partial fcns must be
2934 // evaluated in this order).
2935 set<int>::iterator si;
2936 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2937 if(fcn_ref_cnt[(*si)] > 1){
2938 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2940 if(is_partial_fcn[(*si)]){
2941 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2942 ret += "\t\tif(retval) goto end;\n";
2944 if(fcn_ref_cnt[(*si)] > 1){
2945 if(!is_partial_fcn[(*si)]){
2946 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2948 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2953 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2957 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2962 /////////////// post the tuple
2964 // test passed : create the tuple, then assign to it.
2965 ret += "/*\t\tCreate and post the tuple\t*/\n";
2967 // Unpack the fields referenced by the select list
2968 for(s=0;s<sl_list.size();++s){
2969 col_id_set this_se_cids;
2970 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2971 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2972 if(unpacked_cids.count( (*csi) ) == 0){
2973 int tblref = (*csi).tblvar_ref;
2974 int schref = (*csi).schema_ref;
2975 string field = (*csi).field;
2976 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2977 unpacked_cids.insert( (*csi) );
2983 // Unpack partial fcns ref'd by the select clause.
2984 // It's a kind of WHERE clause ...
2985 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2986 if(fcn_ref_cnt[p] > 1){
2987 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2989 if(is_partial_fcn[p]){
2990 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2991 ret += "\tif(retval) goto end;\n";
2993 if(fcn_ref_cnt[p] > 1){
2994 if(!is_partial_fcn[p]){
2995 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2997 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3002 // increment the counter of accepted tuples
3003 ret += "\n\t#ifdef LFTA_STATS\n";
3004 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3005 ret += "\t#endif\n\n";
3007 // First, compute the size of the tuple.
3009 // Unpack any BUFFER type selections into temporaries
3010 // so that I can compute their size and not have
3011 // to recompute their value during tuple packing.
3012 // I can use regular assignment here because
3013 // these temporaries are non-persistent.
3015 for(s=0;s<sl_list.size();s++){
3016 data_type *sdt = sl_list[s]->get_data_type();
3017 if(sdt->is_buffer_type()){
3018 sprintf(tmpstr,"\tselvar_%d = ",s);
3020 ret += generate_se_code(sl_list[s],schema);
3026 // The size of the tuple is the size of the tuple struct plus the
3027 // size of the buffers to be copied in.
3029 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3030 for(s=0;s<sl_list.size();s++){
3031 data_type *sdt = sl_list[s]->get_data_type();
3032 if(sdt->is_buffer_type()){
3033 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
// If allocation fails the emitted code jumps to `end` without posting.
3040 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
3041 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3043 // Test passed, make assignments to the tuple.
3045 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3047 // Mark tuple as REGULAR_TUPLE
3048 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
// Buffer-typed items are copied in at tuple_pos, which is then advanced by
// the buffer's size; other items are assigned directly.
3051 for(s=0;s<sl_list.size();s++){
3052 data_type *sdt = sl_list[s]->get_data_type();
3053 if(sdt->is_buffer_type()){
3054 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3056 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3059 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3061 // if(sdt->needs_hn_translation())
3062 // ret += sdt->hton_translation() +"( ";
3063 ret += generate_se_code(sl_list[s],schema);
3064 // if(sdt->needs_hn_translation())
3072 ret += "\tpost_tuple(tuple);\n";
3074 // Increment the counter of posted tuples
3075 ret += "\n\t#ifdef LFTA_STATS\n";
3076 ret += "\n\tt->out_tuple_cnt++;\n\n";
3077 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3078 ret += "\t#endif\n\n";
3085 string generate_wj_accept_body(watch_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
3091 string wl_schema = fs->from[1]->get_schema_name();
3092 string wl_elem_str = generate_watchlist_element_name(wl_schema);
3093 string wl_node_str = generate_watchlist_struct_name(wl_schema);
3094 string tgt = generate_watchlist_name(wl_schema);
3096 ret += "//\n//\t\tGenerate test to update watchtable here\n//\n\n";
3102 // ------------------------------------------------------------
3103 // Determine if the R record should be processed.
3107 "// R (main stream) cheap predicate\n"
3111 // Unpack r_filt fields
3112 vector<cnf_elem *> r_filt = fs->pred_t0;
3113 for(w=0;w<r_filt.size();++w){
3114 col_id_set this_pred_cids;
3115 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
3116 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3117 if(unpacked_cids.count( (*csi) ) == 0){
3118 int tblref = (*csi).tblvar_ref;
3119 int schref = (*csi).schema_ref;
3120 string field = (*csi).field;
3121 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3122 unpacked_cids.insert( (*csi) );
3127 // Sort R preds based on cost.
3129 vector<cnf_elem *> tmp_wh;
3130 for(w=0;w<r_filt.size();++w){
3131 compute_cnf_cost(r_filt[w],Ext_fcns);
3132 tmp_wh.push_back(r_filt[w]);
3136 sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
3138 // WARNING! the constant 20 below is a wild-ass guess.
3140 for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++);
3142 // Test the cheap filters on R.
3145 // Now generate the predicates.
3146 for(w=0;w<cheap_rpos;++w){
3147 sprintf(tmpstr,"//\t\tCheap R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3150 // Find partial fcns ref'd in this cnf element
3152 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3153 // Since set<..> is a "Sorted Associative Container",
3154 // we can walk through it in sorted order by walking from
3155 // begin() to end(). (and the partial fcns must be
3156 // evaluated in this order).
3157 set<int>::iterator si;
3158 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3159 if(fcn_ref_cnt[(*si)] > 1){
3160 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3162 if(is_partial_fcn[(*si)]){
3163 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3164 ret += "\t\tif(retval) goto end;\n";
3166 if(fcn_ref_cnt[(*si)] > 1){
3167 if(!is_partial_fcn[(*si)]){
3168 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3170 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3175 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3179 ret += "\n\n/*\t\t (no cheap R predicate to test)\t*/\n\n";
3182 ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3183 map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
3184 vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
3185 for(w=0;w<kflds.size();++w){
3186 string kfld = kflds[w];
3187 col_id_set this_pred_cids;
3188 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
3189 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3190 if(unpacked_cids.count( (*csi) ) == 0){
3191 int tblref = (*csi).tblvar_ref;
3192 int schref = (*csi).schema_ref;
3193 string field = (*csi).field;
3194 if(tblref==0) // LHS from packet, don't unpack the RHS
3195 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3196 unpacked_cids.insert( (*csi) );
3202 ret += "\n// Do the join\n\n";
3203 ret += "\n// (ensure that the watchtable is fresh)\n";
3204 ret += "\tif(t->ux_time >= "+tgt+".next_refresh){\n";
3205 ret += "\t\treload_watchlist__"+wl_schema+"();\n";
3206 ret += "\t\t"+tgt+".next_refresh = t->ux_time+"+tgt+".refresh_interval;\n";
3210 for(p=0;p<fs->key_flds.size();++p){
3211 string kfld = fs->key_flds[p];
3212 ret += "\tr_equijoin_"+kfld+" = "+generate_se_code(fs->hash_eq[kfld]->pr->get_left_se(),schema)+";\n";
3216 // Passed the cheap pred, now test the join with S.
3217 ret += "\tbucket=0;\n";
3218 ret += "\thash=0;\n";
3219 for(p=0;p<fs->key_flds.size();++p){
3220 string kfld = fs->key_flds[p];
3222 " hash ^= (("+hash_nums[p%NRANDS]+" * lfta_"+
3223 fs->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_type_str()+
3224 +"_to_hash(r_equijoin_"+kfld+")));\n";
3226 ret += "\t\tbucket = (hash>>32) \% "+tgt+".ht_size;\n";
3228 ret += "\t\trec = "+tgt+".ht[bucket];\n";
3229 ret += "\t\twhile(rec!=NULL){\n";
3230 ret += "\t\t\tif(hash==rec->hashval){\n";
3231 ret += "\t\t\t\tif(";
3232 for(p=0;p<fs->key_flds.size();++p){
3233 string kfld = fs->key_flds[p];
3234 if(p>0) ret += " && ";
3235 data_type *hdt = fs->hash_eq[kfld]->pr->get_right_se()->get_data_type();
3236 string lhs_op = "r_equijoin_"+kfld;
3237 string rhs_op = "rec->"+kfld;
3238 ret += generate_equality_test(lhs_op,rhs_op,hdt);
3241 ret += "\t\t\t\t\tbreak;\n";
3243 ret += "\t\t\trec=rec->next;\n";
3245 ret += "\t\tif(rec==NULL)\n";
3246 ret += "\t\t\tgoto end;\n";
3248 ret += "\n/*\tPassed the hash lookup clause, unpack the other predicate fields. */\n";
3249 for(w=0;w<where.size();++w){
3250 col_id_set this_pred_cids;
3251 gather_pr_col_ids(where[w]->pr, this_pred_cids, gb_tbl);
3252 for(auto csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3253 if(unpacked_cids.count( (*csi) ) == 0){
3254 int tblref = (*csi).tblvar_ref;
3255 int schref = (*csi).schema_ref;
3256 string field = (*csi).field;
3257 if(tblref==0) // LHS from packet
3258 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3259 else // RHS from hash bucket
3260 ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3261 unpacked_cids.insert( (*csi) );
3267 // Test the expensive filters on R.
3268 // TODO Should merge this with other predicates and eval in order
3269 // of cost - see the fj code.
3270 // TODO join and postfilter predicates haven't been costed yet.
3271 if(cheap_rpos < r_filt.size()){
3273 // Now generate the predicates.
3274 for(w=cheap_rpos;w<r_filt.size();++w){
3275 sprintf(tmpstr,"//\t\tExpensive R predicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
3278 // Find partial fcns ref'd in this cnf element
3280 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
3281 // Since set<..> is a "Sorted Associative Container",
3282 // we can walk through it in sorted order by walking from
3283 // begin() to end(). (and the partial fcns must be
3284 // evaluated in this order).
3285 set<int>::iterator si;
3286 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3287 if(fcn_ref_cnt[(*si)] > 1){
3288 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3290 if(is_partial_fcn[(*si)]){
3291 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3292 ret += "\t\tif(retval) goto end;\n";
3294 if(fcn_ref_cnt[(*si)] > 1){
3295 if(!is_partial_fcn[(*si)]){
3296 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3298 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3303 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
3307 ret += "\n\n/*\t\t (no expensive R predicate to test)\t*/\n\n";
3310 // TODO sort the additional predicates by cost
3313 for(w=0;w<fs->pred_t1.size();++w){
3314 sprintf(tmpstr,"//\t\tS Predicate clause %d.(cost %d)\n",w,fs->pred_t1[w]->cost);
3317 // Find partial fcns ref'd in this cnf element
3319 collect_partial_fcns_pr(fs->pred_t1[w]->pr, pfcn_refs);
3320 // Since set<..> is a "Sorted Associative Container",
3321 // we can walk through it in sorted order by walking from
3322 // begin() to end(). (and the partial fcns must be
3323 // evaluated in this order).
3324 set<int>::iterator si;
3325 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3326 if(fcn_ref_cnt[(*si)] > 1){
3327 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3329 if(is_partial_fcn[(*si)]){
3330 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3331 ret += "\t\tif(retval) goto end;\n";
3333 if(fcn_ref_cnt[(*si)] > 1){
3334 if(!is_partial_fcn[(*si)]){
3335 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3337 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3342 ret += "\tif( !("+generate_predicate_code(fs->pred_t1[w]->pr,schema)+
3347 for(w=0;w<fs->join_filter.size();++w){
3348 sprintf(tmpstr,"//\t\tJoin Predicate clause %d.(cost %d)\n",w,fs->join_filter[w]->cost);
3351 // Find partial fcns ref'd in this cnf element
3353 collect_partial_fcns_pr(fs->join_filter[w]->pr, pfcn_refs);
3354 // Since set<..> is a "Sorted Associative Container",
3355 // we can walk through it in sorted order by walking from
3356 // begin() to end(). (and the partial fcns must be
3357 // evaluated in this order).
3358 set<int>::iterator si;
3359 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3360 if(fcn_ref_cnt[(*si)] > 1){
3361 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3363 if(is_partial_fcn[(*si)]){
3364 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3365 ret += "\t\tif(retval) goto end;\n";
3367 if(fcn_ref_cnt[(*si)] > 1){
3368 if(!is_partial_fcn[(*si)]){
3369 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3371 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3376 ret += "\tif( !("+generate_predicate_code(fs->join_filter[w]->pr,schema)+
3381 for(w=0;w<fs->postfilter.size();++w){
3382 sprintf(tmpstr,"//\t\tpostfilter Predicate clause %d.(cost %d)\n",w,fs->postfilter[w]->cost);
3385 // Find partial fcns ref'd in this cnf element
3387 collect_partial_fcns_pr(fs->postfilter[w]->pr, pfcn_refs);
3388 // Since set<..> is a "Sorted Associative Container",
3389 // we can walk through it in sorted order by walking from
3390 // begin() to end(). (and the partial fcns must be
3391 // evaluated in this order).
3392 set<int>::iterator si;
3393 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3394 if(fcn_ref_cnt[(*si)] > 1){
3395 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3397 if(is_partial_fcn[(*si)]){
3398 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3399 ret += "\t\tif(retval) goto end;\n";
3401 if(fcn_ref_cnt[(*si)] > 1){
3402 if(!is_partial_fcn[(*si)]){
3403 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3405 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3410 ret += "\tif( !("+generate_predicate_code(fs->postfilter[w]->pr,schema)+
3416 /////////////// post the tuple
3418 // test passed : create the tuple, then assign to it.
3419 ret += "/*\t\tCreate and post the tuple\t*/\n";
3422 for(s=0;s<sl_list.size();++s){
3423 col_id_set this_se_cids;
3424 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
3425 for(auto csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
3426 if(unpacked_cids.count( (*csi) ) == 0){
3427 int tblref = (*csi).tblvar_ref;
3428 int schref = (*csi).schema_ref;
3429 string field = (*csi).field;
3430 if(tblref==0) // LHS from packet
3431 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3432 else // RHS from hash bucket
3433 ret += "\tunpack_var_"+field+"_1 = rec->"+field+";\n";
3434 unpacked_cids.insert( (*csi) );
3440 // Unpack partial fcns ref'd by the select clause.
3441 // Its a kind of a WHERE clause ...
3442 for(p=sl_fcns_start;p<sl_fcns_end;p++){
3443 if(fcn_ref_cnt[p] > 1){
3444 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
3446 if(is_partial_fcn[p]){
3447 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3448 ret += "\tif(retval) goto end;\n";
3450 if(fcn_ref_cnt[p] > 1){
3451 if(!is_partial_fcn[p]){
3452 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
3454 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
3459 // increment the counter of accepted tuples
3460 ret += "\n\t#ifdef LFTA_STATS\n";
3461 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3462 ret += "\t#endif\n\n";
3464 // First, compute the size of the tuple.
3466 // Unpack any BUFFER type selections into temporaries
3467 // so that I can compute their size and not have
3468 // to recompute their value during tuple packing.
3469 // I can use regular assignment here because
3470 // these temporaries are non-persistent.
3472 for(s=0;s<sl_list.size();s++){
3473 data_type *sdt = sl_list[s]->get_data_type();
3474 if(sdt->is_buffer_type()){
3475 sprintf(tmpstr,"\tselvar_%d = ",s);
3477 ret += generate_se_code(sl_list[s],schema);
3483 // The size of the tuple is the size of the tuple struct plus the
3484 // size of the buffers to be copied in.
3486 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
3487 for(s=0;s<sl_list.size();s++){
3488 data_type *sdt = sl_list[s]->get_data_type();
3489 if(sdt->is_buffer_type()){
3490 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
3497 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
3498 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
3500 // Test passed, make assignments to the tuple.
3502 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
3504 // Mark tuple as REGULAR_TUPLE
3505 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
3508 for(s=0;s<sl_list.size();s++){
3509 data_type *sdt = sl_list[s]->get_data_type();
3510 if(sdt->is_buffer_type()){
3511 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
3513 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
3516 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
3518 // if(sdt->needs_hn_translation())
3519 // ret += sdt->hton_translation() +"( ";
3520 ret += generate_se_code(sl_list[s],schema);
3521 // if(sdt->needs_hn_translation())
3529 ret += "\tpost_tuple(tuple);\n";
3531 // Increment the counter of posted tuples
3532 ret += "\n\t#ifdef LFTA_STATS\n";
3533 ret += "\n\tt->out_tuple_cnt++;\n\n";
3534 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
3535 ret += "\t#endif\n\n";
// Builds, in `ret`, the generated text for the aggregation part of an
// accept function.  In order, the emitted text:
//   - unpacks the partial functions in the gb_fcns and ag_fcns index ranges,
//     jumping to `end` when an unpack reports failure through `retval`;
//   - bumps t->accepted_tuple_cnt under LFTA_STATS;
//   - assigns the gb_attr_<g> group-by variables;
//   - computes hashval / hash2 / probe and searches t->aggr_table_hashmap
//     for a matching or best slot;
//   - updates, ejects or initialises the chosen slot using the text returned
//     by generate_gb_update, generate_tuple_from_aggr and generate_init_group.
//
// fs             : query plan node; read for the "real_time" define and cast
//                  to sgah_qpn to read lfta_disorder.
// node_name      : names fta_aggr_flush_old_<node_name> and is forwarded to
//                  the helper generators.
// schema         : forwarded to the expression and helper generators.
// temporal_flush : flush code produced earlier.  When it is non-empty,
//                  temporal group-by attributes are not recomputed here, and
//                  the text is spliced in if no "real_time" define is set.
//
// NOTE(review): gb_tbl, aggr_tbl, partial_fcns, is_partial_fcn, hash_nums,
// tmpstr and the *_fcns_start/_end bounds are not declared in the visible
// part of this function - presumably file-level state; confirm.
3541 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
3545 ////////////// Processing for aggregation query
3547 // First, search for a match. Start by unpacking the group-by attributes.
3549 // One complication : if a real-time aggregate flush occurs,
3550 // the GB attr has already been calculated. So don't compute
3551 // it again if 1) it's temporal and 2) it will be computed in the
3552 // aggregate flush code.
3554 // Unpack the partial fcns ref'd by the gb's and the aggr defs.
3555 for(p=gb_fcns_start;p<gb_fcns_end;p++){
3556 if(is_partial_fcn[p]){
3557 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3558 ret += "\tif(retval) goto end;\n";
3561 for(p=ag_fcns_start;p<ag_fcns_end;p++){
3562 if(is_partial_fcn[p]){
3563 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
3564 ret += "\tif(retval) goto end;\n";
3568 // increment the counter of accepted tuples
3569 ret += "\n\t#ifdef LFTA_STATS\n";
3570 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
3571 ret += "\t#endif\n\n";
3573 ret += "/*\t\tTest if the group is in the hash table \t*/\n";
3574 // Compute the values of the group-by variables.
3575 for(g=0;g<gb_tbl->size();g++){
3576 data_type *gdt = gb_tbl->get_data_type(g);
// Temporal attributes are skipped whenever flush code was supplied.
3577 if((! gdt->is_temporal()) || temporal_flush == ""){
3579 if(gdt->is_buffer_type()){
3580 // NOTE : if the SE defining the gb is anything
3581 // other than a ref to a variable, this will generate
3582 // illegal code. To be resolved with Spatch.
3583 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
3584 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
3586 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
3587 gdt->get_buffer_assign_copy().c_str(), g, g);
3589 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
3596 // A quick aside : if any of the GB attrs are temporal,
3597 // test for change and flush if any change occurred.
3598 // We've already computed the flush code,
3599 // Put it here if this is not a real time query.
3600 // We've already unpacked all column refs, so no need to
3601 // do it again here.
3603 string rt_level = fs->get_val_of_def("real_time");
3604 if(rt_level == "" && temporal_flush != ""){
3605 ret += temporal_flush;
3608 // Compute the hash bucket
3609 if(gb_tbl->size() > 0){
3610 ret += "\thashval = ";\
3611 for(g=0;g<gb_tbl->size();g++){
3612 if(g>0) ret += " ^ ";
3613 data_type *gdt = gb_tbl->get_data_type(g);
// NOTE(review): the buffer and non-buffer branches format the same text.
3614 if(gdt->is_buffer_type()){
3615 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3616 gdt->get_type_str().c_str(), g);
3618 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
3619 gdt->get_type_str().c_str(), g);
// NOTE(review): presumably the loop above has finished here, so g equals
// gb_tbl->size() and this selects the next multiplier (modulo NRANDS) - confirm.
3624 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
3625 ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
3627 ret+="\tprobe = 0;\n";
3628 ret+="\thash2 = 0;\n\n";
3631 // Does the lfta reference a udaf?
3632 bool has_udaf = false;
3633 for(a=0;a<aggr_tbl->size();a++){
3634 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
3637 // Scan for a match, or alternatively the best slot.
3638 // Currently, hardcode 5 tests.
3640 " gen_val = t->generation & SLOT_GEN_BITS;\n"
3641 " match_found = 0;\n"
3642 " best_slot = probe;\n"
3643 " for(i=0;i<5 && match_found == 0;i++){\n"
3644 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
3646 if(gb_tbl->size()>0){
3647 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
3649 string rhs_op, lhs_op;
3650 for(g=0;g<gb_tbl->size();g++){
3651 if(g>0) ret += " && ";
3653 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
3654 sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
3655 ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
3660 " match_found = 1;\n"
3661 " best_slot = probe;\n"
3664 "// Rate slots in case no match found: prefer empty, then full but old slots\n"
3665 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3666 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
3667 " best_slot = probe;\n"
3669 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3670 " best_slot = probe;\n"
3674 " if(probe >= t->max_aggrs)\n"
3677 " if(match_found){\n"
3679 ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3682 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
// NOTE(review): this printf runs in the generator itself (it is not emitted
// text) and writes to stdout - looks like leftover debug output; confirm.
3684 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3685 if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3687 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
3689 bool first_g = true;
// Emit one "(new - stored)" difference term per temporal group-by attribute,
// joined with " + "; the emitted condition compares the sum against 0.
3690 for(int g=0;g<gb_tbl->size();g++){
3691 data_type *gdt = gb_tbl->get_data_type(g);
3692 if(gdt->is_temporal()){
3693 if(first_g) first_g = false; else ret+=" + ";
3694 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3697 ret += ") == 0 ){\n";
3700 " fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
3706 ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3708 "\t\t\t#ifdef LFTA_STATS\n"
3709 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3710 "\t\t\t\tt->collision_cnt++;\n\n"
3714 ret += generate_init_group(schema,"best_slot");
// Generates the C text of the function
//   static gs_retval_t accept_packet_<node_name>(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz)
// The emitted function is assembled in this order:
//   1. declarations: unpack_var_<field>_<tblref> variables, always-needed
//      locals, aggregation state, filter-join / watch-join state, selvar_<s>
//      temporaries and partial-function result variables;
//   2. LFTA_STATS counters and a test of __lfta_id_fm_nic__ against global_id;
//   3. aggregation preliminaries, or unpacking of the temporal attributes used
//      in the select list (each stored into t->last_<field>_<tblref>);
//   4. the filter predicates, sorted with compare_cnf_cost, each preceded by
//      the unpacking of the columns and partial functions it references;
//   5. unpacking of the join hash fields and of the remaining columns;
//   6. the query-type-specific body (select, aggregation, filter join or
//      watch join generator);
//   7. the `end:` label, cycle accounting under LFTA_STATS, and `return 1`.
//
// fs            : query plan node; its node_type() and filter clause are read,
//                 and it is cast to filter_join_qpn / watch_join_qpn as needed.
// node_name     : used in the generated function, struct and variable names.
// schema        : used to look up field types and forwarded to generators.
// Ext_fcns      : external function list (UDAF return types, predicate costs).
// is_aggr_query, is_fj, is_wj : select which declarations and body are emitted.
// s_pids        : indices of predicates covered by the prefilter.
//
// NOTE(review): where, sl_list, gb_tbl, aggr_tbl, partial_fcns, fcn_ref_cnt,
// is_partial_fcn, global_id and tmpstr are not declared in the visible part of
// this function - presumably file-level state; confirm.
3724 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, bool is_wj, set<unsigned int> &s_pids){
3726 string ret="static gs_retval_t accept_packet_"+node_name+
3727 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3728 ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3732 // Define all of the variables needed by this
3736 // Gather all column references, need to define unpacking variables.
3739 col_id_set::iterator csi;
3741 // If it's a filter join, rebind all colrefs
3742 // to the first range var, to avoid double unpacking.
3745 for(w=0;w<where.size();++w)
3746 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3747 for(s=0;s<sl_list.size();s++)
3748 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
3751 for(w=0;w<where.size();++w){
// Predicates listed in s_pids contribute columns only for join queries.
3752 if(is_wj || is_fj || s_pids.count(w) == 0)
3753 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3755 for(s=0;s<sl_list.size();s++){
3756 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3761 for(g=0;g<gb_tbl->size();g++)
3762 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3765 // Variables for unpacking attributes.
3766 ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3767 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3768 int schref = (*csi).schema_ref;
3769 int tblref = (*csi).tblvar_ref;
3770 string field = (*csi).field;
3771 data_type dt(schema->get_type_name(schref,field));
3772 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3773 field.c_str(), tblref);
3779 // Variables that are always needed
3780 ret += "/*\t\tVariables which are always needed\t*/\n";
3781 ret += "\tgs_retval_t retval;\n";
3782 ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3783 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3785 ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3788 // Variables needed for aggregation queries.
3790 ret += "\n/*\t\tVariables for aggregation\t*/\n";
3791 ret+="\tunsigned int i, probe;\n";
3792 ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3793 ret+="\tgs_uint64_t hashval, hash2;\n";
3794 // Variables for storing group-by attribute values.
3795 if(gb_tbl->size() > 0)
3796 ret += "/*\t\tGroup-by attributes\t*/\n";
3797 for(g=0;g<gb_tbl->size();g++){
3798 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3800 data_type *gdt = gb_tbl->get_data_type(g);
// Buffer-typed group-by attributes also get a gb_attr_tmp<g> staging variable.
3801 if(gdt->is_buffer_type()){
3802 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3807 // Temporaries for min/max
3808 string aggr_tmp_str = "";
3809 for(a=0;a<aggr_tbl->size();a++){
3810 string aggr_op = aggr_tbl->get_op(a);
3811 if(aggr_op == "MIN" || aggr_op == "MAX"){
3812 sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3813 aggr_tmp_str.append(tmpstr);
3816 if(aggr_tmp_str != ""){
3817 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3818 ret += aggr_tmp_str;
3821 // Variables for udaf output temporaries
3822 bool no_udaf = true;
3823 for(a=0;a<aggr_tbl->size();a++){
3824 if(! aggr_tbl->is_builtin(a)){
3826 ret+="/*\t\tUDAF output vars.\t*/\n";
3829 int afcn_id = aggr_tbl->get_fcn_id(a);
3830 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3831 sprintf(tmpstr,"udaf_ret%d", a);
3832 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3837 // Variables needed for a filter join query
3838 if(fs->node_type() == "filter_join"){
3839 filter_join_qpn *fjq = (filter_join_qpn *)fs;
3840 bool uses_bloom = fjq->use_bloom;
3841 ret += "/*\t\tJoin fields\t*/\n";
3842 for(g=0;g<fjq->hash_eq.size();g++){
3843 sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
3845 sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
3850 " /* Variables for fj bloom filter */ \n"
3851 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3852 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3853 "\tlong long int curr_fj_ts;\n"
3854 "\tlong long int curr_bin, the_bin;\n"
3859 " /* Variables for fj join table */ \n"
3860 "\tunsigned int i, bucket, found; \n"
3861 "\tunsigned int bucket1, the_bucket;\n"
3862 " long long int curr_fj_ts;\n"
// Variables needed for a watchlist join query: one s_/r_ equijoin variable
// pair per key field, plus the hash-table cursor `rec`.
3869 if(fs->node_type() == "watch_join"){
3870 watch_join_qpn *wlq = (watch_join_qpn *)fs;
3871 ret += "/*\t\tJoin fields\t*/\n";
3872 for(int k=0;k<wlq->key_flds.size(); ++k){
3873 string kfld = wlq->key_flds[k];
3874 ret += "\t"+wlq->hash_eq[kfld]->pr->get_right_se()->get_data_type()->get_cvar_type()+" s_equijoin_"+kfld+";\n";
3875 ret += "\t"+wlq->hash_eq[kfld]->pr->get_left_se()->get_data_type()->get_cvar_type()+" r_equijoin_"+kfld+";\n";
3878 " /* Variables for wl join table */ \n"
3879 "\tunsigned int i, bucket;\n"
3880 "\tunsigned long long int hash; \n";
3881 string wl_schema = wlq->from[1]->get_schema_name();
3882 string wl_elem_str = generate_watchlist_element_name(wl_schema);
3883 ret += "\tstruct "+wl_elem_str+" *rec = NULL;\n";
3889 // Variables needed to store selected attributes of BUFFER type
3890 // temporarily, in order to compute their size for storage
3891 // in an output tuple.
3893 string select_var_defs = "";
3894 for(int s=0;s<sl_list.size();s++){
3895 data_type *sdt = sl_list[s]->get_data_type();
3896 if(sdt->is_buffer_type()){
3897 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3898 select_var_defs.append(tmpstr);
3901 if(select_var_defs != ""){
3902 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3903 ret += select_var_defs;
3906 // Variables to store results of partial functions.
3908 if(partial_fcns.size()>0){
3909 ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3910 for(p=0;p<partial_fcns.size();++p){
// A result variable is declared for entries flagged in is_partial_fcn, and,
// outside aggregation queries, for functions referenced more than once;
// the latter also get a fcn_ref_cnt_<p> flag initialised to 0.
3911 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3912 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3913 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3915 if(!is_aggr_query && fcn_ref_cnt[p] >1){
3916 ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3921 if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3925 // variable to hold packet struct //
3927 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3931 ret += "\t#ifdef LFTA_STATS\n";
3932 // variable to store counter of cpu cycles spent in accept_tuple
3933 ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3934 // increment counter of received tuples
3935 ret += "\tt->in_tuple_cnt++;\n";
3936 ret += "\t#endif\n";
3939 // -------------------------------------------------
3940 // If the packet is "packet", test if it's for this lfta,
3941 // and if so load it into its struct
3944 ret+="\n/* packed tuple : test and load. \t*/\n";
3945 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3946 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3947 ret+="\t\tgoto end;\n\n";
3952 col_id_set unpacked_cids; // Keep track of the cols that have been unpacked.
3954 string temporal_flush;
3956 ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3957 else { // non-aggregation operators
3959 // Unpack all the temporal attributes referenced in select clause
3960 // and update the last value of the attribute
3961 col_id_set temp_cids; // col ids of temp attributes in select clause
3963 for(s=0;s<sl_list.size();s++){
3964 data_type *sdt = sl_list[s]->get_data_type();
3965 if (sdt->is_temporal()) {
3966 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3969 // If this is a filter join,
3970 // ensure that the temporal range field is unpacked.
3972 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3973 if(temp_cids.count(window_var_cid)==0)
3974 temp_cids.insert(window_var_cid);
3977 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3978 if(unpacked_cids.count((*csi)) == 0){
3979 int tblref = (*csi).tblvar_ref;
3980 int schref = (*csi).schema_ref;
3981 string field = (*csi).field;
3982 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3983 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3986 unpacked_cids.insert( (*csi) );
3992 vector<cnf_elem *> filter = fs->get_filter_clause();
3993 // Test the filter predicate (some query types have additional preds).
3994 if(filter.size() > 0 && !is_wj){ // watchlist join does specialized processing
3996 // Sort by evaluation cost.
3997 // First, estimate evaluation costs
3998 // Eliminate predicates covered by the prefilter (those in s_pids).
3999 // I need to do it before the sort because the indices refer
4000 // to the position in the unsorted list.
4001 vector<cnf_elem *> tmp_wh;
4002 for(w=0;w<filter.size();++w){
4003 if(s_pids.count(w) == 0){
4004 compute_cnf_cost(filter[w],Ext_fcns);
4005 tmp_wh.push_back(filter[w]);
4010 sort(filter.begin(), filter.end(), compare_cnf_cost());
4012 // Now generate the predicates.
4013 for(w=0;w<filter.size();++w){
4014 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
4016 // Find the set of variables accessed in this CNF elem,
4017 // but in no previous element.
4018 col_id_set this_pred_cids;
4019 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
4020 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4021 if(unpacked_cids.count( (*csi) ) == 0){
4022 int tblref = (*csi).tblvar_ref;
4023 int schref = (*csi).schema_ref;
4024 string field = (*csi).field;
4025 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4026 unpacked_cids.insert( (*csi) );
4029 // Find partial fcns ref'd in this cnf element
4031 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
4032 // Since set<..> is a "Sorted Associative Container",
4033 // we can walk through it in sorted order by walking from
4034 // begin() to end(). (and the partial fcns must be
4035 // evaluated in this order).
4036 set<int>::iterator si;
4037 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
// Functions referenced more than once are guarded by fcn_ref_cnt_<id>,
// which the emitted code sets to 1 after the first evaluation.
4038 if(fcn_ref_cnt[(*si)] > 1){
4039 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
4041 if(is_partial_fcn[(*si)]){
4042 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
4043 ret += "\t\tif(retval) goto end;\n";
4045 if(fcn_ref_cnt[(*si)] > 1){
4046 if(!is_partial_fcn[(*si)]){
4047 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
4049 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
4054 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
4058 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
4062 // We've passed the WHERE clause,
4063 // unpack the remainder of the accessed fields.
4065 ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
4066 vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
4067 for(w=0;w<h_eq.size();++w){
4068 col_id_set this_pred_cids;
4069 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
4070 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4071 if(unpacked_cids.count( (*csi) ) == 0){
4072 int tblref = (*csi).tblvar_ref;
4073 int schref = (*csi).schema_ref;
4074 string field = (*csi).field;
4075 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4076 unpacked_cids.insert( (*csi) );
4080 }else if(is_wj){ // STOPPED HERE move this to wj main body
4082 ret += "\n//\tPassed the WHERE clause, unpack the hash fields. \n";
4083 map<string, cnf_elem *> h_eq = ((watch_join_qpn *)fs)-> hash_eq;
4084 vector<string> kflds = ((watch_join_qpn *)fs)->key_flds;
4085 for(w=0;w<kflds.size();++w){
4086 string kfld = kflds[w];
4087 col_id_set this_pred_cids;
4088 gather_pr_col_ids(h_eq[kfld]->pr, this_pred_cids, gb_tbl);
4089 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
4090 if(unpacked_cids.count( (*csi) ) == 0){
4091 int tblref = (*csi).tblvar_ref;
4092 int schref = (*csi).schema_ref;
4093 string field = (*csi).field;
4094 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4095 unpacked_cids.insert( (*csi) );
4101 ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
4103 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4104 if(unpacked_cids.count( (*csi) ) == 0){
4105 int schref = (*csi).schema_ref;
4106 int tblref = (*csi).tblvar_ref;
4107 string field = (*csi).field;
4108 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
4109 unpacked_cids.insert( (*csi) );
4116 ////////////////// After this, the query types
4117 ////////////////// are processed differently.
// NOTE(review): `!is_fj & !is_wj` uses bitwise `&`; with bool operands the
// result equals `&&`, which was probably intended.
4119 if(!is_aggr_query && !is_fj & !is_wj)
4120 ret += generate_sel_accept_body(fs, node_name, schema);
4121 else if(is_aggr_query)
4122 ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
4125 ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
4127 ret += generate_wj_accept_body((watch_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
// Target of every emitted `goto end`; the emitted function then returns 1.
4134 ret += "\n\tend:\n";
4135 ret += "\t#ifdef LFTA_STATS\n";
4136 ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
4137 ret += "\t#endif\n";
4138 ret += "\n\treturn 1;\n}\n\n";
4144 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
4147 string ret = "struct FTA * "+generate_alloc_name(node_name) +
4148 "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void *value){\n";
4150 ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
4153 ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
4155 // assign a streamid to fta instance
4156 ret+="\t/* assign a streamid */\n";
4157 ret+="\tf->f.ftaid = ftaid;\n";
4158 ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
4159 ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
4162 ret += "\tf->n_aggrs = 0;\n";
4164 ret += "\tf->max_aggrs = ";
4166 // Computing the number of aggregate blocks is a little
4167 // tricky. If there are no GB attrs, or if all GB attrs
4168 // are temporal, then use a single aggregate block, else
4169 // use a default value (10). A user specification overrides
4171 bool single_group = true;
4172 for(g=0;g<gb_tbl->size();g++){
4173 data_type *gdt = gb_tbl->get_data_type(g);
4174 if(! gdt->is_temporal() ){
4175 single_group = false;
4178 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
4179 int max_aggr_i = atoi(max_aggr_str.c_str());
4180 if(max_aggr_i <= 0){
4184 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
4186 unsigned int naggrs = 1; // make it power of 2
4187 unsigned int nones = 0;
4191 naggrs = naggrs << 1;
4192 max_aggr_i = max_aggr_i >> 1;
4194 if(nones==1) // in case it was already a power of 2.
4196 ret += int_to_string(naggrs);
4200 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
4201 ret+="\t\treturn(0);\n";
4203 // ret+="/* compute how many integers we need to store the hashmap */\n";
4204 // ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
4205 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
4206 ret+="\t\treturn(0);\n";
4208 ret+="/*\t\tfill bitmap with zero \t*/\n";
4209 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
4210 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
4211 ret+="\tf->generation=0;\n";
4212 ret+="\tf->flush_pos = f->max_aggrs;\n";
4214 ret += "\tf->flush_ctr = 0;\n";
4220 ret+="\tf->first_exec = 1;\n";
4221 unsigned int n_bloom = 11;
4222 string n_bloom_str = fs->get_val_of_def("num_bloom");
4223 int tmp_n_bloom = atoi(n_bloom_str.c_str());
4225 n_bloom = tmp_n_bloom+1;
4227 unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
4228 if(window_len < n_bloom){
4229 n_bloom = window_len+1;
4232 int bf_exp_size = 12; // base-2 log of number of bits
4233 string bloom_len_str = fs->get_val_of_def("bloom_size");
4234 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
4235 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
4236 bf_exp_size = tmp_bf_exp_size;
4238 int bf_bit_size = 1 << 12;
4239 int bf_byte_size = bf_bit_size / (8*sizeof(char));
4241 int bf_tot = n_bloom*bf_byte_size;
4242 ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
4243 ret+="\t\treturn(0);\n";
4246 " for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
4247 " f->bf_table[i] = 0;\n"
4250 unsigned int ht_size = 4096;
4251 string ht_size_s = fs->get_val_of_def("aggregate_slots");
4252 int tmp_ht_size = atoi(ht_size_s.c_str());
4253 if(tmp_ht_size > 1024){
4254 unsigned int hs = 1; // make it power of 2
4257 tmp_ht_size = tmp_ht_size >> 1;
4261 ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
4262 ret+="\t\treturn(0);\n";
4265 " for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
4266 " f->join_table[i].ts = 0;\n"
4271 // Initialize the complex literals (which might be handles).
4273 for(cl=0;cl<complex_literals->size();cl++){
4274 literal_t *l = complex_literals->get_literal(cl);
4275 // sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
4276 // ret += tmpstr + l->to_C_code() + ";\n";
4277 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
4278 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4283 // Initialize the last seen values of temporal attributes to min(max) value of
4284 // their respective type
4285 // Create places to hold the last values of temporal attributes referenced in select clause
4288 col_id_set temp_cids; // col ids of temp attributes in select clause
4291 col_id_set::iterator csi;
4293 for(s=0;s<sl_list.size();s++){
4294 data_type *sdt = sl_list[s]->get_data_type();
4295 if (sdt->is_temporal()) {
4296 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
4300 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
4301 int tblref = (*csi).tblvar_ref;
4302 int schref = (*csi).schema_ref;
4303 string field = (*csi).field;
4304 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
4305 if (dt.is_increasing()) {
4306 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
4308 } else if (dt.is_decreasing()) {
4309 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
4314 // initialize last seen values of temporal group-by variables
4316 for(g=0;g<gb_tbl->size();g++){
4317 data_type *dt = gb_tbl->get_data_type(g);
4318 if(dt->is_temporal()){
4320 fprintf(stderr,"group by attribute %s is temporal, ",
4321 gb_tbl->get_name(g).c_str());
4323 if(dt->is_increasing()){
4324 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
4326 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
4333 ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
4334 ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
4335 ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
4336 ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
4337 ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
4339 // Initialize runtime stats
4340 ret+="\tf->in_tuple_cnt = 0;\n";
4341 ret+="\tf->out_tuple_cnt = 0;\n";
4342 ret+="\tf->out_tuple_sz = 0;\n";
4343 ret+="\tf->accepted_tuple_cnt = 0;\n";
4344 ret+="\tf->cycle_cnt = 0;\n";
4345 ret+="\tf->collision_cnt = 0;\n";
4346 ret+="\tf->eviction_cnt = 0;\n";
4347 ret+="\tf->sampling_rate = 1.0;\n";
4349 ret+="\tf->trace_id = 0;\n\n";
4350 if(param_tbl->size() > 0){
4352 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
4353 "#ifndef LFTA_IN_NIC\n"
4354 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
4362 // Register the pass-by-handle parameters
4364 for(ph=0;ph<param_handle_table.size();++ph){
4365 data_type pdt(param_handle_table[ph]->type_name);
4366 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
4367 switch(param_handle_table[ph]->val_type){
4370 if(pdt.is_buffer_type()) ret += "&(";
4371 sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
4373 if(pdt.is_buffer_type()) ret += ")";
4377 // not complex, no constructor
4379 ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
4382 // query parameter handles are registered/deregistered in the
4383 // load_params function.
4384 // ret += "t->param_"+param_handle_table[ph]->param_name;
4387 fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
4392 ret += "\treturn (struct FTA *) f;\n";
4401 //////////////////////////////////////////////////////////////////
// Generate the C source text of a single LFTA from query plan node `fs`.
//
// The function resets the global working state (gb_tbl, aggr_tbl, sl_list,
// where, partial_fcns, fcn_ref_cnt, is_partial_fcn, pred_class, pred_pos and
// the *_fcns_start / *_fcns_end markers), loads the select list, where clause
// and, for sgah_qpn nodes, the group-by and aggregate tables according to
// fs->node_type(), collects the partial functions of each clause, and then
// appends the generated pieces (preamble, structs, flush / load_params /
// free / control / accept / clock / alloc functions) to retval.
//
// Parameters:
//   fs               query plan node to generate code for.
//   schema           table schema handed to the individual generators.
//   gid              NOTE(review): how gid is consumed is not evident from the
//                    statements below - confirm.
//   Ext_fcns         external function list; used to gather complex literals,
//                    handle parameters and partial functions.
//   schema_embed_str forwarded to generate_preamble.
//   ifdb             interface database, queried for "Time_Correlation".
//   nicp             NIC properties (checked against NULL); its "Return"
//                    option selects packed return.
//   s_pids           forwarded to generate_fta_accept.
4403 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
4404 // map<string,string> &int_fcn_defs,
4405 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
4410 /////////////////////////////////////////////////////////////
4411 /// Do operator-generic processing, such as
4412 /// gathering the set of referenced columns,
4413 /// generating structures, etc.
4415 // Initialize globals to empty.
4416 gb_tbl = NULL; aggr_tbl = NULL;
4417 global_id = -1; nicprop = NULL;
4418 param_tbl = fs->get_param_tbl();
4419 sl_list.clear(); where.clear();
4420 partial_fcns.clear();
4421 fcn_ref_cnt.clear(); is_partial_fcn.clear();
4422 pred_class.clear(); pred_pos.clear();
4423 sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
4424 gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
4427 // Does the lfta read packed results from the NIC?
4428 nicprop = nicp; // load into global
// packed_return is enabled only by the exact option value "Packed";
// any other value of the "Return" option is reported and ignored.
4430 packed_return = false;
4431 if(nicp && nicp->option_exists("Return")){
4432 if(nicp->option_value("Return") == "Packed"){
4433 packed_return = true;
4435 fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
4440 // Extract data which defines the query.
4441 // complex literals gathered now.
4442 complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
4443 param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
4444 string node_name = fs->get_node_name();
4445 bool is_fj = false, uses_bloom = false;
4447 bool is_watch_tbl = false;
// Dispatch on the node type string: each recognized type casts fs to its
// concrete class and loads the select list and where clause from it.
4450 if(fs->node_type() == "spx_qpn"){
4451 is_aggr_query = false;
4452 spx_qpn *spx_node = (spx_qpn *)fs;
4453 sl_list = spx_node->get_select_se_list();
4454 where = spx_node->get_where_clause();
// Aggregation node: additionally loads the group-by and aggregate tables.
4458 if(fs->node_type() == "sgah_qpn"){
4459 is_aggr_query = true;
4460 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4461 sl_list = sgah_node->get_select_se_list();
4462 where = sgah_node->get_where_clause();
4463 gb_tbl = sgah_node->get_gb_tbl();
4464 aggr_tbl = sgah_node->get_aggr_tbl();
// A non-empty HAVING clause only produces a warning here.
4466 if((sgah_node->get_having_clause()).size() > 0){
4467 fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
// Filter join: also records whether the node uses bloom filters.
4470 if(fs->node_type() == "filter_join"){
4471 is_aggr_query = false;
4473 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4474 sl_list = fj_node->get_select_se_list();
4475 where = fj_node->get_where_clause();
4476 uses_bloom = fj_node->use_bloom;
4480 if(fs->node_type() == "watch_join"){
4481 is_aggr_query = false;
4483 watch_join_qpn *wl_node = (watch_join_qpn *)fs;
4484 sl_list = wl_node->get_select_se_list();
4485 where = wl_node->get_where_clause();
// Watch table: the select list and where clause are set to empty vectors.
4489 if(fs->node_type() == "watch_tbl_qpn"){
4490 is_aggr_query = false;
4491 is_watch_tbl = true;
4492 vector<scalarexp_t *> empty_sl_list;
4493 vector<cnf_elem *> empty_where;
4494 sl_list = empty_sl_list;
4495 where = empty_where;
4499 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
4503 // Build list of "partial functions", by clause.
4504 // NOTE : partial fcns are not handled well.
4505 // The act of searching for them associates the fcn call
4506 // in the SE with an index to an array. Refs to the
4507 // fcn value are replaced with refs to the variable they are
4508 // unpacked into. A more general tagging mechanism would be better.
// For aggregate queries the two pointers below stay NULL, so no reference
// counts or partial-function flags are collected by the searches.
4511 vector<bool> *pfunc_ptr = NULL;
4512 vector<int> *ref_cnt_ptr = NULL;
4513 if(!is_aggr_query){ // don't collect cacheable fcns on aggr query.
4514 ref_cnt_ptr = &fcn_ref_cnt;
4515 pfunc_ptr = &is_partial_fcn;
// The *_fcns_start / *_fcns_end markers record, as indices into partial_fcns,
// where the functions of each clause (select, where, group-by, aggregate)
// begin and end.
4519 for(i=0;i<sl_list.size();i++){
4520 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
4522 wh_fcns_start = sl_fcns_end = partial_fcns.size();
4523 for(i=0;i<where.size();i++){
4524 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
4526 gb_fcns_start = wh_fcns_end = partial_fcns.size();
4528 for(i=0;i<gb_tbl->size();i++){
4529 find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
4532 ag_fcns_start = gb_fcns_end = partial_fcns.size();
4533 if(aggr_tbl != NULL){
4534 for(i=0;i<aggr_tbl->size();i++){
4535 find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
4538 ag_fcns_end = partial_fcns.size();
4540 // Fill up the is_partial_fcn and fcn_ref_cnt arrays.
4542 for(i=0; i<partial_fcns.size();i++){
4543 fcn_ref_cnt.push_back(1);
4544 is_partial_fcn.push_back(true);
4548 // Unmark non-partial expensive functions referenced only once.
4549 for(i=0; i<partial_fcns.size();i++){
4550 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
4551 partial_fcns[i]->set_partial_ref(-1);
// From here on node_name is the normalized name used in generated identifiers.
4555 node_name = normalize_name(node_name);
4557 retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
// Packed return: emit "struct <node_name>_input_struct" with one unpack_var_
// member per NIC-safe column gathered from the select list, where clause and
// group-by definitions. Column types come from the schema of the first
// input table.
4559 if(packed_return){ // generate unpack struct
4560 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
4561 int schref = input_tbls[0]->get_schema_ref();
4562 vector<string> refd_cols;
4563 for(s=0;s<sl_list.size();++s){
4564 gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
4566 for(p=0;p<where.size();++p){
4567 // I'm not disabling these preds ...
4568 gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
4571 for(g=0;g<gb_tbl->size();++g){
4572 gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
// Sorting fixes the order in which the struct members are emitted.
4575 sort(refd_cols.begin(), refd_cols.end());
4576 retval += "struct "+node_name+"_input_struct{\n";
4577 retval += "\tint __lfta_id_fm_nic__;\n";
4579 for(vsi=0;vsi<refd_cols.size();++vsi){
4580 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
4581 retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
4587 /////////////////////////////////////////////////////
4588 // Common stuff unpacked, do some generation
// Operator-specific structs: aggregate struct, filter-join struct, or the
// watch list structs and loader.
4592 retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
4594 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
4596 retval += "\n\n// watchtable code here \n\n";
4597 watch_tbl_qpn *wl_node = (watch_tbl_qpn *)fs;
4598 retval += generate_watchlist_structs(node_name, wl_node->table_layout, wl_node->filename, wl_node->refresh_interval);
4599 retval += generate_watchlist_load(node_name, wl_node->table_layout, wl_node->key_flds);
// FTA struct, output tuple struct and the FTA's functions, appended in order.
4603 retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, is_wj, uses_bloom, schema);
4604 retval += generate_tuple_struct(node_name, sl_list) ;
4607 retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
// load_params is generated only when the query has parameters.
4608 if(param_tbl->size() > 0)
4609 retval += generate_fta_load_params(node_name) ;
4610 retval += generate_fta_free(node_name, is_aggr_query) ;
4611 retval += generate_fta_control(node_name, schema, is_aggr_query) ;
4612 retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, is_wj, s_pids) ;
4614 /* extract the value of Time_Correlation from interface definition */
// The lookup uses the machine and interface of the first input table;
// DEFAULT_TIME_CORR is used when the interface defines no value, otherwise
// the first value is converted with atoi.
4618 vector<tablevar_t *> tvec = fs->get_input_tbls();
4619 vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
4620 if (time_corr_vec.empty())
4621 time_corr = DEFAULT_TIME_CORR;
4623 time_corr = atoi(time_corr_vec[0].c_str());
4625 retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query, is_wj) );
4626 retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
// Compute the snap length needed by query plan node `fs`.
//
// The columns referenced by the node's where clause, select list and
// group-by definitions are gathered into cid_set and then scanned. When
// snap_type is "index", snap_len tracks the largest position returned by
// schema->get_field_idx(). The other visible path reads each field's
// "snap_len" modifier and keeps the largest numeric value; a non-numeric
// value is reported on stderr.
//
// Parameters:
//   fs         query plan node (spx_qpn, sgah_qpn, filter_join, watch_join
//              or watch_tbl_qpn; anything else is reported as an internal error).
//   schema     table schema used for field positions and modifiers.
//   snap_type  "index" selects the field-position computation.
//
// Side effect: overwrites the globals sl_list and where, and gb_tbl for
// sgah_qpn nodes.
4634 int compute_snap_len(qp_node *fs, table_list *schema, string snap_type){
4636 // Initialize global vars
4638 sl_list.clear(); where.clear();
// Watch tables are tested first, ahead of the select/where extraction chain.
4641 if(fs->node_type() == "watch_tbl_qpn"){
// Load the select list and where clause from the concrete node class.
4645 if(fs->node_type() == "spx_qpn"){
4646 spx_qpn *spx_node = (spx_qpn *)fs;
4647 sl_list = spx_node->get_select_se_list();
4648 where = spx_node->get_where_clause();
4650 else if(fs->node_type() == "sgah_qpn"){
4651 sgah_qpn *sgah_node = (sgah_qpn *)fs;
4652 sl_list = sgah_node->get_select_se_list();
4653 where = sgah_node->get_where_clause();
4654 gb_tbl = sgah_node->get_gb_tbl();
4656 else if(fs->node_type() == "filter_join"){
4657 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
4658 sl_list = fj_node->get_select_se_list();
4659 where = fj_node->get_where_clause();
4661 else if(fs->node_type() == "watch_join"){
4662 watch_join_qpn *fj_node = (watch_join_qpn *)fs;
4663 sl_list = fj_node->get_select_se_list();
4664 where = fj_node->get_where_clause();
4666 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
4670 // Gather all column references, need to define unpacking variables.
4673 col_id_set::iterator csi;
4675 for(w=0;w<where.size();++w)
4676 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
4677 for(s=0;s<sl_list.size();s++){
4678 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
// Columns referenced by the group-by definitions are gathered as well.
4683 for(g=0;g<gb_tbl->size();g++)
4684 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
4687 // compute snap length
4690 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4691 int schref = (*csi).schema_ref;
4692 int tblref = (*csi).tblvar_ref;
4693 string field = (*csi).field;
// "index" mode: keep the largest field position seen so far.
4695 if(snap_type == "index"){
4696 int pos = schema->get_field_idx(schref, field);
4697 if(pos>snap_len) snap_len = pos;
// Modifier mode: parse the field's "snap_len" modifier as a decimal integer
// and keep the maximum.
4700 param_list *field_params = schema->get_modifier_list(schref, field);
4701 if(field_params->contains_key("snap_len")){
4702 string fld_snap_str = field_params->val_of("snap_len");
4704 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
4705 if(fld_snap > snap_len) snap_len = fld_snap;
4708 fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
// Compares the counter n_snap against the number of referenced columns.
4714 if(n_snap == cid_set.size()){
4723 // Function which computes an optimal
4724 // set of unpacking functions.
//
// Repeats until every column in upref_cids has an entry in ucol_fcn_map:
//   1. for each column without an entry, count the unpack functions its
//      schema field lists (pfcn_count);
//   2. pick the function with the lowest cost per referencing column
//      (Schema->get_ufcn_cost(fcn) divided by its count);
//   3. assign that function to every column without an entry that lists it.
//
// Parameters:
//   upref_cids    columns that need an unpacking function.
//   Schema        source of each field's unpack functions and their costs.
//   ucol_fcn_map  output; maps each column to the chosen unpack function.
4726 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
4727 map<string, int> pfcn_count;
4728 map<string, int>::iterator msii;
4729 col_id_set::iterator cisi;
4730 set<string>::iterator ssi;
4733 while(ucol_fcn_map.size() < upref_cids.size()){
4735 // Gather unpack functions referenced by unaccounted-for
4736 // columns, and increment their reference count.
4738 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4739 if(ucol_fcn_map.count((*cisi)) == 0){
4740 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4741 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
4742 pfcn_count[(*ssi)]++;
4746 // Get the lowest cost per field function.
4747 float min_cost = 0.0;
4748 string best_fcn = "";
4749 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
4750 int fcost = Schema->get_ufcn_cost((*msii).first);
4752 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
// Cost is amortized over the number of columns that reference the function.
// The first map entry always seeds min_cost and best_fcn.
4755 float this_cost = (1.0*fcost)/(*msii).second;
4756 if(msii == pfcn_count.begin() || this_cost < min_cost){
4757 min_cost = this_cost;
4758 best_fcn = (*msii).first;
4762 fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4766 // Assign this function to the unassigned columns which use it.
4767 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4768 if(ucol_fcn_map.count((*cisi)) == 0){
4769 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4770 if(ufcns.count(best_fcn)>0)
4771 ucol_fcn_map[(*cisi)] = best_fcn;
4779 // Generate an initial test for the lfta
4780 // Assume that the predicate references no external functions,
4781 // and especially no partial functions,
4782 // aggregates, internal functions.
//
// The generated text contains, all suffixed with `iface`:
//   - global handle variables pref_common_pred_hdl_<class>_<word>_<iface>,
//   - struct prefilter_complex_lit_struct_<iface> and its instance,
//   - init_lfta_prefilter_<iface>(), which initializes the complex literals
//     and registers the common-predicate handles,
//   - lfta_prefilter_<iface>(void *pkt), which unpacks the referenced fields,
//     evaluates the predicates and returns a gs_uint64_t bitmask in which
//     each entry of pred_list owns one bit.
//
// Parameters:
//   pred_list       prefilter predicates.
//   temp_cids       temporal columns; unpacked into prefilter_temp_vars.
//   Schema          table schema.
//   Ext_fcns        external function list.
//   lfta_cols       per-lfta sets of referenced columns.
//   lfta_sigs       per-lfta signatures, tested as (retval & sig) == sig.
//   lfta_snap_lens  per-lfta snap lengths, used by the snaplen section.
//   iface           suffix appended to the generated identifiers.
//
// Side effect: clears and refills the globals pred_class and pred_pos.
4783 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4784 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4785 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4786 vector<int> &lfta_snap_lens, string iface){
4787 col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4788 col_id_set::iterator csi;
4792 // Gather complex literals in the prefilter.
4793 cplx_lit_table *complex_literals = new cplx_lit_table();
4794 for(p=0;p<pred_list.size();++p){
4795 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4799 // Find the combinable predicates
4800 vector<predicate_t *> pr_list;
4801 for(p=0;p<pred_list.size();++p){
4802 find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4805 // Analyze the combinable predicates to find the predicate classes.
4806 pred_class.clear(); // idx to equiv pred in equiv_list
4807 pred_pos.clear(); // idx to returned bitmask.
4808 vector<predicate_t *> equiv_list;
4809 vector<int> num_equiv;
// Each combinable predicate is compared against the class representatives
// in equiv_list. pred_class[p] receives the class index and pred_pos[p] the
// predicate's position within that class (0 for the representative).
4812 for(p=0;p<pr_list.size();++p){
4813 for(q=0;q<equiv_list.size();++q){
4814 if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4817 if(q == equiv_list.size()){ // no equiv : create new
4818 pred_class.push_back(equiv_list.size());
4819 equiv_list.push_back(pr_list[p]);
4820 pred_pos.push_back(0);
4821 num_equiv.push_back(1);
4823 }else{ // pr_list[p] is equivalent to pred q
4824 pred_class.push_back(q);
4825 pred_pos.push_back(num_equiv[q]);
4830 // Generate the variables which hold the common pred handles
// Handles are indexed by class q and by word p, with p running from 0 to
// num_equiv[q]/32; positions are later split as pred_pos/32 and pred_pos%32.
4831 ret += "/*\t\tprefilter global vars.\t*/\n";
4832 for(q=0;q<equiv_list.size();++q){
4833 for(p=0;p<=(num_equiv[q]/32);++p){
4834 ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4838 // Struct to hold prefilter complex literals
// A placeholder member is emitted when there are no complex literals.
4839 ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4840 if(complex_literals->size() == 0)
4841 ret += "\tint no_variable;\n";
4843 for(cl=0;cl<complex_literals->size();cl++){
4844 literal_t *l = complex_literals->get_literal(cl);
4845 data_type *dtl = new data_type( l->get_type() );
4846 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4849 ret += "} prefilter_complex_lits_"+iface+";\n\n";
4852 // Generate the prefilter initialization code
4853 ret += "void init_lfta_prefilter_"+iface+"(){\n";
4855 // First initialize complex literals, if any.
4856 ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4857 for(cl=0;cl<complex_literals->size();cl++){
4858 literal_t *l = complex_literals->get_literal(cl);
4859 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4860 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
// Register the common-predicate handles. The first predicate of a class
// creates the handle (NULL is passed and the result is stored); predicates of
// a class already in epred_seen pass the existing handle instead.
4864 set<int> epred_seen;
4865 for(p=0;p<pr_list.size();++p){
4866 int q = pred_class[p];
4867 //printf("\tq=%d\n",q);
4868 if(epred_seen.count(q)>0){
4869 ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4870 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4871 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4872 for(o=0;o<op_list.size();++o){
4874 ret += generate_se_code(op_list[o],Schema)+", ";
4877 ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4878 epred_seen.insert(q);
4880 ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4881 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4882 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4883 for(o=0;o<op_list.size();++o){
4885 ret += generate_se_code(op_list[o],Schema)+", ";
4888 ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4889 epred_seen.insert(q);
4896 // Start on main body code generation
4897 ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4900 ///--------------------------------------------------------------
4901 /// Generate and store the prefilter body,
4902 /// reuse it for the snap length calculator
4903 ///-------------------------------------------------------------
4906 body += "\tstruct packet *p = (struct packet *)pkt;\n";
4910 // Gather the colids to store unpacked variables.
4911 for(p=0;p<pred_list.size();++p){
4912 gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4915 // make the col_ids refer to the base tables, and
4916 // grab the col_ids with at least one unpacking function.
4917 for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4918 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4920 tmp_col_id.field = (*csi).field;
4921 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4922 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4923 cid_set.insert(tmp_col_id);
4924 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4925 if(fe->get_unpack_fcns().size()>0)
4926 upref_cids.insert(tmp_col_id);
4931 // Find the set of unpacking programs needed for the
4932 // prefilter fields.
// pref_ufcns is the distinct set of functions chosen for those fields.
4933 map<col_id, string,lt_col_id> ucol_fcn_map;
4934 find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4935 set<string> pref_ufcns;
4936 map<col_id, string,lt_col_id>::iterator mcis;
4937 for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4938 pref_ufcns.insert((*mcis).second);
4943 // Variables for unpacking attributes.
// Each referenced column gets a value variable unpack_var_<field>_<tblref>
// and a status variable ret_<field>_<tblref>.
4944 body += "/*\t\tVariables for unpacking attributes\t*/\n";
4945 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4946 int schref = (*csi).schema_ref;
4947 int tblref = (*csi).tblvar_ref;
4948 string field = (*csi).field;
4949 data_type dt(Schema->get_type_name(schref,field));
4950 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4951 field.c_str(), tblref);
4953 sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4956 // Variables for unpacking temporal attributes.
// Only temporal columns that are not already in cid_set are considered here.
4957 body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4958 for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4959 if (cid_set.count(*csi) == 0) {
4960 int schref = (*csi).schema_ref;
4961 int tblref = (*csi).tblvar_ref;
4962 string field = (*csi).field;
4963 data_type dt(Schema->get_type_name(schref,field));
4964 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4965 field.c_str(), tblref);
4972 // Variables for combinable predicate evaluation
4973 body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4974 for(q=0;q<equiv_list.size();++q){
4975 for(p=0;p<=(num_equiv[q]/32);++p){
4976 body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4981 // Variables that are always needed
4982 body += "/*\t\tVariables which are always needed\t*/\n";
4983 body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4984 body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4986 // Call the unpacking functions for the prefilter fields
4987 if(pref_ufcns.size() > 0)
4988 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4989 set<string>::iterator ssi;
4990 for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4991 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4995 // Unpack the accessed attributes
// ret_<field>_<tblref> is set to 1 when the access function returns 0.
4996 body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4997 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4998 int tblref = (*csi).tblvar_ref;
4999 int schref = (*csi).schema_ref;
5000 string field = (*csi).field;
5001 sprintf(tmpstr,"\tret_%s_%d = (%s(p, &unpack_var_%s_%d) == 0);\n",
5002 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5006 // next unpack the temporal attributes and ignore the errors
5007 // We are assuming here that failed unpack of temporal attributes
5008 // is not going to overwrite the last stored value
5009 // Failed unpacks are ignored
5010 for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
5011 int tblref = (*csi).tblvar_ref;
5012 int schref = (*csi).schema_ref;
5013 string field = (*csi).field;
5014 sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
5015 Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
5019 // Evaluate the combinable predicates
5020 if(equiv_list.size()>0)
5021 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
5022 for(q=0;q<equiv_list.size();++q){
5023 for(p=0;p<=(num_equiv[q]/32);++p){
5025 // Only call the common eval fcn if all ref'd fields present.
5026 col_id_set pred_cids;
5027 col_id_set::iterator cpi;
5028 gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
5029 if(pred_cids.size()>0){
5031 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5032 if(cpi != pred_cids.begin())
5034 string field = (*cpi).field;
5035 int tblref = (*cpi).tblvar_ref;
5036 body += "ret_"+field+"_"+int_to_string(tblref);
// The eval call receives the class handle followed by the operands of the
// class representative.
5041 body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
5042 vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
5043 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
5044 for(o=0;o<op_list.size();++o){
5046 body += ","+generate_se_code(op_list[o],Schema);
// Evaluate each prefilter predicate. Its bit in retval is set when the
// predicate holds, and bitpos moves to the next bit after every predicate.
5054 for(p=0;p<pred_list.size();++p){
5055 col_id_set pred_cids;
5056 col_id_set::iterator cpi;
5057 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
5058 if(pred_cids.size()>0){
5060 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
5061 if(cpi != pred_cids.begin())
5063 string field = (*cpi).field;
5064 int tblref = (*cpi).tblvar_ref;
5065 body += "ret_"+field+"_"+int_to_string(tblref);
5069 body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
5070 body+="\tbitpos = bitpos << 1;\n";
5073 // ---------------------------------------------------------------
5074 // Finished with the body of the prefilter
5075 // --------------------------------------------------------------
5079 // Collect fields referenced by an lfta but not
5080 // already unpacked for the prefilter.
5082 //printf("upref_cids is:\n");
5083 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
5084 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5085 //printf("pref_ufcns is:\n");
5086 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
5087 //printf("\t%s\n",(*ssi).c_str());
5090 for(l=0;l<lfta_cols.size();++l){
5091 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
5092 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
5094 tmp_col_id.field = (*csi).field;
5095 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
5096 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
5097 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
5098 set<string> fld_ufcns = fe->get_unpack_fcns();
5099 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(), upref_cids.count(tmp_col_id));
5100 if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
5101 // Ensure that this field not already unpacked.
5103 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
5104 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
5105 if(pref_ufcns.count((*ssi))){
5106 //printf("Field already unpacked.\n");
5111 //printf("\tadding to unpack list\n");
5112 upall_cids.insert(tmp_col_id);
5118 //printf("upall_cids is:\n");
5119 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
5120 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
5122 // Get the set of unpacking programs for these.
5123 map<col_id, string,lt_col_id> uall_fcn_map;
5124 find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
5125 set<string> pall_ufcns;
5126 for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
5127 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
5128 pall_ufcns.insert((*mcis).second);
5131 // Iterate through the remaining set of unpacking function
5132 if(pall_ufcns.size() > 0)
5133 ret += "//\t\tcall all remaining field unpacking functions.\n";
5134 for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
5135 // gather the set of columns unpacked by this ufcn
5136 col_id_set fcol_set;
5137 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
5138 if(uall_fcn_map[(*csi)] == (*ssi))
5139 fcol_set.insert((*csi));
5142 // gather the set of lftas which access a field unpacked by the fcn
5143 set<long long int> clfta;
5144 for(l=0;l<lfta_cols.size();l++){
5145 for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
5146 if(lfta_cols[l].count((*csi)) > 0)
5149 if(csi != fcol_set.end())
5150 clfta.insert(lfta_sigs[l]);
5153 // generate the unpacking code
// The generated condition tests retval against each collected lfta
// signature before the unpack function is called.
5155 set<long long int>::iterator sii;
5156 for(sii=clfta.begin();sii!=clfta.end();++sii){
5157 if(sii!=clfta.begin())
5159 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
5162 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
5166 ret += "\treturn(retval);\n\n";
5170 // --------------------------------------------------------
5171 // reuse prefilter body for snaplen calculator
5173 // This is dummy code, so I'm commenting it out.
5176 ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
5181 vector<int> s_snaps = lfta_snap_lens;
5182 sort(s_snaps.begin(), s_snaps.end());
5184 if(s_snaps[0] == -1){
5185 set<unsigned long long int> sigset;
5186 for(i=0;i<lfta_snap_lens.size();++i){
5187 if(lfta_snap_lens[i] == -1){
5188 sigset.insert(lfta_sigs[i]);
5192 set<unsigned long long int>::iterator sulli;
5193 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5194 if(sulli!=sigset.begin())
5196 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5199 ret += ") return -1;\n";
5202 int nextpos = lfta_snap_lens.size()-1;
5203 int nextval = lfta_snap_lens[nextpos];
5204 while(nextval >= 0){
5205 set<unsigned long long int> sigset;
5206 for(i=0;i<lfta_snap_lens.size();++i){
5207 if(lfta_snap_lens[i] == nextval){
5208 sigset.insert(lfta_sigs[i]);
5212 set<unsigned long long int>::iterator sulli;
5213 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
5214 if(sulli!=sigset.begin())
5216 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
5219 ret += ") return "+int_to_string(nextval)+";\n";
5221 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
5223 nextval = lfta_snap_lens[nextpos];
5227 ret += "\treturn 0;\n";
5238 // Generate the struct which will store the values of
5239 // temporal attributes unpacked by the prefilter
5240 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
5242 col_id_set::iterator csi;
5244 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
5246 string ret="struct prefilter_unpacked_temp_vars {\n";
5247 ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
5251 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
5252 int schref = (*csi).schema_ref;
5253 int tblref = (*csi).tblvar_ref;
5254 string field = (*csi).field;
5255 data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
5256 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
5257 field.c_str(), tblref);
5260 if (init_code != "")
5262 if (dt.is_increasing())
5263 init_code += dt.get_min_literal();
5265 init_code += dt.get_max_literal();
5270 ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";