1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
// Declared extern, so defined in another translation unit; presumably the
// default hash-table size for generated LFTAs (name-based — NOTE(review): confirm).
32 extern int DEFAULT_LFTA_HASH_TABLE_SIZE;
34 // default value for correlation between the interface card and
// NOTE(review): the sentence above is cut off in this listing (source line 35
// is missing); confirm what the interface card is correlated with.
36 #define DEFAULT_TIME_CORR 16
// hash_nums: NRANDS strings, declared extern here. The quoted entries below
// appear to be its initializer: each is a 64-bit unsigned value spelled as a C
// literal with an `ull` suffix, presumably so it can be pasted verbatim into
// generated C code (NOTE(review): confirm the consumer).
// NOTE(review): the `string hash_nums[NRANDS] = {` opener and the closing `};`
// are not visible in this listing; 100 entries are shown — confirm NRANDS == 100.
41 extern string hash_nums[NRANDS];
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
98 // ----------------------------------------------
99 // Data extracted from the query plan node
100 // for use by code generation.
// These file-scope variables are shared by the generator functions in this
// file (e.g. generate_se_code reads gb_tbl, generate_predicate_code reads
// pred_class/pred_pos, nearly every generator writes tmpstr), so those
// generators are not reentrant.
102 static cplx_lit_table *complex_literals; //Table of literals with constructors.
103 static vector<handle_param_tbl_entry *> param_handle_table;
104 static param_table *param_tbl; // Table of all referenced parameters.
// Select-list expressions of the query.
106 static vector<scalarexp_t *> sl_list;
// WHERE-clause elements (presumably one cnf_elem per CNF conjunct).
107 static vector<cnf_elem *> where;
109 static gb_table *gb_tbl; // Table of all group-by attributes.
110 static aggregate_table *aggr_tbl; // Table of all referenced aggregates.
112 static bool packed_return; // unpack using structure, not fcns
113 static nic_property *nicprop; // nic properties for this interface.
114 static int global_id;
117 // The partial_fcns vector can now refer to
118 // partial functions, or expensive functions
119 // which can be cached (if there are multiple refs). Two parallel
120 // vectors, fcn_ref_cnt and is_partial_fcn, distinguish the cases.
121 static vector<scalarexp_t *> partial_fcns;
122 static vector<int> fcn_ref_cnt;
123 static vector<bool> is_partial_fcn;
// Half-open index ranges [start, end) into partial_fcns; generate_tuple_from_aggr
// iterates p from sl_fcns_start while p < sl_fcns_end. By naming, the four pairs
// presumably cover the select list, where clause, group-by and aggregates.
// NOTE(review): unlike their neighbours these are not `static` — check whether
// another translation unit uses them before narrowing their linkage.
124 int sl_fcns_start = 0, sl_fcns_end = 0;
125 int wh_fcns_start = 0, wh_fcns_end = 0;
126 int gb_fcns_start = 0, gb_fcns_end = 0;
127 int ag_fcns_start = 0, ag_fcns_end = 0;
130 // These vectors are for combinable predicates.
// Both are indexed by a predicate's combinable ref. generate_predicate_code
// treats a negative pred_class entry as "not combined" and splits pred_pos
// into a 32-bit word index (pos/32) and a bit within that word (pos%32).
131 static vector<int> pred_class; // identifies the group
132 static vector<int> pred_pos; // position in the group.
// Shared scratch buffer for the sprintf calls throughout this file.
// NOTE(review): the writes are unbounded (sprintf, not snprintf), and several
// format caller-supplied text (field names, parameter names, whole generated
// expressions), so long inputs can overflow these 1000 bytes.
136 static char tmpstr[1000];
138 //////////////////////////////////////////////////////////////////////
139 /// Various utilities
// Returns the C identifier of the FTA state struct for node_name
// (generate_fta_struct emits "struct <this name>{"), starting from
// normalize_name(node_name).
// NOTE(review): the remainder of this body (source lines 143-151) is not
// visible in this listing.
141 string generate_fta_name(string node_name){
142 string ret = normalize_name(node_name);
// Returns the name of the per-group aggregate struct for node_name:
// normalize_name(node_name) with "_aggr_struct" appended.
// NOTE(review): source lines 154-156 (between the two statements) and the end
// of the body are not visible here and may further adjust ret.
152 string generate_aggr_struct_name(string node_name){
153 string ret = normalize_name(node_name);
157 ret += "_aggr_struct";
// Returns the struct name used for filter-join hash-table entries of node_name
// (referenced by generate_fj_struct and generate_fta_struct), starting from
// normalize_name(node_name).
// NOTE(review): the rest of this body (source lines 164-171) is not visible here.
162 string generate_fj_struct_name(string node_name){
163 string ret = normalize_name(node_name);
// Generates C code that unpacks field `field` of table reference `tblref`
// into the local unpack_var_<field>_<tblref>.
// - Function path: emits "retval = <unpack fcn>(p, &unpack_var_<field>_<tblref>);"
//   and, unless the field has the "required" modifier, "if(retval) goto <end_goto>;".
// - Struct path: copies the value out of <node_name>_input_struct_var.
// NOTE(review): the branch selecting between the two paths (presumably
// packed_return), the declaration of ret and the final return are not
// visible in this listing.
172 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
175 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
176 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
178 if(!schema->get_modifier_list(schref,field)->contains_key("required"))
179 ret += "\tif(retval) goto "+end_goto+";\n";
182 // TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
183 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
184 if(dt.is_buffer_type()){
// NOTE(review): this test looks inverted. The body manipulates the .length and
// .data members of a string buffer, yet the error branch reports "buffer type
// not string type". The emitted int(<...>.data) is also a C++ functional cast
// that truncates a 64-bit pointer. Confirm before relying on this path.
185 if(dt.get_type() != v_str_t){
186 ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
187 ret += "\t\tgoto "+end_goto+";\n";
188 ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
189 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
// Terminate the emitted assignment with a plain ";". A trailing "+" would
// prefix the next generated statement and break the generated code.
190 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
192 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
196 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
197 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
// Emits the C definition of the per-group aggregate struct for node_name:
// - one gb_var<g> member per group-by attribute, typed by the group-by table;
// - one aggr_var<a> member per aggregate (the data type for built-ins, the
//   storage type otherwise);
// - a `next` pointer to the same struct type.
// NOTE: the gb_tbl/aggr_tbl parameters shadow the file-scope statics of the
// same names, so this function uses only what the caller passes.
// NOTE(review): several source lines (loop counters, the `else` before line
// 219, closing lines and the return) are not visible in this listing.
203 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
204 string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
207 for(g=0;g<gb_tbl->size();g++){
208 sprintf(tmpstr,"gb_var%d",g);
209 ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
213 for(a=0;a<aggr_tbl->size();a++){
215 sprintf(tmpstr,"aggr_var%d",a);
216 if(aggr_tbl->is_builtin(a))
217 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
219 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
223 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
// When the filter join does not use a Bloom filter, emits the C struct for its
// hash-table entries:
// - one key_var<k> member per hash-equality predicate, typed from that
//   predicate's left operand;
// - a `long long int ts` member.
// NOTE(review): the Bloom-filter case, the closing of the struct and the
// return are not visible in this listing.
232 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
235 if(fs->use_bloom == false){ // uses hash table instead
236 ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
238 for(k=0;k<fs->hash_eq.size();++k){
239 sprintf(tmpstr,"key_var%d",k);
240 ret += "\t"+fs->hash_eq[k]->pr->get_left_se()->get_data_type()->make_cvar(tmpstr)+";\n";
242 ret += "\tlong long int ts;\n";
// Emits the C definition of the FTA state struct for node_name. The struct
// always starts with `struct FTA f`. Depending on the query kind (conditions
// not fully visible here), it then adds:
// - aggregation hash-table bookkeeping;
// - per-temporal-group-by flush state;
// - filter-join state (Bloom filter or limited hash table);
// - query parameters, complex literals and pass-by-handle parameters;
// - last values of temporal select-list attributes;
// - trace id and runtime statistics counters.
// Temporal group-by attributes are also reported on stderr.
// NOTE(review): several source lines (the tail of the parameter list, loop
// counters, `ret += tmpstr;` statements and closing braces) are not visible
// in this listing.
252 string generate_fta_struct(string node_name, gb_table *gb_tbl,
253 aggregate_table *aggr_tbl, param_table *param_tbl,
254 cplx_lit_table *complex_literals,
// Reference parameter. The listing had this mis-encoded as a pilcrow followed
// by "m_handle_table" (the "&para" prefix had been decoded as an HTML entity).
255 vector<handle_param_tbl_entry *> &param_handle_table,
256 bool is_aggr_query, bool is_fj, bool uses_bloom,
259 string ret = "struct " + generate_fta_name(node_name) + "{\n";
260 ret += "\tstruct FTA f;\n";
262 //-------------------------------------------------------------
263 // Aggregate-specific fields
267 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
269 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
270 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
271 // ret+="\tint bitmap_size;\n";
272 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
273 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
274 ret += "\tint max_windows; // max number of open windows.\n";
275 ret += "\tunsigned int generation; // initially zero, increment on\n";
276 ret += "\t // every hash table flush - whether regular or induced.\n";
277 ret += "\t // Old groups are identified by a generation mismatch.\n";
278 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
279 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
// Temporal (non-buffer) group-by attributes get last / flush-start /
// last-flushed slots; if none exist, a "no temporal flush" warning is printed.
284 bool uses_temporal_flush = false;
285 for(g=0;g<gb_tbl->size();g++){
286 data_type *dt = gb_tbl->get_data_type(g);
287 if(dt->is_temporal()){
289 fprintf(stderr,"group by attribute %s is temporal, ",
290 gb_tbl->get_name(g).c_str());
291 if(dt->is_increasing()){
292 fprintf(stderr,"increasing.\n");
294 fprintf(stderr,"decreasing.\n");
297 data_type *gdt = gb_tbl->get_data_type(g);
298 if(gdt->is_buffer_type()){
299 fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
301 sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
303 sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
305 sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
307 uses_temporal_flush = true;
313 if(! uses_temporal_flush){
314 fprintf(stderr,"Warning: no temporal flush.\n");
318 // ---------------------------------------------------------
319 // Filter-join specific fields
324 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
325 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
326 "\tint first_exec;\n"
327 "\tlong long int last_bin;\n"
328 "\tint last_bloom_pos;\n"
331 }else{ // limited hash table
333 " struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
340 //--------------------------------------------------------
343 // Create places to hold the parameters.
345 vector<string> param_vec = param_tbl->get_param_names();
346 for(p=0;p<param_vec.size();p++){
347 data_type *dt = param_tbl->get_data_type(param_vec[p]);
348 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
349 param_vec[p].c_str());
351 if(param_tbl->handle_access(param_vec[p])){
352 ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
356 // Create places to hold complex literals.
358 for(cl=0;cl<complex_literals->size();cl++){
359 literal_t *l = complex_literals->get_literal(cl);
// NOTE(review): dtl is heap-allocated and no matching delete is visible in this
// listing; if nothing frees it, a stack data_type (as at source line 395) avoids
// the per-literal leak.
360 data_type *dtl = new data_type( l->get_type() );
361 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
365 // Create places to hold the pass-by-handle parameters.
366 for(p=0;p<param_handle_table.size();++p){
367 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
371 // Create places to hold the last values of temporal
372 // attributes referenced in select clause
373 // we also need to store values of the temporal attributes
374 // of last flushed tuple in aggr queries
375 // to make sure we generate the correct temporal tuple
376 // in the presence of slow flushes
379 col_id_set temp_cids; // col ids of temp attributes in select clause
382 col_id_set::iterator csi;
384 for(s=0;s<sl_list.size();s++){
385 data_type *sdt = sl_list[s]->get_data_type();
386 if (sdt->is_temporal()) {
387 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
391 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
392 int tblref = (*csi).tblvar_ref;
393 int schref = (*csi).schema_ref;
394 string field = (*csi).field;
395 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
396 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
400 ret += "\tgs_uint64_t trace_id;\n\n";
402 // Fields to store the runtime stats
404 ret += "\tgs_uint32_t in_tuple_cnt;\n";
405 ret += "\tgs_uint32_t out_tuple_cnt;\n";
406 ret += "\tgs_uint32_t out_tuple_sz;\n";
407 ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
408 ret += "\tgs_uint64_t cycle_cnt;\n";
409 ret += "\tgs_uint32_t collision_cnt;\n";
410 ret += "\tgs_uint32_t eviction_cnt;\n";
411 ret += "\tgs_float_t sampling_rate;\n";
420 //------------------------------------------------------------
421 // Set colref tblvars to 0..
422 // (special processing for join-like operators in an lfta).
// Recursively walks se and sets the table-variable reference of each column
// reference to 0. The walk covers:
// - unary and binary operands;
// - group-by references, followed into their defining expression in gtbl,
//   with an internal error reported when gtbl is NULL;
// - function operands.
// Unknown operator types are reported on stderr.
// NOTE(review): case labels and the NULL/is_gb guards are not visible in this listing.
424 void reset_se_col_ids_tblvars(scalarexp_t *se, gb_table *gtbl){
425 vector<scalarexp_t *> operands;
431 switch(se->get_operator_type()){
437 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
440 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
441 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
445 se->get_colref()->set_tablevar_ref(0);
// The message names this function. It previously named gather_se_col_ids,
// which sent readers of the error to the wrong place.
448 fprintf(stderr,"INTERNAL ERROR: gbvar ref in reset_se_col_ids_tblvars, but gtbl is NULL.\n");
451 reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
457 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
460 operands = se->get_operands();
461 for(o=0;o<operands.size();o++){
462 reset_se_col_ids_tblvars(operands[o], gtbl);
466 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
467 se->get_lineno(), se->get_charno(),se->get_operator_type());
473 // reset column tblvars accessed in this pr.
// Applies reset_se_col_ids_tblvars to every scalar expression reachable from
// pr: its left/right operands, nested sub-predicates (recursively), and the
// op_list operands (presumably of a predicate-function call). Unknown
// predicate operator types are reported on stderr.
// NOTE(review): case labels and closing lines are not visible in this listing.
475 void reset_pr_col_ids_tblvars(predicate_t *pr, gb_table *gtbl){
476 vector<scalarexp_t *> op_list;
479 switch(pr->get_operator_type()){
481 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
484 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
485 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
488 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
491 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
492 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
495 op_list = pr->get_op_list();
496 for(o=0;o<op_list.size();++o){
497 reset_se_col_ids_tblvars(op_list[o],gtbl) ;
501 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
502 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
509 // Generate code that makes reference
510 // to the tuple, and not to any aggregates.
// Returns a C expression for se:
// - handle params and complex literals become t->handle_param_<n> and
//   t->complex_literal_<n>; plain literals are inlined via to_C_code;
// - parameters use their name;
// - column refs read unpack_var_<field>_<tblvar>, and group-by refs expand to
//   their defining expression (via the file-scope gb_tbl);
// - partial functions read partial_fcn_result_<n>;
// - operators whose types define a complex operator are emitted as that
//   function's call.
// Aggregate/UDAF references and unknown operators log to stderr and yield an
// "ERROR in generate_se_code" marker string.
// NOTE(review): case labels and several append lines are not visible here.
511 static string generate_se_code(scalarexp_t *se,table_list *schema){
513 data_type *ldt, *rdt;
515 vector<scalarexp_t *> operands;
518 switch(se->get_operator_type()){
520 if(se->is_handle_ref()){
521 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
525 if(se->get_literal()->is_cpx_lit()){
526 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
530 return(se->get_literal()->to_C_code("")); // not complex, no constructor
532 if(se->is_handle_ref()){
533 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
538 ret += se->get_param_name();
541 ldt = se->get_left_se()->get_data_type();
542 if(ldt->complex_operator(se->get_op()) ){
543 ret += ldt->get_complex_operator(se->get_op());
545 ret += generate_se_code(se->get_left_se(),schema);
550 ret += generate_se_code(se->get_left_se(),schema);
555 ldt = se->get_left_se()->get_data_type();
556 rdt = se->get_right_se()->get_data_type();
558 if(ldt->complex_operator(rdt, se->get_op()) ){
559 ret += ldt->get_complex_operator(rdt, se->get_op());
561 ret += generate_se_code(se->get_left_se(),schema);
563 ret += generate_se_code(se->get_right_se(),schema);
567 ret += generate_se_code(se->get_left_se(),schema);
569 ret += generate_se_code(se->get_right_se(),schema);
// Reads the file-scope gb_tbl, not a parameter.
574 if(se->is_gb()){ // OK to ref gb attrs, but they're not yet unpacked ...
575 // so return the defining code.
576 ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
579 sprintf(tmpstr,"unpack_var_%s_%d",
580 se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
585 // Should not be ref'ing any aggr here.
586 if(se->get_aggr_ref() >= 0){
587 fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
588 return("ERROR in generate_se_code");
591 if(se->is_partial()){
592 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
596 operands = se->get_operands();
597 for(o=0;o<operands.size();o++){
599 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
601 ret += generate_se_code(operands[o], schema);
607 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
608 se->get_lineno(), se->get_charno(),se->get_operator_type());
609 return("ERROR in generate_se_code");
613 // generate code that refers only to aggregate data and constants.
// Like generate_se_code, with these differences:
// - group-by refs become <var>gb_var<n>;
// - aggregate refs become <var>aggr_var<n>;
// - UDAF function refs become udaf_ret<n>;
// - a column reference that is not a group-by attribute is reported as an error.
// `var` is the prefix of the aggregate struct instance, e.g.
// "t->aggr_table[<idx>]." as passed by generate_tuple_from_aggr.
// NOTE(review): case labels and several append lines are not visible here.
614 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
617 data_type *ldt, *rdt;
619 vector<scalarexp_t *> operands;
622 switch(se->get_operator_type()){
624 if(se->is_handle_ref()){
625 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
629 if(se->get_literal()->is_cpx_lit()){
630 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
634 return(se->get_literal()->to_C_code("")); // not complex no constructor
636 if(se->is_handle_ref()){
637 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
642 ret += se->get_param_name();
645 ldt = se->get_left_se()->get_data_type();
646 if(ldt->complex_operator(se->get_op()) ){
647 ret += ldt->get_complex_operator(se->get_op());
649 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
654 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
659 ldt = se->get_left_se()->get_data_type();
660 rdt = se->get_right_se()->get_data_type();
662 if(ldt->complex_operator(rdt, se->get_op()) ){
663 ret += ldt->get_complex_operator(rdt, se->get_op());
665 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
667 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
671 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
673 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
678 if(se->is_gb()){ // OK to ref gb attrs, but they're not yet
679 // unpacked ... so return the defining code.
680 sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
684 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
685 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
686 se->get_lineno(), se->get_charno());
692 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
697 if(se->get_aggr_ref() >= 0){
698 sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
703 if(se->is_partial()){
704 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
708 operands = se->get_operands();
709 for(o=0;o<operands.size();o++){
711 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
713 ret += generate_se_code_fm_aggr(operands[o], var, schema);
// NOTE(review): the returned marker names generate_se_code even though the
// stderr message names generate_se_code_fm_aggr.
719 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
720 se->get_lineno(), se->get_charno(),se->get_operator_type());
721 return("ERROR in generate_se_code");
// Emits a "\tretval = <fcn>( &partial_fcn_result_<pfn_id>..." call for partial
// function se, with operands generated from aggregate storage via
// generate_se_code_fm_aggr(operand, var, schema).
// Non-function SEs and UDAF references (aggr_ref >= 0) are rejected: the
// function prints an internal-error message and returns an error marker.
// NOTE(review): the separator/closing appends are not visible in this listing.
727 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
730 vector<scalarexp_t *> operands;
733 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
734 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
735 se->get_lineno(), se->get_charno());
736 return("ERROR in generate_se_code");
739 ret = "\tretval = " + se->get_op() + "( ";
740 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
743 operands = se->get_operands();
744 for(o=0;o<operands.size();o++){
746 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
748 ret += generate_se_code_fm_aggr(operands[o], var, schema);
// Builds the call expression "<fcn>( <operands>..." for a cacheable function
// se, with operands taken from the current tuple via generate_se_code.
// Non-function SEs and UDAF references (aggr_ref >= 0) are rejected: the
// function prints an internal-error message and returns an error marker.
// NOTE(review): the separator/closing appends are not visible in this listing.
755 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
758 vector<scalarexp_t *> operands;
760 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
761 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
762 se->get_lineno(), se->get_charno());
763 return("ERROR in generate_se_code");
766 ret = se->get_op() + "( ";
768 operands = se->get_operands();
769 for(o=0;o<operands.size();o++){
771 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
773 ret += generate_se_code(operands[o], schema);
// Emits a "\tretval = <fcn>( &partial_fcn_result_<pfn_id>..." call for partial
// function se, with operands taken from the current tuple via generate_se_code.
// Non-function SEs and UDAF references (aggr_ref >= 0) are rejected: the
// function prints an internal-error message and returns an error marker.
// NOTE(review): the separator/closing appends are not visible in this listing.
782 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
785 vector<scalarexp_t *> operands;
788 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
789 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
790 se->get_lineno(), se->get_charno());
791 return("ERROR in generate_se_code");
// Terminated with ';'. A trailing ',' would silently chain this assignment to
// the sprintf below via the comma operator.
794 ret = "\tretval = " + se->get_op() + "( ";
795 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
798 operands = se->get_operands();
799 for(o=0;o<operands.size();o++){
801 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
803 ret += generate_se_code(operands[o], schema);
// Maps SQL comparison operators to C: "=" becomes "==" and "<>" becomes "!=".
// NOTE(review): the handling of other operators (source lines 817-819) is not
// visible; presumably they are returned unchanged.
814 static string generate_C_comparison_op(string op){
815 if(op == "=") return("==");
816 if(op == "<>") return("!=");
// Maps the SQL boolean operators AND / OR / NOT (upper, title or lower case)
// to C, presumably &&, || and ! (the return lines for those cases are not
// visible in this listing). Any other spelling logs an internal error and
// returns an "ERROR UNKNOWN BOOLEAN OPERATOR :<op>" marker.
820 static string generate_C_boolean_op(string op){
821 if( (op == "AND") || (op == "And") || (op == "and") ){
824 if( (op == "OR") || (op == "Or") || (op == "or") ){
827 if( (op == "NOT") || (op == "Not") || (op == "not") ){
831 fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
832 return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
// Returns a C boolean expression for predicate pr, evaluated against the
// current tuple. It handles:
// - IN lists, as an ||-chain of comparisons;
// - comparisons, using the type's comparison function when one exists, with
//   buffer operands passed by address;
// - NOT/AND/OR, recursively;
// - predicate-function calls. A call that belongs to a combined predicate
//   group is replaced by a bit test on pref_common_pred_val_<class>_<word>.
// Unknown operator types log to stderr and return an error marker.
// NOTE(review): case labels, declarations of i/o/cref/ppos and several append
// lines are not visible in this listing.
836 static string generate_predicate_code(predicate_t *pr,table_list *schema){
838 vector<literal_t *> litv;
840 data_type *ldt, *rdt;
841 vector<scalarexp_t *> op_list;
843 unsigned int bitmask;
845 switch(pr->get_operator_type()){
847 ldt = pr->get_left_se()->get_data_type();
850 litv = pr->get_lit_vec();
851 for(i=0;i<litv.size();i++){
852 if(i>0) ret += " || ";
855 if(ldt->complex_comparison(ldt) ){
856 ret += ldt->get_comparison_fcn(ldt) ;
858 if(ldt->is_buffer_type() ) ret += "&";
859 ret += generate_se_code(pr->get_left_se(), schema);
861 if(ldt->is_buffer_type() ) ret += "&";
862 if(litv[i]->is_cpx_lit()){
863 sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
866 ret += litv[i]->to_C_code("");
870 ret += generate_se_code(pr->get_left_se(), schema);
872 ret += litv[i]->to_C_code("");
881 ldt = pr->get_left_se()->get_data_type();
882 rdt = pr->get_right_se()->get_data_type();
885 if(ldt->complex_comparison(rdt) ){
886 ret += ldt->get_comparison_fcn(rdt);
888 if(ldt->is_buffer_type() ) ret += "&";
889 ret += generate_se_code(pr->get_left_se(),schema);
891 if(rdt->is_buffer_type() ) ret += "&";
892 ret += generate_se_code(pr->get_right_se(),schema);
894 ret += generate_C_comparison_op(pr->get_op());
897 ret += generate_se_code(pr->get_left_se(),schema);
898 ret += generate_C_comparison_op(pr->get_op());
899 ret += generate_se_code(pr->get_right_se(),schema);
905 ret += generate_C_boolean_op(pr->get_op());
906 ret += generate_predicate_code(pr->get_left_pr(),schema);
911 ret += generate_predicate_code(pr->get_left_pr(),schema);
912 ret += generate_C_boolean_op(pr->get_op());
913 ret += generate_predicate_code(pr->get_right_pr(),schema);
917 op_list = pr->get_op_list();
918 cref = pr->get_combinable_ref();
919 if(cref >= 0){ // predicate is a combinable pred reference
// Strict bound: cref is a valid index only when it is < size(). The previous
// ">=" test let cref == size() through and read pred_class/pred_pos out of range.
921 if(pred_class.size() > static_cast<size_t>(cref) && pred_class[cref] >= 0){
922 ppos = pred_pos[cref];
// Unsigned shift: shifting int 1 into bit 31 is undefined before C++20.
923 bitmask = 1u << (ppos % 32);
924 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
930 ret = pr->get_op() + "(";
931 if (pr->is_sampling_fcn) {
932 ret += "t->sampling_rate";
933 if (!op_list.empty())
936 for(o=0;o<op_list.size();++o){
938 if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
940 ret += generate_se_code(op_list[o],schema);
945 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
946 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
947 return("ERROR in generate_predicate_code");
// Builds an equality test between the C expressions lhs_op and rhs_op of type
// dt. When dt has a complex comparison, the test calls dt's comparison
// function, passing buffer-typed operands by address.
// NOTE(review): how the call result and the simple (non-complex) case are
// formed is not visible in this listing.
952 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
955 if(dt->complex_comparison(dt) ){
956 ret += dt->get_comparison_fcn(dt);
958 if(dt->is_buffer_type() ) ret += "&";
961 if(dt->is_buffer_type() ) ret += "&";
// Builds a comparison between the C expressions lhs_op and rhs_op of type dt.
// When dt has a complex comparison, the test calls dt's comparison function,
// passing buffer-typed operands by address.
// NOTE(review): which relational operator is applied, and the simple
// (non-complex) case, are not visible in this listing.
973 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
976 if(dt->complex_comparison(dt) ){
977 ret += dt->get_comparison_fcn(dt);
979 if(dt->is_buffer_type() ) ret += "&";
982 if(dt->is_buffer_type() ) ret += "&";
994 // Here I assume that only MIN and MAX aggregates can be computed
995 // over BUFFER data types.
// Returns C statements that fold the current tuple into aggregate aidx,
// whose storage is the lvalue `var`.
// - UDAFs: <op>_LFTA_AGGR_UPDATE_(&(var), <operands>...). The address-of is
//   omitted for fstring_t storage.
// - Built-ins:
//   - "++" (presumably COUNT);
//   - "+=" (presumably SUM);
//   - a "<" test (MIN) or ">" test (MAX) on aggr_tmp_<aidx>, replacing var when
//     it wins (buffer types via the type's buffer-replace function);
//   - AND_AGGR / OR_AGGR / XOR_AGGR via &=, |=, ^=.
// Unrecognized built-ins log to stderr and return an
// "ERROR: aggregate not recognized: <op>" marker.
// NOTE(review): the op tests for the first cases and some append lines are not
// visible in this listing.
997 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
998 string retval = "\t\t";
999 string op = atbl->get_op(aidx);
1002 if(! atbl->is_builtin(aidx)) {
1004 retval += op+"_LFTA_AGGR_UPDATE_(";
1005 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1006 retval+="("+var+")";
1007 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1008 for(o=0;o<opl.size();++o){
1010 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1012 retval += generate_se_code(opl[o], schema);
1019 // Built-in aggregate processing.
1021 data_type *dt = atbl->get_data_type(aidx);
1025 retval.append("++;\n");
1030 retval.append(" += ");
1031 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1032 retval.append(";\n");
// NOTE(review): the generated expression is sprintf'd into the 1000-byte
// tmpstr without a bound; a long expression overflows it (same at 1057).
1036 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1037 retval.append(tmpstr);
1038 if(dt->complex_comparison(dt)){
1039 if(dt->is_buffer_type())
1040 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1042 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1044 sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1046 retval.append(tmpstr);
1047 if(dt->is_buffer_type()){
1048 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1050 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1052 retval.append(tmpstr);
1057 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1058 retval.append(tmpstr);
1059 if(dt->complex_comparison(dt)){
1060 if(dt->is_buffer_type())
1061 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1063 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1065 sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1067 retval.append(tmpstr);
1068 if(dt->is_buffer_type()){
1069 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1071 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1073 retval.append(tmpstr);
1078 if(op == "AND_AGGR"){
1080 retval.append(" &= ");
1081 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1082 retval.append(";\n");
1085 if(op == "OR_AGGR"){
1087 retval.append(" |= ");
1088 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1089 retval.append(";\n");
1092 if(op == "XOR_AGGR"){
1094 retval.append(" ^= ");
1095 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1096 retval.append(";\n");
1099 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1100 return("ERROR: aggregate not recognized: "+op);
// Returns C statements that initialize aggregate aidx (storage `var`),
// presumably when a new group is created.
// - UDAFs: <op>_LFTA_AGGR_INIT_(&(var)) followed by an update call with the
//   operands. The address-of is omitted for fstring_t storage.
// - Built-ins:
//   - "var = 1" (presumably COUNT);
//   - SUM/MIN/MAX/AND_AGGR/OR_AGGR/XOR_AGGR: var is set from the aggregate's
//     expression. Buffer types go through aggr_tmp_<aidx> and the type's
//     assign-copy function.
// Unrecognized built-ins log to stderr and return an
// "ERROR: aggregate not recognized: <op>" marker.
// NOTE(review): the emitted UDAF init line is indented "\t\t" but the update
// line only "\t"; cosmetic in the generated code.
1106 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1108 string op = atbl->get_op(aidx);
1111 if(! atbl->is_builtin(aidx)) {
1113 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1114 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1115 retval+="("+var+"));\n";
1117 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1118 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1119 retval+="("+var+")";
1120 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1121 for(o=0;o<opl.size();++o){
1123 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1125 retval += generate_se_code(opl[o],schema);
1131 // Built-in aggregate processing.
1134 data_type *dt = atbl->get_data_type(aidx);
1137 retval = "\t\t"+var;
1138 retval.append(" = 1;\n");
1142 if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1143 op == "OR_AGGR" || op == "XOR_AGGR"){
1144 if(dt->is_buffer_type()){
1145 sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1146 retval.append(tmpstr);
1147 sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1148 retval.append(tmpstr);
1150 retval = "\t\t"+var;
1152 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1153 retval.append(";\n");
1158 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1159 return("ERROR: aggregate not recognized: "+op);
1163 ////////////////////////////////////////////////////////////
// Emits the per-LFTA preamble. It opens an `#ifndef LFTA_IN_NIC` section,
// defines the embedded schema string (named by generate_schema_string_name,
// initialized from schema_embed_str), and includes stdio.h, limits.h, float.h
// and rdtsc.h.
// NOTE(review): the rest of the body, including the matching #endif and the
// return, is not visible in this listing.
1166 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1167 std::string &node_name, std::string &schema_embed_str){
1168 // Include these only once, not once per lfta
1169 // string ret = "#include \"rts.h\"\n";
1170 // ret += "#include \"fta.h\"\n\n");
1172 string ret = "#ifndef LFTA_IN_NIC\n";
1173 ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1174 ret += "#include<stdio.h>\n";
1175 ret += "#include <limits.h>\n";
1176 ret += "#include <float.h>\n";
1177 ret += "#include \"rdtsc.h\"\n";
// generate_tuple_from_aggr: emits C code that turns the aggregate-table
// slot t->aggr_table[idx] into an output tuple, posts it, and then releases
// the slot's resources.
// Emitted steps, in order:
//   1. lfta_bailout = sum of <op>_LFTA_AGGR_BAILOUT_ over the aggregates with
//      has_bailout(); the tuple construction is wrapped in `if(! lfta_bailout)`.
//   2. UDAF results are extracted via <op>_LFTA_AGGR_OUTPUT_ into udaf_ret<a>.
//   3. Partial functions in the select list are unpacked from the slot; any
//      failure sets unpack_failed, and the tuple is only built if it stays 0.
//   4. BUFFER-typed select expressions are evaluated into selvar_<s> so that
//      their sizes can be added to tuple_size before allocate_tuple().
//   5. If allocation succeeds, the fields are assigned, the tuple is marked
//      REGULAR_TUPLE and posted (out_tuple_cnt/out_tuple_sz under LFTA_STATS).
//   6. BUFFER-typed group-by/aggregate storage is destroyed, UDAF state is
//      released via <op>_LFTA_AGGR_DESTROY_, and t->n_aggrs is decremented.
//   node_name - names the output tuple struct.
//   schema    - passed to the SE / partial-function code generators.
//   idx       - C expression (as text) indexing the aggregate-table slot.
// "&" is prepended to UDAF storage arguments unless the storage type is
// fstring_t (presumably fstring storage is already passed by address --
// TODO confirm).
// Uses generator state not declared in this block (aggr_tbl, gb_tbl,
// sl_list, partial_fcns, is_partial_fcn, sl_fcns_start/end, tmpstr and the
// loop indices). The generated code presumably relies on the enclosing
// generated function to declare t, f, tuple, tuple_size, tuple_pos,
// lfta_bailout, retval and unpack_failed.
1186 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1188 // need to create and output the tuple.
1189 string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1190 // Check for any UDAFs with LFTA_BAILOUT
1191 ret += "\tlfta_bailout = 0;\n";
1192 for(a=0;a<aggr_tbl->size();a++){
1193 if(aggr_tbl->has_bailout(a)){
1194 ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
1195 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1196 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
// The tuple construction below is emitted inside `if(! lfta_bailout)`.
1199 ret += "\tif(! lfta_bailout){\n";
1201 // First, compute the size of the tuple.
1203 // Unpack UDAF return values
1204 for(a=0;a<aggr_tbl->size();a++){
1205 if(! aggr_tbl->is_builtin(a)){
1206 ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1207 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1208 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1214 // Unpack partial fcns ref'd by the select clause.
1215 if(sl_fcns_start != sl_fcns_end){
1216 ret += "\t\tunpack_failed = 0;\n";
1217 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1218 if(is_partial_fcn[p]){
1219 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1220 "t->aggr_table["+idx+"].",schema);
1221 ret += "\t\tif(retval) unpack_failed = 1;\n";
1224 // BEGIN don't allocate tuple if
1225 ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1228 // Unpack any BUFFER type selections into temporaries
1229 // so that I can compute their size and not have
1230 // to recompute their value during tuple packing.
1231 // I can use regular assignment here because
1232 // these temporaries are non-persistent.
1234 for(s=0;s<sl_list.size();s++){
1235 data_type *sdt = sl_list[s]->get_data_type();
1236 if(sdt->is_buffer_type()){
1237 sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1239 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1245 // The size of the tuple is the size of the tuple struct plus the
1246 // size of the buffers to be copied in.
1248 ret += "\t\t\ttuple_size = sizeof( struct ";
1249 ret += generate_tuple_name(node_name);
1251 for(s=0;s<sl_list.size();s++){
1252 data_type *sdt = sl_list[s]->get_data_type();
1253 if(sdt->is_buffer_type()){
1254 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
// allocate_tuple() may return NULL; the field assignments and the post are
// emitted only for a non-NULL tuple.
1261 ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
1262 ret += "\t\t\tif( tuple != NULL){\n";
1265 // Test passed, make assignments to the tuple.
1267 ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1268 ret += generate_tuple_name(node_name) ;
1271 // Mark tuple as REGULAR_TUPLE
1272 ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
1274 for(s=0;s<sl_list.size();s++){
1275 data_type *sdt = sl_list[s]->get_data_type();
1276 if(sdt->is_buffer_type()){
1277 sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1279 sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1282 sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1284 // if(sdt->needs_hn_translation())
1285 // ret += sdt->hton_translation() +"( ";
1286 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1287 // if(sdt->needs_hn_translation())
1294 ret += "\t\t\t\tpost_tuple(tuple);\n";
1295 ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1296 ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1297 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1298 ret += "\t\t\t\t#endif\n\n";
1301 if(sl_fcns_start != sl_fcns_end) // END don't allocate tuple if
1302 ret += "\t\t}\n"; // unpack failed.
1305 // Need to release memory held by BUFFER types.
1308 for(g=0;g<gb_tbl->size();g++){
1309 data_type *gdt = gb_tbl->get_data_type(g);
1310 if(gdt->is_buffer_type()){
1311 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
1315 for(a=0;a<aggr_tbl->size();a++){
1316 if(aggr_tbl->is_builtin(a)){
1317 data_type *adt = aggr_tbl->get_data_type(a);
1318 if(adt->is_buffer_type()){
1319 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
// NOTE(review): presumably the non-builtin (UDAF) case; the `else` line is
// absent from this listing.
1323 ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1324 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1325 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1329 ret += "\t\tt->n_aggrs--;\n";
// generate_gb_match_test: returns the text of a C condition that is true
// when slot `idx` is filled and new in t->aggr_table_bitmap and, when there
// are group-by attributes, each gb_attr_<g> equals
// t->aggr_table[idx].gb_var<g> (tested with generate_equality_test).
//   idx - C expression (as text) indexing the aggregate table.
// NOTE(review): the other generators in this listing track slots with
// t->aggr_table_hashmap / SLOT_FILLED rather than aggr_table_bitmap with
// IS_FILLED / IS_NEW; confirm this helper is still used and consistent.
// NOTE(review): how the per-attribute tests are joined and how the condition
// is closed is not visible in this listing.
1335 string generate_gb_match_test(string idx){
1337 string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") && IS_NEW(t->aggr_table_bitmap,"+idx+")";
1338 if(gb_tbl->size()>0){
1339 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1342 // Next, scan list for a match on the group-by attributes.
1343 string rhs_op, lhs_op;
1344 for(g=0;g<gb_tbl->size();g++){
1347 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1348 sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1349 ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
// generate_gb_update: emits C code for the "match found" path, which updates
// the aggregates of slot t->aggr_table[idx] in place.
//   - Each aggregate is updated with generate_aggr_update().
//   - The BUFFER-typed gb_attr_<g> temporaries are destroyed; presumably the
//     slot already owns its own copy of these values.
//   - For UDAFs, an OR of <op>_LFTA_AGGR_FLUSHME_ tests is built. The visible
//     lines then emit a flush of old groups, an output tuple for the slot
//     (generate_tuple_from_aggr) and the clearing of its SLOT_FILLED bit.
//   node_name - LFTA node name, used for the flush helper and tuple struct.
//   schema    - passed to the aggregate/tuple generators.
//   idx       - C expression (as text) indexing the aggregate table.
// NOTE(review): has_udaf is taken by value, so setting it here does not
// inform the caller. The condition that tests it is absent from this
// listing.
1359 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1363 ret += "/*\t\tMatch found : update in place.\t*/\n";
1366 for(a=0;a<aggr_tbl->size();a++){
1367 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1368 ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1369 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1372 // garbage collect copied buffer type gb attrs.
1373 for(g=0;g<gb_tbl->size();g++){
1374 data_type *gdt = gb_tbl->get_data_type(g);
1375 if(gdt->is_buffer_type()){
1376 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
1383 bool first_udaf = true;
1386 for(a=0;a<aggr_tbl->size();a++){
1387 if(! aggr_tbl->is_builtin(a)){
1388 if(! first_udaf)ret += " || ";
1389 else first_udaf = false;
1390 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1391 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1392 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
1396 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1397 ret += generate_tuple_from_aggr(node_name,schema,idx);
1398 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
// generate_init_group: emits C code that claims slot t->aggr_table[idx] for
// a new group:
//   - the hashmap entry is set to hash2 | SLOT_FILLED | gen_val;
//   - each gb_attr_<g> is copied into gb_var<g> by plain assignment;
//   - each aggregate is initialized with generate_aggr_init();
//   - t->n_aggrs is incremented.
// The generated code expects `hash2` and `gen_val` to be in scope.
// NOTE(review): plain assignment copies BUFFER-typed group-by values
// shallowly. Presumably ownership of the temporary passes to the slot --
// confirm.
1405 string generate_init_group( table_list *schema, string idx){
1407 string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1408 // Fill up the aggregate block.
1409 for(g=0;g<gb_tbl->size();g++){
1410 sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1413 for(a=0;a<aggr_tbl->size();a++){
1414 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1415 ret += generate_aggr_init(tmpstr, aggr_tbl,a, schema);
1417 ret+="\t\tt->n_aggrs++;\n";
// generate_fta_flush: emits the generated helper
//   static void fta_aggr_flush_old_<node_name>(struct FTA *f, unsigned int nflush)
// It scans the aggregate table starting at t->flush_pos and evicts up to
// `nflush` groups, stopping early if t->n_aggrs reaches zero.
// A slot is evicted when it is SLOT_FILLED and either:
//   - its generation bits differ from the current t->generation, or
//   - a temporal group-by value is older than t->last_gb_<g>.
// For each evicted slot, the hashmap entry is zeroed and eviction_cnt is
// bumped under LFTA_STATS; the output tuple is then produced by
// generate_tuple_from_aggr().
// After the scan, t->flush_pos records where it stopped. Once the table is
// empty or fully scanned, non-buffer temporal group-by values are copied
// from flush_start_gb_<g> to last_flushed_gb_<g>.
//   Ext_fcns - used to look up each UDAF's C return type for the
//              udaf_ret<a> temporaries.
1422 string generate_fta_flush(string node_name, table_list *schema,
1423 ext_fcn_list *Ext_fcns){
1426 string select_var_defs ;
1429 // Flush from previous epoch
1431 ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1433 ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1434 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1435 ret += "\tint i, lfta_bailout;\n";
1436 ret += "\tunsigned int gen_val;\n";
1438 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1439 ret += generate_fta_name(node_name)+" *) f;\n";
1444 // Variables needed to store selected attributes of BUFFER type
1445 // temporarily, in order to compute their size for storage
1446 // in an output tuple.
1448 select_var_defs = "";
1449 for(s=0;s<sl_list.size();s++){
1450 data_type *sdt = sl_list[s]->get_data_type();
1451 if(sdt->is_buffer_type()){
1452 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1453 select_var_defs.append(tmpstr);
1456 if(select_var_defs != ""){
1457 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1458 ret += select_var_defs;
1462 // Variables to store results of partial functions.
1463 if(sl_fcns_start != sl_fcns_end){
1464 ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1465 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1466 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1467 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
// NOTE(review): the emitted text ends with a stray ";" (an empty statement
// in the generated C). It is harmless but looks unintended.
1470 ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1473 // Variables for udaf output temporaries
1474 bool no_udaf = true;
1476 for(a=0;a<aggr_tbl->size();a++){
1477 if(! aggr_tbl->is_builtin(a)){
1479 ret+="/*\t\tUDAF output vars.\t*/\n";
1482 int afcn_id = aggr_tbl->get_fcn_id(a);
1483 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1484 sprintf(tmpstr,"udaf_ret%d", a);
1485 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1490 // ret+="\tt->flush_finished=1; /* flush will be completed */\n";
1492 ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1493 ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
// NOTE(review): the temporal tests below are joined with " || " inside the
// "|| (" group. If no group-by attribute is temporal, the group may be left
// empty. The closing code is absent from this listing -- confirm this case
// is handled.
1494 ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1497 for(g=0;g<gb_tbl->size();g++){
1498 data_type *gdt = gb_tbl->get_data_type(g);
1499 if(gdt->is_temporal()){
1500 if(first_g) first_g=false; else ret+=" || ";
1501 ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1505 ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1507 "#ifdef LFTA_STATS\n"
1508 "\t\t\tt->eviction_cnt++;\n"
1513 ret+=generate_tuple_from_aggr(node_name,schema,"i");
1515 // ret+="\t\t\tt->n_aggrs--;\n"; // done in generate_tuple_from_aggr
1516 ret+="\t\t\tnflush--;\n";
1519 ret+="\tt->flush_pos=i;\n";
1520 ret+="\tif(t->n_aggrs == 0) {\n";
1521 ret+="\t\tt->flush_pos = t->max_aggrs;\n";
1524 ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1526 for(int g=0;g<gb_tbl->size();g++){
1527 data_type *dt = gb_tbl->get_data_type(g);
1528 if(dt->is_temporal()){
// NOTE(review): gdt re-fetches the same entry as dt.
1529 data_type *gdt = gb_tbl->get_data_type(g);
1530 if(!gdt->is_buffer_type()){
1531 sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1536 ret += "\t}\n}\n\n";
1541 // TODO Remove sprintf to perform string catenation
// generate_fta_load_params: emits the generated helper
//   static int load_params_<node_name>(struct <fta> *t, int sz, void *value, int initial_call)
// It unpacks the query parameters from the block `value` of size `sz`:
//   - It returns 1 if the fixed-size part exceeds sz. That part is the sum
//     over all parameters of their tuple C types, presumably each wrapped in
//     sizeof by lines absent from this listing.
//   - BUFFER parameters are read as a tuple-format descriptor
//     (access_var_<name>) and bounds-checked: offset + length > sz returns 1.
//     tmp_var_<name> is then built to point into `value`.
//   - Unless this is the initial call, the old t->param_<name> is destroyed.
//     The new value is then copied in with the type's buffer assign-copy
//     function.
//   - Other parameters are copied directly from value+pos.
//   - Pass-by-handle parameters are (de)registered with their LFTA
//     registration functions.
//   - The generated function returns 0 on success.
// A buffered type this generator cannot unpack is reported on stderr at
// generation time, as is an unknown pass-by-handle val_type.
// NOTE(review): the switch case labels are absent from this listing.
1542 string generate_fta_load_params(string node_name){
1544 vector<string> param_names = param_tbl->get_param_names();
1546 string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1547 ret += " *t, int sz, void *value, int initial_call){\n";
1548 ret += "\tint pos=0;\n";
1549 ret += "\tint data_pos;\n";
1551 for(p=0;p<param_names.size();p++){
1552 data_type *dt = param_tbl->get_data_type(param_names[p]);
1553 if(dt->is_buffer_type()){
1554 sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1556 sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
1563 ret += "\n\tdata_pos = ";
1564 for(p=0;p<param_names.size();p++){
1565 if(p>0) ret += " + ";
1566 data_type *dt = param_tbl->get_data_type(param_names[p]);
1568 ret += dt->get_tuple_cvar_type();
1572 ret += "\tif(data_pos > sz) return 1;\n\n";
1575 for(p=0;p<param_names.size();p++){
1576 data_type *dt = param_tbl->get_data_type(param_names[p]);
1577 if(dt->is_buffer_type()){
1578 sprintf(tmpstr,"\taccess_var_%s = *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1580 switch( dt->get_type() ){
1582 // ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n"; // ntoh conversion
1583 // ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n"; // ntoh conversion
1584 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1586 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1588 sprintf(tmpstr,"\ttmp_var_%s.length = access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1592 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1596 // First, destroy the old
1597 ret += "\tif(! initial_call)\n";
1598 sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1600 // Next, create the new.
1601 sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1604 // if(dt->needs_hn_translation()){
1605 // sprintf(tmpstr,"\tt->param_%s = %s( *( (%s *)( (char *)value+pos) ) );\n",
1606 // param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1608 sprintf(tmpstr,"\tt->param_%s = *( (%s *)( (char *)value+pos) );\n",
1609 param_names[p].c_str(), dt->get_cvar_type().c_str() );
// NOTE(review): pos advances by sizeof(get_cvar_type()), while data_pos and
// the buffer descriptor read use get_tuple_cvar_type(). If this statement
// also runs for BUFFER parameters and the two types differ in size, later
// offsets drift -- confirm.
1613 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1617 // Register the pass-by-handle parameters
1619 ret += "/* register and de-register the pass-by-handle parameters */\n";
1622 for(ph=0;ph<param_handle_table.size();++ph){
1623 data_type pdt(param_handle_table[ph]->type_name);
1624 switch(param_handle_table[ph]->val_type){
1630 ret += "\tif(! initial_call)\n";
1631 sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1632 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1634 sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
1637 if(pdt.is_buffer_type()) ret += "&(";
1638 ret += "t->param_"+param_handle_table[ph]->param_name;
1639 if(pdt.is_buffer_type()) ret += ")";
1643 sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1644 fprintf(stderr,"%s\n",tmpstr);
1649 ret+="\treturn 0;\n";
// generate_fta_free: emits the generated
//   static gs_retval_t free_fta_<node_name>(struct FTA *f, gs_uint32_t recursive)
// The visible lines emit, in order:
//   - a flush of old groups;
//   - an increment of t->generation, which marks all groups old;
//   - a reset of t->flush_pos and a second flush, which outputs the
//     remaining groups;
//   - de-registration of every pass-by-handle parameter;
//   - `return 0`.
// NOTE(review): the flush section is presumably guarded by is_aggr_query,
// but that guard is absent from this listing. `recursive` is not used by
// the visible generated code.
1658 string generate_fta_free(string node_name, bool is_aggr_query){
1660 string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1661 ret+= "\tstruct "+generate_fta_name(node_name)+
1662 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1663 ret += "\tint i;\n";
1666 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1667 ret+="\t/* \t\tmark all groups as old */\n";
1668 ret+="\tt->generation++;\n";
1669 ret+="\tt->flush_pos = 0;\n";
1670 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1673 // Deregister the pass-by-handle parameters
1674 ret += "/* de-register the pass-by-handle parameters */\n";
1676 for(ph=0;ph<param_handle_table.size();++ph){
1677 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1678 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1683 ret += "\treturn 0;\n}\n\n";
// generate_fta_control: emits the generated
//   static gs_retval_t control_fta_<node_name>(struct FTA *f, gs_int32_t command, gs_int32_t sz, void *value)
// It handles these commands:
//   FTA_COMMAND_FLUSH             - with no groups, posts a zero-size tuple;
//                                   otherwise flushes every group (flush old,
//                                   bump generation, rewind, flush again).
//   FTA_COMMAND_LOAD_PARAMS       - emitted only when the query has
//                                   parameters; calls
//                                   load_params_<node_name>(t, sz, value, 0)
//                                   and prints a warning on failure outside
//                                   LFTA_IN_NIC builds.
//   FTA_COMMAND_SET_SAMPLING_RATE - copies a gs_float_t from `value` into
//                                   t->sampling_rate.
//   FTA_COMMAND_FILE_DONE         - flushes any remaining groups, then posts
//                                   an EOF_TUPLE; returns 1 if the tuple
//                                   cannot be allocated.
// The generated function ends with `return 0`.
// NOTE(review): SET_SAMPLING_RATE copies sizeof(gs_float_t) bytes with no
// visible check against `sz`.
// NOTE(review): the aggregate-specific sections are presumably guarded by
// is_aggr_query; those guards are absent from this listing.
1688 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1689 string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f, gs_int32_t command, gs_int32_t sz, void *value){\n";
1690 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1691 ret += generate_fta_name(node_name)+" *) f;\n\n";
1695 ret += "\t/* temp status tuple */\n";
1696 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1697 ret += "\tgs_int32_t tuple_size;\n";
1701 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1703 ret+="\t\tif (!t->n_aggrs) {\n";
1704 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
1705 ret+="\t\t\tif( tuple != NULL)\n";
1706 ret+="\t\t\t\tpost_tuple(tuple);\n";
1708 ret+="\t\t}else{\n";
1710 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1711 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1712 ret +="\t\tt->generation++;\n";
1713 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1714 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1715 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1716 ret+="\t\t\tt->flush_pos = 0;\n";
1717 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1722 if(param_tbl->size() > 0){
1724 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1725 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1726 "#ifndef LFTA_IN_NIC\n"
1727 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1734 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1735 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1739 ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1742 ret+="\t\tif (t->n_aggrs) {\n";
1743 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1744 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1745 ret +="\t\tt->generation++;\n";
1746 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1747 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1748 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1749 ret+="\t\t\tt->flush_pos = 0;\n";
1750 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1754 ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1755 ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
1756 ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1758 /* mark tuple as EOF_TUPLE */
1759 ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1760 ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1761 ret += "\t\tpost_tuple(tuple);\n";
1764 ret += "\treturn 0;\n}\n\n";
// generate_fta_clock: emits the generated
//   static gs_retval_t clock_fta_<node_name>(struct FTA *f)
// It advances temporal attributes, flushes aggregates whose temporal group
// changed, and publishes a TEMPORAL_TUPLE carrying the trace id and the
// runtime statistics.
// Generated behavior:
//   - Each temporal column referenced by the select list gets a local
//     unpack_var_<field>_<tblref>.
//       * Under PREFILTER_DEFINED, the last seen value
//         t->last_<field>_<tblref> is raised to the prefilter's value.
//       * Columns named "time" or "timestamp" are also advanced to
//         (wall clock - time_corr) when they lag by more than time_corr
//         seconds and gscp_blocking_mode() is false. "timestamp" keeps its
//         seconds in the upper 32 bits.
//       * time_advanced is set whenever either rule moves the value forward.
//   - For aggregation queries, the temporal group-by values are recomputed.
//     If time advanced and any of them differs from t->last_gb_<g>:
//       * old groups are flushed and the generation is bumped;
//       * flush_pos is rewound;
//       * flush_start_gb_<g> and last_gb_<g> are updated.
//   - A tuple of sizeof(tuple struct) + sizeof(gs_uint64_t) +
//     sizeof(struct fta_stat) bytes is allocated; allocation failure returns
//     1. The temporal select fields are filled in, the trace id and stats
//     are copied after the struct, and the tuple is posted.
//   - fta_heartbeat() is sent with t->trace_id (post-incremented), the
//     per-interval counters are reset, and the function returns 0.
//   time_corr     - lag, in seconds, tolerated between the packet clock and
//                   the host clock before the host clock is substituted.
//   is_aggr_query - enables the group-by declarations and flush logic.
1769 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query){
1770 string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1771 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1772 ret += generate_fta_name(node_name)+" *) f;\n\n";
1774 ret += "\t/* Create a temp status tuple */\n";
1775 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1776 ret += "\tgs_int32_t tuple_size;\n";
1777 ret += "\tunsigned int i;\n";
1778 ret += "\ttime_t cur_time;\n";
1779 ret += "\tint time_advanced;\n";
1780 ret += "\tstruct fta_stat stats;\n";
1784 /* copy the last seen values of temporal attributes */
1785 col_id_set temp_cids; // col ids of temp attributes in select clause
1788 /* HACK: in order to reuse the SE generation code, we need to copy
1789 * the last values of the temp attributes into new variables
1790 * which have names unpack_var_XXX_XXX
1794 col_id_set::iterator csi;
1796 for(s=0;s<sl_list.size();s++){
1797 data_type *sdt = sl_list[s]->get_data_type();
1798 if (sdt->is_temporal()) {
1799 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
1803 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1804 int tblref = (*csi).tblvar_ref;
1805 int schref = (*csi).schema_ref;
1806 string field = (*csi).field;
1807 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1808 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
1812 if (is_aggr_query) {
1813 for(g=0;g<gb_tbl->size();g++){
1814 data_type *gdt = gb_tbl->get_data_type(g);
1815 if(gdt->is_temporal()){
1816 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1818 data_type *gdt = gb_tbl->get_data_type(g);
1819 if(gdt->is_buffer_type()){
1820 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1828 ret += "\ttime_advanced = 0;\n";
1830 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1831 int tblref = (*csi).tblvar_ref;
1832 int schref = (*csi).schema_ref;
1833 string field = (*csi).field;
1834 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1836 // update last seen value with the value seen
1837 ret += "\t#ifdef PREFILTER_DEFINED\n";
1838 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
1839 field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
1841 ret += "\t\ttime_advanced = 1;\n\t}\n";
1842 ret += "\t#endif\n";
1844 // we need to pay special attention to time fields
1845 if (field == "time" || field == "timestamp"){
1846 ret += "\tcur_time = time(&cur_time);\n";
1848 if (field == "time") {
1849 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
1852 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
1853 field.c_str(), tblref, field.c_str(), tblref, time_corr);
1855 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
1856 field.c_str(), tblref, time_corr);
1858 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
1859 field.c_str(), tblref, field.c_str(), tblref, time_corr);
1863 ret += "\t\ttime_advanced = 1;\n";
1866 sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
1867 field.c_str(), tblref, field.c_str(), tblref);
1870 sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
1871 field.c_str(), tblref, field.c_str(), tblref);
1876 // for aggregation lftas we need to check if the time was advanced beyond the current epoch
1877 if (is_aggr_query) {
1880 bool first_one = true;
1881 for(g=0;g<gb_tbl->size();g++){
1882 data_type *gdt = gb_tbl->get_data_type(g);
1883 if(gdt->is_temporal()){
1884 // To perform the test, first need to compute the value
1885 // of the temporal gb attrs.
1886 if(gdt->is_buffer_type()){
1887 // NOTE : if the SE defining the gb is anything
1888 // other than a ref to a variable, this will generate
1889 // illegal code. To be resolved with Spatch.
1890 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
1891 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
1893 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
1894 gdt->get_buffer_assign_copy().c_str(), g, g);
1896 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
1900 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
1901 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
1902 if(first_one){first_one = false;} else {change_test.append(") && (");}
1903 change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
1907 ret += "\n\tif( time_advanced && !( (";
1911 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
1912 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
1913 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1915 ret += "\t\t/* \t\tmark all groups as old */\n";
1916 ret +="\t\tt->generation++;\n";
1917 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1918 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1919 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1920 ret += "\t\tt->flush_pos = 0;\n";
1922 for(g=0;g<gb_tbl->size();g++){
1923 data_type *gdt = gb_tbl->get_data_type(g);
1924 if(gdt->is_temporal()){
1925 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
1926 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
1933 ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
1934 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
1935 ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
1938 for(s=0;s<sl_list.size();s++){
1939 data_type *sdt = sl_list[s]->get_data_type();
1940 if(sdt->is_temporal()){
1942 if (sl_list[s]->is_gb()) {
1943 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
1947 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
1949 // if(sdt->needs_hn_translation())
1950 // ret += sdt->hton_translation() +"( ";
1951 if (sl_list[s]->is_gb()) {
1952 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
1955 ret += generate_se_code(sl_list[s],schema);
1957 // if(sdt->needs_hn_translation())
1963 /* mark tuple as temporal */
1964 ret += "\n\t/* Mark tuple as temporal */\n";
1965 ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
1967 ret += "\n\t/* Copy trace id */\n";
1968 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
1970 ret += "\n\t/* Populate runtime stats */\n";
1971 ret += "\tstats.ftaid = f->ftaid;\n";
1972 ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
1973 ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
1974 ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
1975 ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
1976 ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
1977 ret += "\tstats.collision_cnt = t->collision_cnt;\n";
1978 ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
1979 ret += "\tstats.sampling_rate = t->sampling_rate;\n";
1981 ret += "\n#ifdef LFTA_PROFILE\n";
1982 ret += "\n\t/* Print stats */\n";
1983 ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
1984 ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
1985 ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
1986 ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
1987 ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
1988 ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
1989 ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
// NOTE(review): the newline is emitted after collision_cnt rather than after
// eviction_cnt, so eviction_cnt ends up on its own line. These two counters
// use %d while the others use %u -- confirm they match the field types.
// cycle_per_in_tuple above divides by in_tuple_cnt, which may be 0.
1990 ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
1991 ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
1992 ret += "\n#endif\n";
1995 ret += "\n\t/* Copy stats */\n";
1996 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
1997 ret+="\tpost_tuple(tuple);\n";
1999 ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2000 ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
2002 ret += "\n\t/* Reset runtime stats */\n";
2003 ret += "\tt->in_tuple_cnt = 0;\n";
2004 ret += "\tt->out_tuple_cnt = 0;\n";
2005 ret += "\tt->out_tuple_sz = 0;\n";
2006 ret += "\tt->accepted_tuple_cnt = 0;\n";
2007 ret += "\tt->cycle_cnt = 0;\n";
2008 ret += "\tt->collision_cnt = 0;\n";
2009 ret += "\tt->eviction_cnt = 0;\n";
2011 ret += "\treturn 0;\n}\n\n";
2017 // accept processing before the where clause,
2018 // do flush processing.
// generate_aggr_accept_prelim: emits the part of an aggregation LFTA's
// accept routine that runs before the WHERE clause.
//   1. Slow flush. N is taken from the "slow_flush" define; a missing or
//      non-positive value becomes 2.
//        * For N > 1, one old group is flushed on every N-th call, counted
//          by t->flush_ctr.
//        * Otherwise (N == 1), one old group is flushed on every call.
//   2. Temporal flush. Code is built into the out-parameter
//      `temporal_flush`. It recomputes the temporal group-by values, and if
//      any differs from t->last_gb_<g> it:
//        * flushes old groups and bumps the generation;
//        * rewinds flush_pos;
//        * records flush_start_gb_<g> and last_gb_<g>.
//      Temporal BUFFER group-bys are reported as an error at generation
//      time. temporal_flush stays empty when there is no temporal group-by.
//   3. Temporal select-clause columns that are not yet in `unpacked_cids`
//      are unpacked (the generated code returns 1 on failure).
//      t->last_<field>_<tblref> is then updated and the column is added to
//      unpacked_cids.
//   4. If the "real_time" define is set, the columns needed by the temporal
//      flush are unpacked and temporal_flush is appended to the returned
//      code here. Otherwise the caller presumably emits temporal_flush
//      elsewhere -- confirm.
//   fs            - supplies the "slow_flush" and "real_time" defines.
//   unpacked_cids - in/out set of columns already unpacked in this routine.
2019 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema, col_id_set &unpacked_cids, string &temporal_flush){
2023 string ret="\n/*\tslow flush\t*/\n";
2024 string slow_flush_str = fs->get_val_of_def("slow_flush");
2025 int n_slow_flush = atoi(slow_flush_str.c_str());
2026 if(n_slow_flush <= 0) n_slow_flush = 2;
2027 if(n_slow_flush > 1){
2028 ret += "\tt->flush_ctr++;\n";
2029 ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2030 ret += "\t\tt->flush_ctr = 0;\n";
2031 ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
2034 ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
2039 bool first_one = true;
2041 col_id_set flush_cids; // col ids accessed when computing flush variables.
2042 // unpack them at temporal flush test time.
2043 temporal_flush = "";
2046 for(g=0;g<gb_tbl->size();g++){
2047 data_type *gdt = gb_tbl->get_data_type(g);
2048 if(gdt->is_temporal()){
2049 gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2051 // To perform the test, first need to compute the value
2052 // of the temporal gb attrs.
2053 if(gdt->is_buffer_type()){
2054 // NOTE : if the SE defining the gb is anything
2055 // other than a ref to a variable, this will generate
2056 // illegal code. To be resolved with Spatch.
2057 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2058 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2059 temporal_flush += tmpstr;
2060 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2061 gdt->get_buffer_assign_copy().c_str(), g, g);
2063 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2065 temporal_flush += tmpstr;
2066 // END computing the value of the temporal GB attr.
2069 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
2070 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
2071 if(first_one){first_one = false;} else {change_test.append(") && (");}
2072 change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2075 if(!first_one){ // will be false iff. there is a temporal GB attribute
2076 temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2077 temporal_flush += "\tif( !( (";
2078 temporal_flush += change_test;
2079 temporal_flush += ") ) ){\n";
2081 // temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2082 temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2083 temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2084 temporal_flush+="\t\t}\n";
2085 temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2086 temporal_flush+="\t\tt->generation++;\n";
2087 temporal_flush+="\t\tt->flush_pos = 0;\n";
2090 // Now set the saved temporal value of the gb to the
2091 // current value of the gb. Only for simple types,
2092 // not for buffer types -- but the strings are not
2093 // temporal in any case.
2095 for(g=0;g<gb_tbl->size();g++){
2096 data_type *gdt = gb_tbl->get_data_type(g);
2097 if(gdt->is_temporal()){
2098 if(gdt->is_buffer_type()){
2100 fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2102 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2103 temporal_flush += tmpstr;
2104 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2105 temporal_flush += tmpstr;
2109 temporal_flush += "\t}\n\n";
2112 // Unpack all the temporal attributes referenced in select clause
2113 // and update the last value of the attribute
2114 col_id_set temp_cids; // col ids of temp attributes in select clause
2115 col_id_set::iterator csi;
2117 for(s=0;s<sl_list.size();s++){
2118 data_type *sdt = sl_list[s]->get_data_type();
2119 if (sdt->is_temporal()) {
2120 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2124 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
2125 if(unpacked_cids.count((*csi)) == 0){
2126 int tblref = (*csi).tblvar_ref;
2127 int schref = (*csi).schema_ref;
2128 string field = (*csi).field;
2129 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
// NOTE(review): `dt` is not used by the visible lines below.
2131 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2132 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2133 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2135 ret += "\tif(retval) return 1;\n";
2137 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2140 unpacked_cids.insert( (*csi) );
2145 // Do the flush here if this is a real_time query
2146 string rt_level = fs->get_val_of_def("real_time");
2147 if(rt_level != "" && temporal_flush != ""){
2148 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2149 if(unpacked_cids.count((*csi)) == 0){
2150 int tblref = (*csi).tblvar_ref;
2151 int schref = (*csi).schema_ref;
2152 string field = (*csi).field;
2153 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2155 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2156 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2158 ret += "\tif(retval) return 1;\n";
2160 unpacked_cids.insert( (*csi) );
2163 ret += temporal_flush;
// generate_sel_accept_body: emits the body of the accept routine for a
// filter-only (selection) LFTA. This code runs once the predicate has
// passed:
//   - Partial functions referenced by the select list are evaluated.
//       * Those referenced more than once are cached in
//         partial_fcn_result_<p> and guarded by fcn_ref_cnt_<p>.
//       * A failure of a partial function jumps to `end`.
//   - t->accepted_tuple_cnt is incremented under LFTA_STATS.
//   - BUFFER-typed select expressions are evaluated into selvar_<s>, so that
//     their sizes can be added to tuple_size.
//   - The tuple is allocated (jumping to `end` on NULL), marked
//     REGULAR_TUPLE, filled in and posted. The output counters are updated
//     under LFTA_STATS.
// The generated code expects the label `end` and the locals used above
// (tuple, tuple_size, tuple_pos, retval, selvar_<s>, ...) to be declared
// by the enclosing generated function.
//   fs - not referenced in the visible lines.
2169 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2174 /////////////// Processing for filter-only query
2176 // test passed : create the tuple, then assign to it.
2177 ret += "/*\t\tCreate and post the tuple\t*/\n";
2179 // Unpack partial fcns ref'd by the select clause.
2180 // Its a kind of a WHERE clause ...
2181 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2182 if(fcn_ref_cnt[p] > 1){
2183 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2185 if(is_partial_fcn[p]){
2186 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2187 ret += "\tif(retval) goto end;\n";
2189 if(fcn_ref_cnt[p] > 1){
2190 if(!is_partial_fcn[p]){
2191 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2193 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2198 // increment the counter of accepted tuples
2199 ret += "\n\t#ifdef LFTA_STATS\n";
2200 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2201 ret += "\t#endif\n\n";
2203 // First, compute the size of the tuple.
2205 // Unpack any BUFFER type selections into temporaries
2206 // so that I can compute their size and not have
2207 // to recompute their value during tuple packing.
2208 // I can use regular assignment here because
2209 // these temporaries are non-persistent.
2211 for(s=0;s<sl_list.size();s++){
2212 data_type *sdt = sl_list[s]->get_data_type();
2213 if(sdt->is_buffer_type()){
2214 sprintf(tmpstr,"\tselvar_%d = ",s);
2216 ret += generate_se_code(sl_list[s],schema);
2222 // The size of the tuple is the size of the tuple struct plus the
2223 // size of the buffers to be copied in.
2225 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2226 for(s=0;s<sl_list.size();s++){
2227 data_type *sdt = sl_list[s]->get_data_type();
2228 if(sdt->is_buffer_type()){
2229 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2236 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2237 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2239 // Test passed, make assignments to the tuple.
2241 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2243 // Mark tuple as REGULAR_TUPLE
2244 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2247 for(s=0;s<sl_list.size();s++){
2248 data_type *sdt = sl_list[s]->get_data_type();
2249 if(sdt->is_buffer_type()){
2250 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2252 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2255 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2257 // if(sdt->needs_hn_translation())
2258 // ret += sdt->hton_translation() +"( ";
2259 ret += generate_se_code(sl_list[s],schema);
2260 // if(sdt->needs_hn_translation())
2268 ret += "\tpost_tuple(tuple);\n";
2270 // Increment the counter of posted tuples
2271 ret += "\n\t#ifdef LFTA_STATS\n";
2272 ret += "\tt->out_tuple_cnt++;\n";
2273 ret+="\tt->out_tuple_sz+=tuple_size;\n";
2274 ret += "\t#endif\n\n";
// Generate the C body of accept_packet for a filter-join LFTA.
// The generated code processes each packet in two passes:
//  * S (filtering stream, fs->pred_t1): unpack the fields the S predicates
//    need, then test those predicates cheapest-first. On success, record
//    the packet's join key (the right-hand side of each fs->hash_eq), either
//    as bits in bloom filter t->last_bloom_pos or in one of two adjacent
//    t->join_table slots (bucket, bucket1) stamped with curr_fj_ts.
//    Any S failure jumps to `end_s`, which skips only the S pass.
//  * R (main stream, fs->pred_t0): test the cheap R predicates
//    (cost <= 20), probe for a recorded S key equal to the R join key (the
//    left-hand side of each hash_eq), test the expensive R predicates, then
//    build and post the output tuple.
// Parameters:
//  fs            - filter-join node (temporal_range/var, hash_eq, preds, defs)
//  node_name     - LFTA name, used for unpacking and the tuple struct name
//  unpacked_cids - columns already unpacked; extended as more are unpacked
//  Ext_fcns      - external function table, used to cost predicates
//  schema        - input schema
// Generated code is accumulated in `ret`.
// NOTE(review): this copy lacks some lines (declarations, closing braces,
// `ret += tmpstr;` after sprintf calls, the bloom/join-table branch
// selection, the return); confirm details against the complete source.
2281 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
2287 unsigned int window_len = fs->temporal_range;
2288 unsigned int n_bloom = 11;
2289 string n_bloom_str = fs->get_val_of_def("num_bloom");
2290 int tmp_n_bloom = atoi(n_bloom_str.c_str());
2292 n_bloom = tmp_n_bloom+1;
2293 float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2294 sprintf(tmpstr,"%f",bloom_width);
2295 string bloom_width_str = tmpstr;
2297 if(window_len < n_bloom){
2298 n_bloom = window_len+1;
2299 bloom_width_str = "1";
2303 // Grab the current window time
2304 scalarexp_t winvar(fs->temporal_var);
2305 ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
2307 int bf_exp_size = 12; // base-2 log of number of bits
2308 string bloom_len_str = fs->get_val_of_def("bloom_size");
2309 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2310 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2311 bf_exp_size = tmp_bf_exp_size;
2313 int bf_bit_size = 1 << bf_exp_size;
2314 int bf_byte_size = bf_bit_size / (8*sizeof(char));
2316 unsigned int ht_size = 4096;
2317 string ht_size_s = fs->get_val_of_def("aggregate_slots");
2318 int tmp_ht_size = atoi(ht_size_s.c_str());
2319 if(tmp_ht_size > 1024){
2320 unsigned int hs = 1; // make it power of 2
2323 tmp_ht_size = tmp_ht_size >> 1;
2330 for(i=0;i<bf_exp_size;i++)
2331 bf_mask = (bf_mask << 1) | 1;
2333 for(i=ht_size;i>1;i=i>>1)
2334 bf_mask = (bf_mask << 1) | 1;
2338 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2341 bloom_width_str.c_str(),
2353 // If this is a bloom-filter fj, first test if the
2354 // bloom filter needs to be advanced.
2355 // SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2356 // t->bf_size : number of bits in bloom filter
2359 "// Clean out old bloom filters if needed.\n"
2360 " if(t->first_exec){\n"
2361 " t->first_exec = 0;\n"
2362 " t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2363 " t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2365 " curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2366 " if(curr_bin != t->last_bin){\n"
2367 " for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2368 " t->last_bloom_pos++;\n"
2369 " if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2370 " t->last_bloom_pos = 0;\n"
2371 " tmp_i = t->last_bloom_pos;\n"
2372 " for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2373 " SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2377 " t->last_bin = curr_bin;\n"
2383 //-----------------------------------------------------------------
2384 // First, determine whether to do S (filter stream) processing.
2387 "// S (filtering stream) predicate, should it be processed?\n"
2390 // Sort S preds based on cost.
2391 vector<cnf_elem *> s_filt = fs->pred_t1;
2392 col_id_set::iterator csi;
2393 if(s_filt.size() > 0){
2395 // Unpack fields ref'd in the S pred
2396 for(w=0;w<s_filt.size();++w){
2397 col_id_set this_pred_cids;
2398 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2399 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2400 if(unpacked_cids.count( (*csi) ) == 0){
2401 int tblref = (*csi).tblvar_ref;
2402 int schref = (*csi).schema_ref;
2403 string field = (*csi).field;
2404 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2405 unpacked_cids.insert( (*csi) );
2411 // Sort by evaluation cost.
2412 // First, estimate evaluation costs
2413 // Eliminate predicates covered by the prefilter (those in s_pids).
2414 // I need to do it before the sort becuase the indices refer
2415 // to the position in the unsorted list.
// (No prefilter elimination happens for the S predicates here: every
// s_filt entry is costed and kept.)
2416 vector<cnf_elem *> tmp_wh;
2417 for(w=0;w<s_filt.size();++w){
2418 compute_cnf_cost(s_filt[w],Ext_fcns);
2419 tmp_wh.push_back(s_filt[w]);
2423 sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2425 // Now generate the predicates.
2426 for(w=0;w<s_filt.size();++w){
2427 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2430 // Find partial fcns ref'd in this cnf element
2432 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2433 // Since set<..> is a "Sorted Associative Container",
2434 // we can walk through it in sorted order by walking from
2435 // begin() to end(). (and the partial fcns must be
2436 // evaluated in this order).
2437 set<int>::iterator si;
2439 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2440 if(fcn_ref_cnt[(*si)] > 1){
2441 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2443 if(is_partial_fcn[(*si)]){
2444 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2445 ret += "\t\tif(retval) goto end_s;\n";
2447 if(fcn_ref_cnt[(*si)] > 1){
2448 if(!is_partial_fcn[(*si)]){
2449 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2450 // Testing for S is a side branch.
2451 // I don't want a cacheable partial function to be
2452 // marked as evaluated. Therefore I mark the function
2453 // as evalauted ONLY IF it is not partial.
2454 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2460 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2461 ") ) goto end_s;\n";
2464 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2467 for(p=0;p<fs->hash_eq.size();++p)
2468 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2471 // First, generate the S scalar expressions in the hash_eq
2473 // Iterate over the bloom filters
2475 ret += "\t\tbucket=0;\n";
2476 for(p=0;p<fs->hash_eq.size();++p){
2478 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2479 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2480 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2482 // SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2484 " bucket &= "+int_to_string(bf_mask)+";\n"
2485 " SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
2490 ret += "\t\tbucket=0;\n";
2491 for(p=0;p<fs->hash_eq.size();++p){
2493 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2494 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2495 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2498 " bucket &= "+int_to_string(bf_mask)+";\n"
2499 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2501 // Try the first bucket
2503 for(p=0;p<fs->hash_eq.size();++p){
2504 if(p>0) ret += " && ";
2505 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2506 // " == s_equijoin_"+int_to_string(p);
2507 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2508 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2509 string rhs_op = "s_equijoin_"+int_to_string(p);
2510 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2512 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2513 ret += "\t\t}else {if(";
2514 for(p=0;p<fs->hash_eq.size();++p){
2515 if(p>0) ret += " && ";
2516 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2517 // " == s_equijoin_"+int_to_string(p);
2518 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
// Second candidate slot: compare bucket1's key, the slot this test
// selects below. Comparing bucket again would duplicate the first probe.
2519 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2520 string rhs_op = "s_equijoin_"+int_to_string(p);
2521 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2523 ret += "){\n\t\t\tthe_bucket = bucket1;\n";
2524 ret += "\t\t}else{ if(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2525 ret+="\t\t\tthe_bucket = bucket;\n\t\t\telse the_bucket=bucket1;\n";
2527 for(p=0;p<fs->hash_eq.size();++p){
2528 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2529 if(hdt->is_buffer_type()){
2530 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2533 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2534 " = s_equijoin_"+int_to_string(p)+";\n";
2537 ret+="\t\tt->join_table[the_bucket].ts = curr_fj_ts;\n";
// Target of every failed S test; R processing continues from here.
2539 ret += "\tend_s:\n";
2541 // ------------------------------------------------------------
2542 // Next, determine if the R record should be processed.
2546 "// R (main stream) cheap predicate\n"
2550 // Unpack r_filt fields
2551 vector<cnf_elem *> r_filt = fs->pred_t0;
2552 for(w=0;w<r_filt.size();++w){
2553 col_id_set this_pred_cids;
2554 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2555 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2556 if(unpacked_cids.count( (*csi) ) == 0){
2557 int tblref = (*csi).tblvar_ref;
2558 int schref = (*csi).schema_ref;
2559 string field = (*csi).field;
2560 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2561 unpacked_cids.insert( (*csi) );
2566 // Sort R preds based on cost.
2568 vector<cnf_elem *> tmp_wh;
2569 for(w=0;w<r_filt.size();++w){
2570 compute_cnf_cost(r_filt[w],Ext_fcns);
2571 tmp_wh.push_back(r_filt[w]);
2575 sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2577 // WARNING! the constant 20 below is a wild-ass guess.
2579 for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++)
2581 // Test the cheap filters on R.
2584 // Now generate the predicates.
2585 for(w=0;w<cheap_rpos;++w){
2586 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2589 // Find partial fcns ref'd in this cnf element
2591 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2592 // Since set<..> is a "Sorted Associative Container",
2593 // we can walk through it in sorted order by walking from
2594 // begin() to end(). (and the partial fcns must be
2595 // evaluated in this order).
2596 set<int>::iterator si;
2597 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2598 if(fcn_ref_cnt[(*si)] > 1){
2599 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2601 if(is_partial_fcn[(*si)]){
2602 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2603 ret += "\t\tif(retval) goto end;\n";
2605 if(fcn_ref_cnt[(*si)] > 1){
2606 if(!is_partial_fcn[(*si)]){
2607 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2609 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2614 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2618 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2621 ret += "\n// Do the join\n\n";
2622 for(p=0;p<fs->hash_eq.size();++p)
2623 ret += "\t\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2626 // Passed the cheap pred, now test the join with S.
2629 ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2630 for(p=0;p<fs->hash_eq.size();++p){
2632 " bucket"+int_to_string(i)+
2633 " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2634 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2635 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2638 " bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
2640 ret += "\tfound = 0;\n";
2641 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
// Probe bloom filter b on each iteration (b is the generated loop
// variable). The S pass only sets bits in the current filter
// (t->last_bloom_pos), which advances as bins change, so keys recorded in
// earlier bins sit in the other filters of the ring.
2643 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",b,bucket0) && "
2644 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",b,bucket1) && "
2645 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",b,bucket2))\n "
2654 ret += "\tfound = 0;\n";
2655 ret += "\t\tbucket=0;\n";
2656 for(p=0;p<fs->hash_eq.size();++p){
2658 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2659 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2660 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2663 " bucket &= "+int_to_string(bf_mask)+";\n"
2664 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2666 // Try the first bucket
2668 for(p=0;p<fs->hash_eq.size();++p){
2669 if(p>0) ret += " && ";
2670 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2671 // " == r_equijoin_"+int_to_string(p);
2672 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2673 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
// The R pass compares the stored key with the R join key r_equijoin_<p>
// (the value hashed above). s_equijoin_<p> is only assigned when the S
// predicates passed, so it is stale or unset here.
2674 string rhs_op = "r_equijoin_"+int_to_string(p);
2675 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2677 if(p>0) ret += " && ";
// NOTE(review): this accepts only entries with ts + temporal_range <=
// curr_fj_ts, i.e. entries at least temporal_range old. If `found` means
// "matching S key seen within the window", the comparison looks inverted.
// Confirm against the code that consumes `found`.
2678 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2679 ret += "){\n\t\t\tfound = 1;\n";
2680 ret += "\t\t}else {if(";
2681 for(p=0;p<fs->hash_eq.size();++p){
2682 if(p>0) ret += " && ";
2683 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2684 // " == r_equijoin_"+int_to_string(p);
2685 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
// Second candidate slot: compare bucket1's key (its ts is tested below)
// with the R join key.
2686 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2687 string rhs_op = "r_equijoin_"+int_to_string(p);
2688 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2690 if(p>0) ret += " && ";
2691 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2692 ret += ")\n\t\t\tfound=1;\n";
2701 // Test the expensive filters on R.
2702 if(cheap_rpos < r_filt.size()){
2704 // Now generate the predicates.
2705 for(w=cheap_rpos;w<r_filt.size();++w){
2706 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2709 // Find partial fcns ref'd in this cnf element
2711 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2712 // Since set<..> is a "Sorted Associative Container",
2713 // we can walk through it in sorted order by walking from
2714 // begin() to end(). (and the partial fcns must be
2715 // evaluated in this order).
2716 set<int>::iterator si;
2717 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2718 if(fcn_ref_cnt[(*si)] > 1){
2719 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2721 if(is_partial_fcn[(*si)]){
2722 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2723 ret += "\t\tif(retval) goto end;\n";
2725 if(fcn_ref_cnt[(*si)] > 1){
2726 if(!is_partial_fcn[(*si)]){
2727 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2729 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2734 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2738 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2743 /////////////// post the tuple
2745 // test passed : create the tuple, then assign to it.
2746 ret += "/*\t\tCreate and post the tuple\t*/\n";
2748 // Unpack fields ref'd by the select list that are not yet unpacked
2749 for(s=0;s<sl_list.size();++s){
2750 col_id_set this_se_cids;
2751 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2752 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2753 if(unpacked_cids.count( (*csi) ) == 0){
2754 int tblref = (*csi).tblvar_ref;
2755 int schref = (*csi).schema_ref;
2756 string field = (*csi).field;
2757 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2758 unpacked_cids.insert( (*csi) );
2764 // Unpack partial fcns ref'd by the select clause.
2765 // Its a kind of a WHERE clause ...
2766 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2767 if(fcn_ref_cnt[p] > 1){
2768 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2770 if(is_partial_fcn[p]){
2771 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2772 ret += "\tif(retval) goto end;\n";
2774 if(fcn_ref_cnt[p] > 1){
2775 if(!is_partial_fcn[p]){
2776 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2778 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2783 // increment the counter of accepted tuples
2784 ret += "\n\t#ifdef LFTA_STATS\n";
2785 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2786 ret += "\t#endif\n\n";
2788 // First, compute the size of the tuple.
2790 // Unpack any BUFFER type selections into temporaries
2791 // so that I can compute their size and not have
2792 // to recompute their value during tuple packing.
2793 // I can use regular assignment here because
2794 // these temporaries are non-persistent.
2796 for(s=0;s<sl_list.size();s++){
2797 data_type *sdt = sl_list[s]->get_data_type();
2798 if(sdt->is_buffer_type()){
2799 sprintf(tmpstr,"\tselvar_%d = ",s);
2801 ret += generate_se_code(sl_list[s],schema);
2807 // The size of the tuple is the size of the tuple struct plus the
2808 // size of the buffers to be copied in.
2810 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2811 for(s=0;s<sl_list.size();s++){
2812 data_type *sdt = sl_list[s]->get_data_type();
2813 if(sdt->is_buffer_type()){
2814 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2821 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2822 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2824 // Test passed, make assignments to the tuple.
2826 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2828 // Mark tuple as REGULAR_TUPLE
2829 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2832 for(s=0;s<sl_list.size();s++){
2833 data_type *sdt = sl_list[s]->get_data_type();
2834 if(sdt->is_buffer_type()){
2835 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2837 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2840 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2842 // if(sdt->needs_hn_translation())
2843 // ret += sdt->hton_translation() +"( ";
2844 ret += generate_se_code(sl_list[s],schema);
2845 // if(sdt->needs_hn_translation())
2853 ret += "\tpost_tuple(tuple);\n";
2855 // Increment the counter of posted tuples
2856 ret += "\n\t#ifdef LFTA_STATS\n";
2857 ret += "\n\tt->out_tuple_cnt++;\n\n";
2858 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
2859 ret += "\t#endif\n\n";
// Generate the C body of accept_packet for an aggregation LFTA. The
// generated code:
//  - evaluates the partial functions used by the group-by and aggregate
//    definitions;
//  - computes the group-by attribute values and emits the temporal flush
//    code for non-real-time queries;
//  - hashes the group-by values and probes up to 5 slots of t->aggr_table
//    for a matching group;
//  - either updates the matching group, or emits the best candidate slot's
//    old group as a tuple (if the slot is filled) and initializes a new
//    group there.
// Parameters:
//   fs             - aggregation node (cast below to sgah_qpn* to read
//                    lfta_disorder)
//   node_name      - LFTA name, used in generated helper names
//   schema         - input schema
//   temporal_flush - pre-generated flush code; when non-empty, temporal
//                    group-by attributes are not recomputed here
// Generated code is accumulated in `ret`.
// NOTE(review): this copy lacks some lines (declarations, closing braces,
// `ret += tmpstr;` after sprintf calls, the return); confirm details
// against the complete source.
2865 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
2869 ////////////// Processing for aggregtion query
2871 // First, search for a match. Start by unpacking the group-by attributes.
2873 // One complication : if a real-time aggregate flush occurs,
2874 // the GB attr has already been calculated. So don't compute
2875 // it again if 1) its temporal and 2) it will be computed in the
2876 // agggregate flush code.
2878 // Unpack the partial fcns ref'd by the gb's and the aggr defs.
2879 for(p=gb_fcns_start;p<gb_fcns_end;p++){
2880 if(is_partial_fcn[p]){
2881 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2882 ret += "\tif(retval) goto end;\n";
2885 for(p=ag_fcns_start;p<ag_fcns_end;p++){
2886 if(is_partial_fcn[p]){
2887 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2888 ret += "\tif(retval) goto end;\n";
2892 // increment the counter of accepted tuples
2893 ret += "\n\t#ifdef LFTA_STATS\n";
2894 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2895 ret += "\t#endif\n\n";
2897 ret += "/*\t\tTest if the group is in the hash table \t*/\n";
2898 // Compute the values of the group-by variables.
2899 for(g=0;g<gb_tbl->size();g++){
2900 data_type *gdt = gb_tbl->get_data_type(g);
2901 if((! gdt->is_temporal()) || temporal_flush == ""){
2903 if(gdt->is_buffer_type()){
2904 // NOTE : if the SE defining the gb is anything
2905 // other than a ref to a variable, this will generate
2906 // illegal code. To be resolved with Spatch.
2907 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2908 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2910 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2911 gdt->get_buffer_assign_copy().c_str(), g, g);
2913 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2920 // A quick aside : if any of the GB attrs are temporal,
2921 // test for change and flush if any change occurred.
2922 // We've already computed the flush code,
2923 // Put it here if this is not a real time query.
2924 // We've already unpacked all column refs, so no need to
2925 // do it again here.
2927 string rt_level = fs->get_val_of_def("real_time");
2928 if(rt_level == "" && temporal_flush != ""){
2929 ret += temporal_flush;
2932 // Compute the hash bucket
2933 if(gb_tbl->size() > 0){
2934 ret += "\thashval = ";
// hashval is the XOR, over the group-by attributes, of a hash_nums
// constant times lfta_<type>_to_hash(gb_attr_<g>). Both branches below
// currently emit the same expression.
2935 for(g=0;g<gb_tbl->size();g++){
2936 if(g>0) ret += " ^ ";
2937 data_type *gdt = gb_tbl->get_data_type(g);
2938 if(gdt->is_buffer_type()){
2939 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2940 gdt->get_type_str().c_str(), g);
2942 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2943 gdt->get_type_str().c_str(), g);
// hash2 is compared with the SLOT_HASH_BITS of each probed slot's hashmap
// entry; probe is the first slot examined.
// NOTE(review): masking with (t->max_aggrs-1) assumes max_aggrs is a power
// of two -- confirm where max_aggrs is set.
2948 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
2949 ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
2951 ret+="\tprobe = 0;\n";
2952 ret+="\thash2 = 0;\n\n";
2955 // Does the lfta reference a udaf?
2956 bool has_udaf = false;
2957 for(a=0;a<aggr_tbl->size();a++){
2958 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
2961 // Scan for a match, or alternatively the best slot.
2962 // Currently, hardcode 5 tests.
2964 " gen_val = t->generation & SLOT_GEN_BITS;\n"
2965 " match_found = 0;\n"
2966 " best_slot = probe;\n"
2967 " for(i=0;i<5 && match_found == 0;i++){\n"
2968 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
2970 if(gb_tbl->size()>0){
2971 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
2973 string rhs_op, lhs_op;
2974 for(g=0;g<gb_tbl->size();g++){
2975 if(g>0) ret += " && ";
2977 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
2978 sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
2979 ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
2984 " match_found = 1;\n"
2985 " best_slot = probe;\n"
2988 "// Rate slots in case no match found: prefer empty, then full but old slots\n"
2989 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
2990 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
2991 " best_slot = probe;\n"
2993 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
2994 " best_slot = probe;\n"
2998 " if(probe >= t->max_aggrs)\n"
3001 " if(match_found){\n"
3003 ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
3006 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
// NOTE(review): this diagnostic printf goes to stdout unconditionally each
// time an aggregate LFTA is generated. The C-style cast also assumes fs is
// an sgah_qpn; confirm that callers only pass sgah nodes.
3008 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3009 if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3011 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
3013 bool first_g = true;
3014 for(int g=0;g<gb_tbl->size();g++){
3015 data_type *gdt = gb_tbl->get_data_type(g);
3016 if(gdt->is_temporal()){
3017 if(first_g) first_g = false; else ret+=" + ";
3018 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3021 ret += ") == 0 ){\n";
3024 " fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
3030 ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
3032 "\t\t\t#ifdef LFTA_STATS\n"
3033 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3034 "\t\t\t\tt->collision_cnt++;\n\n"
3038 ret += generate_init_group(schema,"best_slot");
3048 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, set<unsigned int> &s_pids){
3050 string ret="static gs_retval_t accept_packet_"+node_name+
3051 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3052 ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3056 // Define all of the variables needed by this
3060 // Gather all column references, need to define unpacking variables.
3063 col_id_set::iterator csi;
3065 // If its a filter join, rebind all colrefs
3066 // to the first range var, to avoid double unpacking.
3069 for(w=0;w<where.size();++w)
3070 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3071 for(s=0;s<sl_list.size();s++)
3072 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
3075 for(w=0;w<where.size();++w){
3076 if(is_fj || s_pids.count(w) == 0)
3077 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3079 for(s=0;s<sl_list.size();s++){
3080 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3085 for(g=0;g<gb_tbl->size();g++)
3086 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3089 // Variables for unpacking attributes.
3090 ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3091 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3092 int schref = (*csi).schema_ref;
3093 int tblref = (*csi).tblvar_ref;
3094 string field = (*csi).field;
3095 data_type dt(schema->get_type_name(schref,field));
3096 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3097 field.c_str(), tblref);
3103 // Variables that are always needed
3104 ret += "/*\t\tVariables which are always needed\t*/\n";
3105 ret += "\tgs_retval_t retval;\n";
3106 ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3107 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3109 ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3112 // Variables needed for aggregation queries.
3114 ret += "\n/*\t\tVariables for aggregation\t*/\n";
3115 ret+="\tunsigned int i, probe;\n";
3116 ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3117 ret+="\tgs_uint64_t hashval, hash2;\n";
3118 // Variables for storing group-by attribute values.
3119 if(gb_tbl->size() > 0)
3120 ret += "/*\t\tGroup-by attributes\t*/\n";
3121 for(g=0;g<gb_tbl->size();g++){
3122 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3124 data_type *gdt = gb_tbl->get_data_type(g);
3125 if(gdt->is_buffer_type()){
3126 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3131 // Temporaries for min/max
3132 string aggr_tmp_str = "";
3133 for(a=0;a<aggr_tbl->size();a++){
3134 string aggr_op = aggr_tbl->get_op(a);
3135 if(aggr_op == "MIN" || aggr_op == "MAX"){
3136 sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3137 aggr_tmp_str.append(tmpstr);
3140 if(aggr_tmp_str != ""){
3141 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3142 ret += aggr_tmp_str;
3145 // Variables for udaf output temporaries
3146 bool no_udaf = true;
3147 for(a=0;a<aggr_tbl->size();a++){
3148 if(! aggr_tbl->is_builtin(a)){
3150 ret+="/*\t\tUDAF output vars.\t*/\n";
3153 int afcn_id = aggr_tbl->get_fcn_id(a);
3154 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3155 sprintf(tmpstr,"udaf_ret%d", a);
3156 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3161 // Variables needed for a filter join query
3162 if(fs->node_type() == "filter_join"){
3163 filter_join_qpn *fjq = (filter_join_qpn *)fs;
3164 bool uses_bloom = fjq->use_bloom;
3165 ret += "/*\t\tJoin fields\t*/\n";
3166 for(g=0;g<fjq->hash_eq.size();g++){
3167 sprintf(tmpstr,"\t%s s_equijoin_%d, r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g,g);
3172 " /* Variables for fj bloom filter */ \n"
3173 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3174 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3175 "\tlong long int curr_fj_ts;\n"
3176 "\tunsigned int curr_bin, the_bin;\n"
3181 " /* Variables for fj join table */ \n"
3182 "\tunsigned int i, bucket, found; \n"
3183 "\tunsigned int bucket1, the_bucket;\n"
3184 " long long int curr_fj_ts;\n"
3191 // Variables needed to store selected attributes of BUFFER type
3192 // temporarily, in order to compute their size for storage
3193 // in an output tuple.
3195 string select_var_defs = "";
3196 for(s=0;s<sl_list.size();s++){
3197 data_type *sdt = sl_list[s]->get_data_type();
3198 if(sdt->is_buffer_type()){
3199 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3200 select_var_defs.append(tmpstr);
3203 if(select_var_defs != ""){
3204 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3205 ret += select_var_defs;
3208 // Variables to store results of partial functions.
3210 if(partial_fcns.size()>0){
3211 ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3212 for(p=0;p<partial_fcns.size();++p){
3213 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3214 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3215 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3217 if(!is_aggr_query && fcn_ref_cnt[p] >1){
3218 ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3223 if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3227 // variable to hold packet struct //
3229 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3233 ret += "\t#ifdef LFTA_STATS\n";
3234 // variable to store counter of cpu cycles spend in accept_tuple
3235 ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3236 // increment counter of received tuples
3237 ret += "\tt->in_tuple_cnt++;\n";
3238 ret += "\t#endif\n";
3241 // -------------------------------------------------
3242 // If the packet is "packet", test if its for this lfta,
3243 // and if so load it into its struct
3246 ret+="\n/* packed tuple : test and load. \t*/\n";
3247 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3248 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3249 ret+="\t\tgoto end;\n\n";
3254 col_id_set unpacked_cids; // Keep track of the cols that have been unpacked.
3256 string temporal_flush;
3258 ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3259 else { // non-aggregation operators
3261 // Unpack all the temporal attributes referenced in select clause
3262 // and update the last value of the attribute
3263 col_id_set temp_cids; // col ids of temp attributes in select clause
3265 for(s=0;s<sl_list.size();s++){
3266 data_type *sdt = sl_list[s]->get_data_type();
3267 if (sdt->is_temporal()) {
3268 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3271 // If this is a filter join,
3272 // ensure that the temporal range field is unpacked.
3274 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3275 if(temp_cids.count(window_var_cid)==0)
3276 temp_cids.insert(window_var_cid);
3279 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3280 if(unpacked_cids.count((*csi)) == 0){
3281 int tblref = (*csi).tblvar_ref;
3282 int schref = (*csi).schema_ref;
3283 string field = (*csi).field;
3284 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3285 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3288 unpacked_cids.insert( (*csi) );
3294 vector<cnf_elem *> filter = fs->get_filter_clause();
3295 // Test the filter predicate (some query types have additional preds).
3296 if(filter.size() > 0){
3298 // Sort by evaluation cost.
3299 // First, estimate evaluation costs
3300 // Eliminate predicates covered by the prefilter (those in s_pids).
3301 // I need to do it before the sort becuase the indices refer
3302 // to the position in the unsorted list./
3303 vector<cnf_elem *> tmp_wh;
3304 for(w=0;w<filter.size();++w){
3305 if(s_pids.count(w) == 0){
3306 compute_cnf_cost(filter[w],Ext_fcns);
3307 tmp_wh.push_back(filter[w]);
3312 sort(filter.begin(), filter.end(), compare_cnf_cost());
3314 // Now generate the predicates.
3315 for(w=0;w<filter.size();++w){
3316 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
3318 // Find the set of variables accessed in this CNF elem,
3319 // but in no previous element.
3320 col_id_set this_pred_cids;
3321 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
3322 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3323 if(unpacked_cids.count( (*csi) ) == 0){
3324 int tblref = (*csi).tblvar_ref;
3325 int schref = (*csi).schema_ref;
3326 string field = (*csi).field;
3327 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3328 unpacked_cids.insert( (*csi) );
3331 // Find partial fcns ref'd in this cnf element
3333 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
3334 // Since set<..> is a "Sorted Associative Container",
3335 // we can walk through it in sorted order by walking from
3336 // begin() to end(). (and the partial fcns must be
3337 // evaluated in this order).
3338 set<int>::iterator si;
3339 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3340 if(fcn_ref_cnt[(*si)] > 1){
3341 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3343 if(is_partial_fcn[(*si)]){
3344 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3345 ret += "\t\tif(retval) goto end;\n";
3347 if(fcn_ref_cnt[(*si)] > 1){
3348 if(!is_partial_fcn[(*si)]){
3349 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3351 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3356 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
3360 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
3364 // We've passed the WHERE clause,
3365 // unpack the remainder of the accessed fields.
3367 ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3368 vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
3369 for(w=0;w<h_eq.size();++w){
3370 col_id_set this_pred_cids;
3371 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
3372 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3373 if(unpacked_cids.count( (*csi) ) == 0){
3374 int tblref = (*csi).tblvar_ref;
3375 int schref = (*csi).schema_ref;
3376 string field = (*csi).field;
3377 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3378 unpacked_cids.insert( (*csi) );
3383 ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
3385 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
3386 if(unpacked_cids.count( (*csi) ) == 0){
3387 int schref = (*csi).schema_ref;
3388 int tblref = (*csi).tblvar_ref;
3389 string field = (*csi).field;
3390 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3391 unpacked_cids.insert( (*csi) );
3398 ////////////////// After this, the query types
3399 ////////////////// are processed differently.
3401 if(!is_aggr_query && !is_fj)
3402 ret += generate_sel_accept_body(fs, node_name, schema);
3403 else if(is_aggr_query)
3404 ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
3406 ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
3411 ret += "\n\tend:\n";
3412 ret += "\t#ifdef LFTA_STATS\n";
3413 ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
3414 ret += "\t#endif\n";
3415 ret += "\n\treturn 1;\n}\n\n";
// Emit the C source of this LFTA's allocation function (named by
// generate_alloc_name(node_name)) and return it as a string.
// The emitted function:
//  - allocates the FTA struct with fta_alloc;
//  - copies the ftaid and uses the struct's address as the streamid;
//  - for the query kinds handled below, allocates the aggregate table and
//    zeroes its hashmap, or allocates and clears the filter-join bloom-filter
//    and join tables;
//  - initializes complex literals and last-seen temporal values;
//  - installs the FTA callbacks and zeroes the runtime statistics;
//  - loads query parameters and registers pass-by-handle parameters;
//  - returns the new FTA.
// NOTE(review): this listing is missing lines from the original source (the
// embedded line numbers jump, e.g. 3425 -> 3427). Among them are the guards on
// is_aggr_query / is_fj / uses_bloom and most closing braces, so the listing
// cannot be compiled as-is.
3421 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
3424 string ret = "struct FTA * "+generate_alloc_name(node_name) +
3425 "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void *value){\n";
3427 ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
3430 ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
3432 // assign a streamid to fta instance
3433 ret+="\t/* assign a streamid */\n";
3434 ret+="\tf->f.ftaid = ftaid;\n";
3435 ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
3436 ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
// Aggregation setup: max_aggrs, the aggregate table and its hashmap.
// NOTE(review): the is_aggr_query guard is not visible in this listing.
3439 ret += "\tf->n_aggrs = 0;\n";
3441 ret += "\tf->max_aggrs = ";
3443 // Computing the number of aggregate blocks is a little
3444 // tricky. If there are no GB attrs, or if all GB attrs
3445 // are temporal, then use a single aggregate block, else
3446 // use a default value (10). A user specification overrides
3448 bool single_group = true;
3449 for(g=0;g<gb_tbl->size();g++){
3450 data_type *gdt = gb_tbl->get_data_type(g);
3451 if(! gdt->is_temporal() ){
3452 single_group = false;
3455 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
3456 int max_aggr_i = atoi(max_aggr_str.c_str());
3457 if(max_aggr_i <= 0){
3461 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
// Convert a positive "aggregate_slots" value into a power of two.
// Parts of this loop are not visible in this listing.
3463 unsigned int naggrs = 1; // make it power of 2
3464 unsigned int nones = 0;
3468 naggrs = naggrs << 1;
3469 max_aggr_i = max_aggr_i >> 1;
3471 if(nones==1) // in case it was already a power of 2.
3473 ret += int_to_string(naggrs);
3477 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
3478 ret+="\t\treturn(0);\n";
3480 // ret+="/* compute how many integers we need to store the hashmap */\n";
3481 // ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
3482 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
3483 ret+="\t\treturn(0);\n";
3485 ret+="/*\t\tfill bitmap with zero \t*/\n";
3486 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
3487 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
3488 ret+="\tf->generation=0;\n";
3489 ret+="\tf->flush_pos = f->max_aggrs;\n";
3491 ret += "\tf->flush_ctr = 0;\n";
3497 ret+="\tf->first_exec = 1;\n";
// Filter-join setup: bloom-filter table and join hash table.
// NOTE(review): the is_fj / uses_bloom guards are not visible in this listing.
// n_bloom is the number of bloom filters allocated (bf_tot = n_bloom*bf_byte_size).
// It defaults to 11. The "num_bloom" define sets it to num_bloom+1, under a
// guard that is not visible here. It is then capped at temporal_range+1.
3498 unsigned int n_bloom = 11;
3499 string n_bloom_str = fs->get_val_of_def("num_bloom");
3500 int tmp_n_bloom = atoi(n_bloom_str.c_str());
3502 n_bloom = tmp_n_bloom+1;
3504 unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
3505 if(window_len < n_bloom){
3506 n_bloom = window_len+1;
3509 int bf_exp_size = 12; // base-2 log of number of bits
3510 string bloom_len_str = fs->get_val_of_def("bloom_size");
3511 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
3512 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
3513 bf_exp_size = tmp_bf_exp_size;
// NOTE(review): bf_exp_size (from the "bloom_size" define) is not referenced in
// the visible lines below. The bit size is hard-coded as 1 << 12, so
// "bloom_size" appears to have no effect on this allocation. Confirm whether
// this should be 1 << bf_exp_size, and that the generated accept code agrees,
// before changing it.
3515 int bf_bit_size = 1 << 12;
3516 int bf_byte_size = bf_bit_size / (8*sizeof(char));
3518 int bf_tot = n_bloom*bf_byte_size;
3519 ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
3520 ret+="\t\treturn(0);\n";
3523 " for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
3524 " f->bf_table[i] = 0;\n"
// Join hash table size: 4096 entries unless "aggregate_slots" exceeds 1024,
// in which case that value is converted to a power of two (the loop lines are
// not visible in this listing).
3527 unsigned int ht_size = 4096;
3528 string ht_size_s = fs->get_val_of_def("aggregate_slots");
3529 int tmp_ht_size = atoi(ht_size_s.c_str());
3530 if(tmp_ht_size > 1024){
3531 unsigned int hs = 1; // make it power of 2
3534 tmp_ht_size = tmp_ht_size >> 1;
3538 ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
3539 ret+="\t\treturn(0);\n";
3542 " for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
3543 " f->join_table[i].ts = 0;\n"
3548 // Initialize the complex literals (which might be handles).
3550 for(cl=0;cl<complex_literals->size();cl++){
3551 literal_t *l = complex_literals->get_literal(cl);
3552 // sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
3553 // ret += tmpstr + l->to_C_code() + ";\n";
3554 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
3555 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
3560 // Initialize the last seen values of temporal attributes to min(max) value of
3561 // their respective type
3562 // Create places to hold the last values of temporal attributes referenced in select clause
3565 col_id_set temp_cids; // col ids of temp attributes in select clause
3568 col_id_set::iterator csi;
3570 for(s=0;s<sl_list.size();s++){
3571 data_type *sdt = sl_list[s]->get_data_type();
3572 if (sdt->is_temporal()) {
3573 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// Increasing attributes start at their type's minimum literal; decreasing
// attributes start at their type's maximum literal.
3577 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3578 int tblref = (*csi).tblvar_ref;
3579 int schref = (*csi).schema_ref;
3580 string field = (*csi).field;
3581 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
3582 if (dt.is_increasing()) {
3583 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
3585 } else if (dt.is_decreasing()) {
3586 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
3591 // initialize last seen values of temporal group-by variables
3593 for(g=0;g<gb_tbl->size();g++){
3594 data_type *dt = gb_tbl->get_data_type(g);
3595 if(dt->is_temporal()){
3597 fprintf(stderr,"group by attribute %s is temporal, ",
3598 gb_tbl->get_name(g).c_str());
3600 if(dt->is_increasing()){
3601 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
3603 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
// Install the FTA callbacks generated elsewhere for this node.
3610 ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
3611 ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
3612 ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
3613 ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
3614 ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
3616 // Initialize runtime stats
3617 ret+="\tf->in_tuple_cnt = 0;\n";
3618 ret+="\tf->out_tuple_cnt = 0;\n";
3619 ret+="\tf->out_tuple_sz = 0;\n";
3620 ret+="\tf->accepted_tuple_cnt = 0;\n";
3621 ret+="\tf->cycle_cnt = 0;\n";
3622 ret+="\tf->collision_cnt = 0;\n";
3623 ret+="\tf->eviction_cnt = 0;\n";
3624 ret+="\tf->sampling_rate = 1.0;\n";
3626 ret+="\tf->trace_id = 0;\n\n";
// When the query has parameters, the emitted code calls load_params_<node>
// with the caller-supplied (sz, value) buffer and warns if loading fails.
3627 if(param_tbl->size() > 0){
3629 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
3630 "#ifndef LFTA_IN_NIC\n"
3631 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
3639 // Register the pass-by-handle parameters
3641 for(ph=0;ph<param_handle_table.size();++ph){
3642 data_type pdt(param_handle_table[ph]->type_name);
3643 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
3644 switch(param_handle_table[ph]->val_type){
3647 if(pdt.is_buffer_type()) ret += "&(";
3648 sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
3650 if(pdt.is_buffer_type()) ret += ")";
3654 // not complex, no constructor
3656 ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
3659 // query parameter handles are registered/deregistered in the
3660 // load_params function.
3661 // ret += "t->param_"+param_handle_table[ph]->param_name;
3664 fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
3669 ret += "\treturn (struct FTA *) f;\n";
3678 //////////////////////////////////////////////////////////////////
// Generate the complete C source for one LFTA query node. The visible steps:
//  - reset the file-level generator state (gb_tbl, aggr_tbl, global_id,
//    nicprop, param_tbl, sl_list, where, partial_fcns, fcn_ref_cnt,
//    is_partial_fcn, pred_class, pred_pos and the *_fcns_start/_end markers);
//  - set packed_return when the NIC option Return=Packed is present;
//  - load the select list and where clause according to fs->node_type()
//    (spx_qpn, sgah_qpn, filter_join); for sgah_qpn also load the group-by
//    and aggregate tables;
//  - collect partial / cacheable function references clause by clause;
//  - append the generated text to retval: the preamble, the packed input
//    struct (if packed_return), the aggregate or filter-join struct, the FTA
//    and tuple structs, and the flush, load_params, free, control, accept,
//    clock and alloc functions.
// Time_Correlation is read from the interface definition via ifdb.
// NOTE(review): this listing is missing lines from the original source (the
// embedded line numbers jump). Among them are several guards, closing braces
// and the return statement. gid is not referenced in the visible lines.
3680 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
3681 // map<string,string> &int_fcn_defs,
3682 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
3687 /////////////////////////////////////////////////////////////
3688 /// Do operator-generic processing, such as
3689 /// gathering the set of referenced columns,
3690 /// generating structures, etc.
3692 // Initialize globals to empty.
3693 gb_tbl = NULL; aggr_tbl = NULL;
3694 global_id = -1; nicprop = NULL;
3695 param_tbl = fs->get_param_tbl();
3696 sl_list.clear(); where.clear();
3697 partial_fcns.clear();
3698 fcn_ref_cnt.clear(); is_partial_fcn.clear();
3699 pred_class.clear(); pred_pos.clear();
3700 sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
3701 gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
3704 // Does the lfta read packed results from the NIC?
3705 nicprop = nicp; // load into global
3707 packed_return = false;
3708 if(nicp && nicp->option_exists("Return")){
3709 if(nicp->option_value("Return") == "Packed"){
3710 packed_return = true;
3712 fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
3717 // Extract data which defines the query.
3718 // complex literals gathered now.
3719 complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
3720 param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
3721 string node_name = fs->get_node_name();
3722 bool is_fj = false, uses_bloom = false;
3725 if(fs->node_type() == "spx_qpn"){
3726 is_aggr_query = false;
3727 spx_qpn *spx_node = (spx_qpn *)fs;
3728 sl_list = spx_node->get_select_se_list();
3729 where = spx_node->get_where_clause();
3733 if(fs->node_type() == "sgah_qpn"){
3734 is_aggr_query = true;
3735 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3736 sl_list = sgah_node->get_select_se_list();
3737 where = sgah_node->get_where_clause();
3738 gb_tbl = sgah_node->get_gb_tbl();
3739 aggr_tbl = sgah_node->get_aggr_tbl();
3741 if((sgah_node->get_having_clause()).size() > 0){
3742 fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
3745 if(fs->node_type() == "filter_join"){
3746 is_aggr_query = false;
3748 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3749 sl_list = fj_node->get_select_se_list();
3750 where = fj_node->get_where_clause();
3751 uses_bloom = fj_node->use_bloom;
3755 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
3759 // Build list of "partial functions", by clause.
3760 // NOTE : partial fcns are not handled well.
3761 // The act of searching for them associates the fcn call
3762 // in the SE with an index to an array. Refs to the
3763 // fcn value are replaced with refs to the variable they are
3764 // unpacked into. A more general tagging mechanism would be better.
3767 vector<bool> *pfunc_ptr = NULL;
3768 vector<int> *ref_cnt_ptr = NULL;
3769 if(!is_aggr_query){ // don't collect cacheable fcns on aggr query.
3770 ref_cnt_ptr = &fcn_ref_cnt;
3771 pfunc_ptr = &is_partial_fcn;
3775 for(i=0;i<sl_list.size();i++){
3776 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3778 wh_fcns_start = sl_fcns_end = partial_fcns.size();
3779 for(i=0;i<where.size();i++){
3780 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3782 gb_fcns_start = wh_fcns_end = partial_fcns.size();
// NOTE(review): gb_tbl is NULL for non-sgah nodes. The loop below presumably
// sits under a gb_tbl != NULL guard that is not visible in this listing.
3784 for(i=0;i<gb_tbl->size();i++){
3785 find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
3788 ag_fcns_start = gb_fcns_end = partial_fcns.size();
// NOTE(review): aggregate SEs are scanned with a NULL partial-fcn list, so
// presumably nothing is appended to partial_fcns here and ag_fcns_end equals
// ag_fcns_start. Confirm this is intended.
3789 if(aggr_tbl != NULL){
3790 for(i=0;i<aggr_tbl->size();i++){
3791 find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
3794 ag_fcns_end = partial_fcns.size();
3796 // Fill up the is_partial_fcn and fcn_ref_cnt arrays.
3798 for(i=0; i<partial_fcns.size();i++){
3799 fcn_ref_cnt.push_back(1);
3800 is_partial_fcn.push_back(true);
3804 // Unmark non-partial expensive functions referenced only once.
3805 for(i=0; i<partial_fcns.size();i++){
3806 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
3807 partial_fcns[i]->set_partial_ref(-1);
3811 node_name = normalize_name(node_name);
3813 retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
// With packed NIC returns, emit <node>_input_struct. It holds the NIC id
// followed by the NIC-safe columns referenced by the select list, where clause
// and group-by, sorted by name.
3815 if(packed_return){ // generate unpack struct
3816 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
3817 int schref = input_tbls[0]->get_schema_ref();
3818 vector<string> refd_cols;
3819 for(s=0;s<sl_list.size();++s){
3820 gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
3822 for(p=0;p<where.size();++p){
3823 // I'm not disabling these preds ...
3824 gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
3827 for(g=0;g<gb_tbl->size();++g){
3828 gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
3831 sort(refd_cols.begin(), refd_cols.end());
3832 retval += "struct "+node_name+"_input_struct{\n";
3833 retval += "\tint __lfta_id_fm_nic__;\n";
3835 for(vsi=0;vsi<refd_cols.size();++vsi){
3836 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
3837 retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
3843 /////////////////////////////////////////////////////
3844 // Common stuff unpacked, do some generation
3847 retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
3849 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
3851 retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, uses_bloom, schema);
3852 retval += generate_tuple_struct(node_name, sl_list) ;
3855 retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
3856 if(param_tbl->size() > 0)
3857 retval += generate_fta_load_params(node_name) ;
3858 retval += generate_fta_free(node_name, is_aggr_query) ;
3859 retval += generate_fta_control(node_name, schema, is_aggr_query) ;
3860 retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, s_pids) ;
3863 /* extract the value of Time_Correlation from interface definition */
// Falls back to DEFAULT_TIME_CORR when the interface defines no
// Time_Correlation value.
3867 vector<tablevar_t *> tvec = fs->get_input_tbls();
3868 vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
3869 if (time_corr_vec.empty())
3870 time_corr = DEFAULT_TIME_CORR;
3872 time_corr = atoi(time_corr_vec[0].c_str());
3874 retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query) );
3875 retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
// Compute the snap length for an LFTA node.
// It collects the columns referenced by the node's where clause, select list
// and (for sgah_qpn, via gb_tbl) group-by definitions. It then takes the
// largest numeric "snap_len" modifier among those fields. A non-numeric
// snap_len is reported on stderr and ignored.
// NOTE(review): this listing is missing lines from the original source (the
// embedded line numbers jump). Among them are the local declarations, the
// n_snap update and the return statements.
3882 int compute_snap_len(qp_node *fs, table_list *schema){
3884 // Initialize global vars
3886 sl_list.clear(); where.clear();
3888 if(fs->node_type() == "spx_qpn"){
3889 spx_qpn *spx_node = (spx_qpn *)fs;
3890 sl_list = spx_node->get_select_se_list();
3891 where = spx_node->get_where_clause();
3893 else if(fs->node_type() == "sgah_qpn"){
3894 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3895 sl_list = sgah_node->get_select_se_list();
3896 where = sgah_node->get_where_clause();
3897 gb_tbl = sgah_node->get_gb_tbl();
3899 else if(fs->node_type() == "filter_join"){
3900 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3901 sl_list = fj_node->get_select_se_list();
3902 where = fj_node->get_where_clause();
3904 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
3908 // Gather all column references, need to define unpacking variables.
3911 col_id_set::iterator csi;
3913 for(w=0;w<where.size();++w)
3914 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3915 for(s=0;s<sl_list.size();s++){
3916 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3921 for(g=0;g<gb_tbl->size();g++)
3922 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3925 // compute snap length
3928 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3929 int schref = (*csi).schema_ref;
3930 int tblref = (*csi).tblvar_ref;
3931 string field = (*csi).field;
3933 param_list *field_params = schema->get_modifier_list(schref, field);
3934 if(field_params->contains_key("snap_len")){
3935 string fld_snap_str = field_params->val_of("snap_len");
3937 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
3938 if(fld_snap > snap_len) snap_len = fld_snap;
3941 fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
// NOTE(review): n_snap is updated on lines not visible here. Presumably it
// counts the referenced fields that supplied a snap_len; verify against the
// full source.
3946 if(n_snap == cid_set.size()){
3955 // Function which computes an optimal
3956 // set of unpacking functions.
// The selection is greedy. While some column in upref_cids has no entry in
// ucol_fcn_map, each pass:
//  - counts, in pfcn_count, the unpack functions listed by the still-unassigned
//    columns;
//  - picks the function with the lowest get_ufcn_cost / count;
//  - assigns it to every unassigned column whose field lists it.
// On normal completion ucol_fcn_map maps each column in upref_cids to the name
// of its chosen unpack function.
// Every column in upref_cids is expected to list at least one unpack function.
// The visible caller (generate_lfta_prefilter) only inserts such columns.
// Otherwise no best function can be found and the error path below is taken.
// NOTE(review): this listing is missing lines (e.g. the error handling after
// the fprintf calls). pfcn_count is declared outside the loop, and whether it
// is reset on each pass is not visible here.
3958 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
3959 map<string, int> pfcn_count;
3960 map<string, int>::iterator msii;
3961 col_id_set::iterator cisi;
3962 set<string>::iterator ssi;
3965 while(ucol_fcn_map.size() < upref_cids.size()){
3967 // Gather unpack functions referenced by unaccounted-for
3968 // columns, and increment their reference count.
3970 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
3971 if(ucol_fcn_map.count((*cisi)) == 0){
3972 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
3973 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
3974 pfcn_count[(*ssi)]++;
3978 // Get the lowest cost per field function.
3979 float min_cost = 0.0;
3980 string best_fcn = "";
3981 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
3982 int fcost = Schema->get_ufcn_cost((*msii).first);
3984 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
3987 float this_cost = (1.0*fcost)/(*msii).second;
3988 if(msii == pfcn_count.begin() || this_cost < min_cost){
3989 min_cost = this_cost;
3990 best_fcn = (*msii).first;
3994 fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
3998 // Assign this function to the unassigned columns which use it.
3999 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4000 if(ucol_fcn_map.count((*cisi)) == 0){
4001 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4002 if(ufcns.count(best_fcn)>0)
4003 ucol_fcn_map[(*cisi)] = best_fcn;
4011 // Generate an initial test (prefilter) for the lfta
4012 // Assume that the predicate references no external functions,
4013 // and especially no partial functions,
4014 // aggregates, internal functions.
4015 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4016 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4017 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4018 vector<int> &lfta_snap_lens, string iface){
4019 col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4020 col_id_set::iterator csi;
4024 // Gather complex literals in the prefilter.
4025 cplx_lit_table *complex_literals = new cplx_lit_table();
4026 for(p=0;p<pred_list.size();++p){
4027 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4031 // Find the combinable predicates
4032 vector<predicate_t *> pr_list;
4033 for(p=0;p<pred_list.size();++p){
4034 find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4037 // Analyze the combinable predicates to find the predicate classes.
4038 pred_class.clear(); // idx to equiv pred in equiv_list
4039 pred_pos.clear(); // idx to returned bitmask.
4040 vector<predicate_t *> equiv_list;
4041 vector<int> num_equiv;
4044 for(p=0;p<pr_list.size();++p){
4045 for(q=0;q<equiv_list.size();++q){
4046 if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4049 if(q == equiv_list.size()){ // no equiv : create new
4050 pred_class.push_back(equiv_list.size());
4051 equiv_list.push_back(pr_list[p]);
4052 pred_pos.push_back(0);
4053 num_equiv.push_back(1);
4055 }else{ // pr_list[p] is equivalent to pred q
4056 pred_class.push_back(q);
4057 pred_pos.push_back(num_equiv[q]);
4062 // Generate the variables which hold the common pred handles
4063 ret += "/*\t\tprefilter global vars.\t*/\n";
4064 for(q=0;q<equiv_list.size();++q){
4065 for(p=0;p<=(num_equiv[q]/32);++p){
4066 ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4070 // Struct to hold prefilter complex literals
4071 ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4072 if(complex_literals->size() == 0)
4073 ret += "\tint no_variable;\n";
4075 for(cl=0;cl<complex_literals->size();cl++){
4076 literal_t *l = complex_literals->get_literal(cl);
4077 data_type *dtl = new data_type( l->get_type() );
4078 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4081 ret += "} prefilter_complex_lits_"+iface+";\n\n";
4084 // Generate the prefilter initialziation code
4085 ret += "void init_lfta_prefilter_"+iface+"(){\n";
4087 // First initialize complex literals, if any.
4088 ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4089 for(cl=0;cl<complex_literals->size();cl++){
4090 literal_t *l = complex_literals->get_literal(cl);
4091 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4092 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4096 set<int> epred_seen;
4097 for(p=0;p<pr_list.size();++p){
4098 int q = pred_class[p];
4099 //printf("\tq=%d\n",q);
4100 if(epred_seen.count(q)>0){
4101 ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4102 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4103 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4104 for(o=0;o<op_list.size();++o){
4106 ret += generate_se_code(op_list[o],Schema)+", ";
4109 ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4110 epred_seen.insert(q);
4112 ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4113 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4114 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4115 for(o=0;o<op_list.size();++o){
4117 ret += generate_se_code(op_list[o],Schema)+", ";
4120 ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4121 epred_seen.insert(q);
4128 // Start on main body code generation
4129 ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4132 ///--------------------------------------------------------------
4133 /// Generate and store the prefilter body,
4134 /// reuse it for the snap length calculator
4135 ///-------------------------------------------------------------
4138 body += "\tstruct packet *p = (struct packet *)pkt;\n";
4142 // Gather the colids to store unpacked variables.
4143 for(p=0;p<pred_list.size();++p){
4144 gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4147 // make the col_ids refer to the base tables, and
4148 // grab the col_ids with at least one unpacking function.
4149 for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4150 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4152 tmp_col_id.field = (*csi).field;
4153 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4154 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4155 cid_set.insert(tmp_col_id);
4156 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4157 if(fe->get_unpack_fcns().size()>0)
4158 upref_cids.insert(tmp_col_id);
4163 // Find the set of unpacking programs needed for the
4164 // prefilter fields.
4165 map<col_id, string,lt_col_id> ucol_fcn_map;
4166 find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4167 set<string> pref_ufcns;
4168 map<col_id, string,lt_col_id>::iterator mcis;
4169 for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4170 pref_ufcns.insert((*mcis).second);
4175 // Variables for unpacking attributes.
4176 body += "/*\t\tVariables for unpacking attributes\t*/\n";
4177 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4178 int schref = (*csi).schema_ref;
4179 int tblref = (*csi).tblvar_ref;
4180 string field = (*csi).field;
4181 data_type dt(Schema->get_type_name(schref,field));
4182 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4183 field.c_str(), tblref);
4185 sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4188 // Variables for unpacking temporal attributes.
4189 body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4190 for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4191 if (cid_set.count(*csi) == 0) {
4192 int schref = (*csi).schema_ref;
4193 int tblref = (*csi).tblvar_ref;
4194 string field = (*csi).field;
4195 data_type dt(Schema->get_type_name(schref,field));
4196 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4197 field.c_str(), tblref);
4204 // Variables for combinable predicate evaluation
4205 body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4206 for(q=0;q<equiv_list.size();++q){
4207 for(p=0;p<=(num_equiv[q]/32);++p){
4208 body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4213 // Variables that are always needed
4214 body += "/*\t\tVariables which are always needed\t*/\n";
4215 body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4216 body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4218 // Call the unpacking functions for the prefilter fields
4219 if(pref_ufcns.size() > 0)
4220 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4221 set<string>::iterator ssi;
4222 for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4223 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4227 // Unpack the accessed attributes
4228 body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4229 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4230 int tblref = (*csi).tblvar_ref;
4231 int schref = (*csi).schema_ref;
4232 string field = (*csi).field;
4233 sprintf(tmpstr,"\tret_%s_%d = (%s(p, &unpack_var_%s_%d) == 0);\n",
4234 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4238 // next unpack the temporal attributes and ignore the errors
4239 // We are assuming here that failed unpack of temporal attributes
4240 // is not going to overwrite the last stored value
4241 // Failed upacks are ignored
4242 for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
4243 int tblref = (*csi).tblvar_ref;
4244 int schref = (*csi).schema_ref;
4245 string field = (*csi).field;
4246 sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
4247 Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4251 // Evaluate the combinable predicates
4252 if(equiv_list.size()>0)
4253 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
4254 for(q=0;q<equiv_list.size();++q){
4255 for(p=0;p<=(num_equiv[q]/32);++p){
4257 // Only call the common eval fcn if all ref'd fields present.
4258 col_id_set pred_cids;
4259 col_id_set::iterator cpi;
4260 gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
4261 if(pred_cids.size()>0){
4263 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4264 if(cpi != pred_cids.begin())
4266 string field = (*cpi).field;
4267 int tblref = (*cpi).tblvar_ref;
4268 body += "ret_"+field+"_"+int_to_string(tblref);
4273 body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
4274 vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
4275 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4276 for(o=0;o<op_list.size();++o){
4278 body += ","+generate_se_code(op_list[o],Schema);
4286 for(p=0;p<pred_list.size();++p){
4287 col_id_set pred_cids;
4288 col_id_set::iterator cpi;
4289 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
4290 if(pred_cids.size()>0){
4292 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4293 if(cpi != pred_cids.begin())
4295 string field = (*cpi).field;
4296 int tblref = (*cpi).tblvar_ref;
4297 body += "ret_"+field+"_"+int_to_string(tblref);
4301 body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
4302 body+="\tbitpos = bitpos << 1;\n";
4305 // ---------------------------------------------------------------
4306 // Finished with the body of the prefilter
4307 // --------------------------------------------------------------
4311 // Collect fields referenced by an lfta but not
4312 // already unpacked for the prefilter.
4314 //printf("upref_cids is:\n");
4315 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
4316 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4317 //printf("pref_ufcns is:\n");
4318 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
4319 //printf("\t%s\n",(*ssi).c_str());
4322 for(l=0;l<lfta_cols.size();++l){
4323 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
4324 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4326 tmp_col_id.field = (*csi).field;
4327 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4328 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4329 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4330 set<string> fld_ufcns = fe->get_unpack_fcns();
4331 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(), upref_cids.count(tmp_col_id));
4332 if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
4333 // Ensure that this field is not already unpacked.
4335 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
4336 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
4337 if(pref_ufcns.count((*ssi))){
4338 //printf("Field already unpacked.\n");
4343 //printf("\tadding to unpack list\n");
4344 upall_cids.insert(tmp_col_id);
4350 //printf("upall_cids is:\n");
4351 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
4352 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4354 // Get the set of unpacking programs for these.
4355 map<col_id, string,lt_col_id> uall_fcn_map;
4356 find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
4357 set<string> pall_ufcns;
4358 for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
4359 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
4360 pall_ufcns.insert((*mcis).second);
4363 // Iterate through the remaining set of unpacking functions.
4364 if(pall_ufcns.size() > 0)
4365 ret += "//\t\tcall all remaining field unpacking functions.\n";
4366 for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
4367 // gather the set of columns unpacked by this ufcn
4368 col_id_set fcol_set;
4369 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
4370 if(uall_fcn_map[(*csi)] == (*ssi))
4371 fcol_set.insert((*csi));
4374 // gather the set of lftas which access a field unpacked by the fcn
4375 set<long long int> clfta;
4376 for(l=0;l<lfta_cols.size();l++){
4377 for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
4378 if(lfta_cols[l].count((*csi)) > 0)
4381 if(csi != fcol_set.end())
4382 clfta.insert(lfta_sigs[l]);
4385 // generate the unpacking code
4387 set<long long int>::iterator sii;
4388 for(sii=clfta.begin();sii!=clfta.end();++sii){
4389 if(sii!=clfta.begin())
4391 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
4394 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4398 ret += "\treturn(retval);\n\n";
4402 // --------------------------------------------------------
4403 // reuse prefilter body for snaplen calculator
4405 // This is dummy code, so I'm commenting it out.
4408 ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
4413 vector<int> s_snaps = lfta_snap_lens;
4414 sort(s_snaps.begin(), s_snaps.end());
4416 if(s_snaps[0] == -1){
4417 set<unsigned long long int> sigset;
4418 for(i=0;i<lfta_snap_lens.size();++i){
4419 if(lfta_snap_lens[i] == -1){
4420 sigset.insert(lfta_sigs[i]);
4424 set<unsigned long long int>::iterator sulli;
4425 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4426 if(sulli!=sigset.begin())
4428 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4431 ret += ") return -1;\n";
4434 int nextpos = lfta_snap_lens.size()-1;
4435 int nextval = lfta_snap_lens[nextpos];
4436 while(nextval >= 0){
4437 set<unsigned long long int> sigset;
4438 for(i=0;i<lfta_snap_lens.size();++i){
4439 if(lfta_snap_lens[i] == nextval){
4440 sigset.insert(lfta_sigs[i]);
4444 set<unsigned long long int>::iterator sulli;
4445 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4446 if(sulli!=sigset.begin())
4448 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4451 ret += ") return "+int_to_string(nextval)+";\n";
4453 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
4455 nextval = lfta_snap_lens[nextpos];
4459 ret += "\treturn 0;\n";
4470 // Generate the struct that stores the values of temporal attributes unpacked
4471 // by the prefilter, plus an initialized global instance (prefilter_temp_vars).
4472 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
4474 col_id_set::iterator csi;
4476 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
4478 string ret="struct prefilter_unpacked_temp_vars {\n";
4479 ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
4483 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4484 int schref = (*csi).schema_ref;
4485 int tblref = (*csi).tblvar_ref;
4486 string field = (*csi).field;
4487 data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
4488 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4489 field.c_str(), tblref);
4492 if (init_code != "")
4494 if (dt.is_increasing())
4495 init_code += dt.get_min_literal();
4497 init_code += dt.get_max_literal();
4502 ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";