1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include "parse_fta.h"
23 #include "parse_schema.h"
24 #include "analyze_fta.h"
25 #include "generate_utils.h"
26 #include "query_plan.h"
27 #include "generate_lfta_code.h"
28 #include "generate_nic_code.h"
// extern declaration only; the definition is not in this section of the file.
32 extern int DEFAULT_LFTA_HASH_TABLE_SIZE;
34 // default value for correlation between the interface card and
36 #define DEFAULT_TIME_CORR 16
// Array of NRANDS strings; extern declaration only.
41 extern string hash_nums[NRANDS];
44 "12916008961267169387ull", "13447227858232756685ull",
45 "15651770379918602919ull", "1154671861688431608ull",
46 "6777078091984849858ull", "14217205709582564356ull",
47 "4955408621820609982ull", "15813680319165523695ull",
48 "9897969721407807129ull", "5799700135519793083ull",
49 "3446529189623437397ull", "2766403683465910630ull",
50 "3759321430908793328ull", "6569396511892890354ull",
51 "11124853911180290924ull", "17425412145238035549ull",
52 "6879931585355039943ull", "16598635011539670441ull",
53 "9615975578494811651ull", "4378135509538422740ull",
54 "741282195344332574ull", "17368612862906255584ull",
55 "17294299200556814618ull", "518343398779663051ull",
56 "3861893449302272757ull", "8951107288843549591ull",
57 "15785139392894559409ull", "5917810836789601602ull",
58 "16169988133001117004ull", "9792861259254509262ull",
59 "5089058010244872136ull", "2130075224835397689ull",
60 "844136788226150435ull", "1303298091153875333ull",
61 "3579898206894361183ull", "7529542662845336496ull",
62 "13151949992653382522ull", "2145333536541545660ull",
63 "11258221828939586934ull", "3741808146124570279ull",
64 "16272841626371307089ull", "12174572036188391283ull",
65 "9749343496254107661ull", "9141275584134508830ull",
66 "10134192232065698216ull", "12944268412561423018ull",
67 "17499725811865666340ull", "5281482378159088661ull",
68 "13254803486023572607ull", "4526762838498717025ull",
69 "15990846379668494011ull", "10680949816169027468ull",
70 "7116154096012931030ull", "5296740689865236632ull",
71 "5222427027515795922ull", "6893215299448261251ull",
72 "10164707755932877485ull", "15325979189512082255ull",
73 "3713267224148573289ull", "12292682741753167354ull",
74 "4098115959960163588ull", "16095675565885113990ull",
75 "11391590846210510720ull", "8432889531466002673ull",
76 "7146668520368482523ull", "7678169991822407997ull",
77 "9882712513525031447ull", "13904414563513869160ull",
78 "1080076724395768626ull", "8448147843172150388ull",
79 "17633093729608185134ull", "10044622457050142303ull",
80 "4128911859292425737ull", "30642269109444395ull",
81 "16124215396922640581ull", "15444089895060081110ull",
82 "16437006538696302944ull", "800338649777443426ull",
83 "5355794945275091932ull", "11656354278827687117ull",
84 "1110873718944691255ull", "10829576045617693977ull",
85 "3846916616884579955ull", "17055821716837625668ull",
86 "13418968402643535758ull", "11671612594828802128ull",
87 "11597298928184328586ull", "13196028510862205499ull",
88 "16539578557089782373ull", "3182048322921507591ull",
89 "10016080431267550241ull", "148751875162592690ull",
90 "10400930266590768572ull", "4023803397139127870ull",
91 "17766462746879108920ull", "14807761432134600873ull",
92 "13521540421053792403ull", "13980983198941385205ull",
93 "16257584414193564367ull", "1760484796451765024ull"
98 // ----------------------------------------------
99 // Data extracted from the query plan node
100 // for use by code generation.
// These file-scope variables are shared state read by the generator
// functions in this file (e.g. generate_se_code reads gb_tbl,
// generate_tuple_from_aggr reads aggr_tbl, sl_list and partial_fcns).
102 static cplx_lit_table *complex_literals; //Table of literals with constructors.
103 static vector<handle_param_tbl_entry *> param_handle_table;
104 static param_table *param_tbl; // Table of all referenced parameters.
// Select-list expressions; iterated once per output tuple field.
106 static vector<scalarexp_t *> sl_list;
107 static vector<cnf_elem *> where;
109 static gb_table *gb_tbl; // Table of all group-by attributes.
110 static aggregate_table *aggr_tbl; // Table of all referenced aggregates.
112 static bool packed_return; // unpack using structure, not fcns
113 static nic_property *nicprop; // nic properties for this interface.
114 static int global_id;
117 // The partial_fcns vector can now refer to
118 // partial functions, or expensive functions
119 // which can be cached (if there are multiple refs). A couple
120 // of int vectors distinguish the cases.
121 static vector<scalarexp_t *> partial_fcns;
122 static vector<int> fcn_ref_cnt;
123 static vector<bool> is_partial_fcn;
// Half-open index ranges into partial_fcns; generate_tuple_from_aggr walks
// [sl_fcns_start, sl_fcns_end). The wh_/gb_/ag_ pairs presumably cover the
// where, group-by and aggregate clauses -- confirm against the code that
// fills them. NOTE(review): unlike their neighbours these are not static.
124 int sl_fcns_start = 0, sl_fcns_end = 0;
125 int wh_fcns_start = 0, wh_fcns_end = 0;
126 int gb_fcns_start = 0, gb_fcns_end = 0;
127 int ag_fcns_start = 0, ag_fcns_end = 0;
130 // These vectors are for combinable predicates.
131 static vector<int> pred_class; // identifies the group
132 static vector<int> pred_pos; // position in the group.
// Scratch buffer shared by every sprintf call in this file.
// NOTE(review): those sprintf calls are unbounded, so a sufficiently long
// field or node name would overflow the 1000 bytes; snprintf would bound it.
136 static char tmpstr[1000];
138 //////////////////////////////////////////////////////////////////////
139 /// Various utilities
// Builds the name of the generated FTA struct for a query node, starting
// from normalize_name(node_name). generate_fta_struct uses the result as
// the struct tag.
141 string generate_fta_name(string node_name){
142 string ret = normalize_name(node_name);
// Builds the tag of the generated aggregate struct for a query node:
// the normalized node name with "_aggr_struct" appended.
152 string generate_aggr_struct_name(string node_name){
153 string ret = normalize_name(node_name);
157 ret += "_aggr_struct";
// Builds the tag of the generated filter-join struct for a query node,
// starting from normalize_name(node_name). Used by generate_fj_struct and
// for the join_table member emitted by generate_fta_struct.
162 string generate_fj_struct_name(string node_name){
163 string ret = normalize_name(node_name);
// Emits the C statements that load one input field into the generated local
// variable unpack_var_<field>_<tblref>.
//   tblref    - table-variable index; becomes the variable-name suffix.
//   schref    - schema index handed to the schema lookups.
//   field     - name of the field to unpack.
//   schema    - supplies the accessor function name, type name and modifiers.
//   node_name - prefix of the <node_name>_input_struct / _input_struct_var
//               identifiers used when reading from the input struct.
//   end_goto  - label the emitted code jumps to on failure (default "end").
// Uses the shared tmpstr scratch buffer.
172 string generate_unpack_code(int tblref, int schref, string field, table_list *schema, string node_name, string end_goto = string("end")){
// Accessor form: call the schema-provided function on `p`, storing into the
// unpack variable and capturing the status in retval.
175 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
176 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
// Fields that lack the "required" modifier jump to end_goto when the
// accessor reports a non-zero status.
178 if(!schema->get_modifier_list(schref,field)->contains_key("required"))
179 ret += "\tif(retval) goto "+end_goto+";\n";
182 // TODO: ntoh xforms (aug 2010 : removing ntoh, hton)
183 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
184 if(dt.is_buffer_type()){
185 if(dt.get_type() != v_str_t){
// Emitted bounds test against `sz` before touching the buffer.
// NOTE(review): the emitted code converts `.data` with int(...); if `.data`
// is a pointer this truncates on 64-bit targets -- confirm the field type.
186 ret += "\tif(sizeof(struct "+node_name+"_input_struct)+"+node_name+"_input_struct_var->unpack_var_"+field+".length+int("+node_name+"_input_struct_var->unpack_var_"+field+".data) > sz)\n";
187 ret += "\t\tgoto "+end_goto+";\n";
188 ret+= "\t\t"+node_name+"_input_struct_var->unpack_var_"+field+".data += "+node_name+"_input_struct_var->unpack_var_"+field+".length;\n";
// NOTE(review): the emitted assignment below ends in ";+" whereas the
// otherwise identical assignment further down ends in ";" -- the trailing
// '+' looks like a typo that leaks into the generated C code.
189 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
190 " = "+node_name+"_input_struct_var->unpack_var_"+field+";+\n";
192 fprintf(stderr,"INTERNAL ERROR buffer type not string type in generate_lfta_code.cc:generate_unpack_code\n");
// Plain copy of the field out of the input struct.
196 ret += "\tunpack_var_"+field+"_"+int_to_string(tblref)+
197 " = "+node_name+"_input_struct_var->unpack_var_"+field+";\n";
// Emits the C definition of the aggregate struct for a query node: one
// gb_var<N> member per group-by attribute, one aggr_var<N> member per
// aggregate, and a `next` pointer to the same struct type.
//   node_name - query node name; determines the struct tag.
//   gb_tbl    - group-by table supplying the member types.
//   aggr_tbl  - aggregate table supplying the member types.
// Note: the gb_tbl / aggr_tbl parameters hide the file-scope variables of
// the same names. Uses the shared tmpstr scratch buffer.
203 string generate_aggr_struct(string node_name, gb_table *gb_tbl, aggregate_table *aggr_tbl){
204 string ret = "struct " + generate_aggr_struct_name(node_name) + "{\n";
// Group-by members, typed from the group-by table.
207 for(g=0;g<gb_tbl->size();g++){
208 sprintf(tmpstr,"gb_var%d",g);
209 ret += "\t"+gb_tbl->get_data_type(g)->make_cvar(tmpstr)+";\n";
// Aggregate members: built-in aggregates are declared with their data type;
// the other declaration uses the aggregate's storage type.
213 for(a=0;a<aggr_tbl->size();a++){
215 sprintf(tmpstr,"aggr_var%d",a);
216 if(aggr_tbl->is_builtin(a))
217 ret+="\t"+aggr_tbl->get_data_type(a)->make_cvar(tmpstr)+";\n";
219 ret+="\t"+aggr_tbl->get_storage_type(a)->make_cvar(tmpstr)+";\n";
// Self-referential link member.
223 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *next;\n";
// Emits the C definition of the filter-join table entry struct. The struct
// is only produced when the join does not use a bloom filter.
//   fs        - filter-join query plan node (reads use_bloom and hash_eq).
//   node_name - query node name; determines the struct tag.
// Uses the shared tmpstr scratch buffer.
232 string generate_fj_struct(filter_join_qpn *fs, string node_name ){
235 if(fs->use_bloom == false){ // uses hash table instead
236 ret = "struct " + generate_fj_struct_name(node_name) + "{\n";
// One key_var<N> member per hash-equality predicate, typed from the
// right-hand scalar expression of that predicate.
238 for(k=0;k<fs->hash_eq.size();++k){
239 sprintf(tmpstr,"key_var%d",k);
240 ret += "\t"+fs->hash_eq[k]->pr->get_right_se()->get_data_type()->make_cvar(tmpstr)+";\n";
// Timestamp member stored alongside the keys.
242 ret += "\tlong long int ts;\n";
// Emits the C definition of the per-query FTA state struct
// (struct <fta name> { struct FTA f; ... }). The members emitted are, in
// order: aggregate bookkeeping, filter-join state, parameters, complex
// literals, pass-by-handle parameters, last-seen temporal attribute values,
// a trace id and runtime statistics counters.
//   node_name          - query node name; determines the struct tag.
//   gb_tbl, aggr_tbl   - group-by / aggregate tables for the node.
//   param_tbl          - referenced query parameters.
//   complex_literals   - literals that need a constructor.
//   param_handle_table - pass-by-handle parameters.
//   is_aggr_query, is_fj, uses_bloom - flags describing the node.
// Note: several parameters hide file-scope variables of the same name.
// NOTE(review): the parameter spelled "¶m_handle_table" below looks like a
// mis-encoded "&param_handle_table" (the body uses param_handle_table) --
// restore the original spelling before compiling.
// Uses the shared tmpstr scratch buffer.
252 string generate_fta_struct(string node_name, gb_table *gb_tbl,
253 aggregate_table *aggr_tbl, param_table *param_tbl,
254 cplx_lit_table *complex_literals,
255 vector<handle_param_tbl_entry *> ¶m_handle_table,
256 bool is_aggr_query, bool is_fj, bool uses_bloom,
259 string ret = "struct " + generate_fta_name(node_name) + "{\n";
260 ret += "\tstruct FTA f;\n";
262 //-------------------------------------------------------------
263 // Aggregate-specific fields
267 ret += "\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_head, *flush_head;\n";
269 ret+="\tstruct "+generate_aggr_struct_name(node_name)+" *aggr_table; // the groups\n";
270 ret+="\tgs_uint32_t *aggr_table_hashmap; // hash val, plus control info.\n";
271 // ret+="\tint bitmap_size;\n";
272 ret += "\tint n_aggrs; // # of non-empty slots in aggr_table\n";
273 ret += "\tint max_aggrs; // size of aggr_table and its hashmap.\n";
274 ret += "\tint max_windows; // max number of open windows.\n";
275 ret += "\tunsigned int generation; // initially zero, increment on\n";
276 ret += "\t // every hash table flush - whether regular or induced.\n";
277 ret += "\t // Old groups are identified by a generation mismatch.\n";
278 ret += "\tunsigned int flush_pos; // next aggr_table entry to examine\n";
279 ret += "\tunsigned int flush_ctr; // control slow flushing\n";
// For each temporal group-by attribute, report it on stderr and emit the
// last_gb_<g>, flush_start_gb_<g> and last_flushed_gb_<g> members.
// Temporal attributes of buffer type are reported as unsupported.
284 bool uses_temporal_flush = false;
285 for(g=0;g<gb_tbl->size();g++){
286 data_type *dt = gb_tbl->get_data_type(g);
287 if(dt->is_temporal()){
289 fprintf(stderr,"group by attribute %s is temporal, ",
290 gb_tbl->get_name(g).c_str());
291 if(dt->is_increasing()){
292 fprintf(stderr,"increasing.\n");
294 fprintf(stderr,"decreasing.\n");
297 data_type *gdt = gb_tbl->get_data_type(g);
298 if(gdt->is_buffer_type()){
299 fprintf(stderr, "\t but temporal BUFFER types are not supported, skipping.\n");
301 sprintf(tmpstr,"\t%s last_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
303 sprintf(tmpstr,"\t%s flush_start_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
305 sprintf(tmpstr,"\t%s last_flushed_gb_%d;\n",gdt->get_cvar_type().c_str(),g);
307 uses_temporal_flush = true;
// Warn when no usable temporal group-by attribute was found.
313 if(! uses_temporal_flush){
314 fprintf(stderr,"Warning: no temporal flush.\n");
318 // ---------------------------------------------------------
319 // Filter-join specific fields
324 "\tunsigned char * bf_table; //array of bloom filters with layout \n"
325 "\t\t// bit 0 bf 0| bit 0 bf 1| bit 0 bf 2| bit 1 bf 0| bit 1 bf 1|.....\n"
326 "\tint first_exec;\n"
327 "\tlong long int last_bin;\n"
328 "\tint last_bloom_pos;\n"
331 }else{ // limited hash table
333 " struct "+generate_fj_struct_name(node_name)+" *join_table;\n"
340 //--------------------------------------------------------
343 // Create places to hold the parameters.
345 vector<string> param_vec = param_tbl->get_param_names();
346 for(p=0;p<param_vec.size();p++){
347 data_type *dt = param_tbl->get_data_type(param_vec[p]);
348 sprintf(tmpstr,"\t%s param_%s;\n",dt->get_cvar_type().c_str(),
349 param_vec[p].c_str());
// Parameters accessed through a handle also get a search_handle pointer.
351 if(param_tbl->handle_access(param_vec[p])){
352 ret += "\tstruct search_handle *param_handle_"+param_vec[p]+";\n";
356 // Create places to hold complex literals.
358 for(cl=0;cl<complex_literals->size();cl++){
359 literal_t *l = complex_literals->get_literal(cl);
// NOTE(review): dtl is heap-allocated and no matching delete is visible in
// this function; a stack object (data_type dtl(l->get_type());) would avoid
// leaking one data_type per complex literal.
360 data_type *dtl = new data_type( l->get_type() );
361 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
365 // Create places to hold the pass-by-handle parameters.
366 for(p=0;p<param_handle_table.size();++p){
367 sprintf(tmpstr,"\tgs_param_handle_t handle_param_%d;\n",p);
371 // Create places to hold the last values of temporal
372 // attributes referenced in select clause
373 // we also need to store values of the temporal attributes
374 // of last flushed tuple in aggr queries
375 // to make sure we generate the correct temporal tuple
376 // in the presence of slow flushes
379 col_id_set temp_cids; // col ids of temp attributes in select clause
382 col_id_set::iterator csi;
// Collect the column ids used by temporal select-list expressions.
384 for(s=0;s<sl_list.size();s++){
385 data_type *sdt = sl_list[s]->get_data_type();
386 if (sdt->is_temporal()) {
387 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// Emit one last_<field>_<tblref> member per collected column.
391 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
392 int tblref = (*csi).tblvar_ref;
393 int schref = (*csi).schema_ref;
394 string field = (*csi).field;
395 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
396 sprintf(tmpstr,"\t%s last_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
400 ret += "\tgs_uint64_t trace_id;\n\n";
402 // Fields to store the runtime stats
404 ret += "\tgs_uint32_t in_tuple_cnt;\n";
405 ret += "\tgs_uint32_t out_tuple_cnt;\n";
406 ret += "\tgs_uint32_t out_tuple_sz;\n";
407 ret += "\tgs_uint32_t accepted_tuple_cnt;\n";
408 ret += "\tgs_uint64_t cycle_cnt;\n";
409 ret += "\tgs_uint32_t collision_cnt;\n";
410 ret += "\tgs_uint32_t eviction_cnt;\n";
411 ret += "\tgs_float_t sampling_rate;\n";
420 //------------------------------------------------------------
421 // Set colref tblvars to 0..
422 // (special processing for join-like operators in an lfta).
// Recursively walks the scalar expression `se` and sets the table-variable
// reference of every column reference it reaches to 0. Group-by references
// are followed into their defining expression via gtbl; function operands
// are visited one by one. Mutates the expression tree in place.
//   se   - expression to rewrite.
//   gtbl - group-by table used to resolve group-by references; a NULL gtbl
//          with a group-by reference is reported as an internal error.
424 void reset_se_col_ids_tblvars(scalarexp_t *se, gb_table *gtbl){
425 vector<scalarexp_t *> operands;
431 switch(se->get_operator_type()){
437 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
440 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
441 reset_se_col_ids_tblvars(se->get_right_se(),gtbl);
445 se->get_colref()->set_tablevar_ref(0);
// NOTE(review): this message names gather_se_col_ids although it is printed
// from reset_se_col_ids_tblvars -- looks like a copy-paste leftover.
448 fprintf(stderr,"INTERNAL ERROR: gbvar ref in gather_se_col_ids, but gtbl is NULL.\n");
451 reset_se_col_ids_tblvars(gtbl->get_def(se->get_gb_ref()),gtbl);
457 reset_se_col_ids_tblvars(se->get_left_se(),gtbl);
460 operands = se->get_operands();
461 for(o=0;o<operands.size();o++){
462 reset_se_col_ids_tblvars(operands[o], gtbl);
466 fprintf(stderr,"INTERNAL ERROR in reset_se_col_ids_tblvars, line %d, character %d: unknown operator type %d\n",
467 se->get_lineno(), se->get_charno(),se->get_operator_type());
473 // reset column tblvars accessed in this pr.
// Predicate counterpart of reset_se_col_ids_tblvars: recurses through the
// sub-predicates of `pr` and applies reset_se_col_ids_tblvars to every
// scalar expression it contains (left/right operands and the operand list
// of function predicates). Unknown predicate types are reported on stderr.
//   pr   - predicate to rewrite in place.
//   gtbl - group-by table forwarded to reset_se_col_ids_tblvars.
475 void reset_pr_col_ids_tblvars(predicate_t *pr, gb_table *gtbl){
476 vector<scalarexp_t *> op_list;
479 switch(pr->get_operator_type()){
481 reset_se_col_ids_tblvars(pr->get_left_se(), gtbl);
484 reset_se_col_ids_tblvars(pr->get_left_se(),gtbl) ;
485 reset_se_col_ids_tblvars(pr->get_right_se(),gtbl) ;
488 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
491 reset_pr_col_ids_tblvars(pr->get_left_pr(),gtbl) ;
492 reset_pr_col_ids_tblvars(pr->get_right_pr(),gtbl) ;
495 op_list = pr->get_op_list();
496 for(o=0;o<op_list.size();++o){
497 reset_se_col_ids_tblvars(op_list[o],gtbl) ;
501 fprintf(stderr,"INTERNAL ERROR in reset_pr_col_ids_tblvars, line %d, character %d, unknown predicate operator type %d\n",
502 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
509 // Generate code that makes reference
510 // to the tuple, and not to any aggregates.
// Returns the C expression text for the scalar expression `se`.
// Visible translations:
//   handle reference   -> t->handle_param_<N>
//   complex literal    -> t->complex_literal_<N>
//   plain literal      -> literal_t::to_C_code("")
//   column reference   -> unpack_var_<field>_<tblvar>
//   group-by reference -> the code of its defining expression (file-scope gb_tbl)
//   partial function   -> partial_fcn_result_<N>
// Unary/binary operators use the data type's complex operator function when
// one exists. A UDAF reference or an unknown operator type is reported on
// stderr and yields the string "ERROR in generate_se_code".
// Uses the shared tmpstr scratch buffer.
511 static string generate_se_code(scalarexp_t *se,table_list *schema){
513 data_type *ldt, *rdt;
515 vector<scalarexp_t *> operands;
518 switch(se->get_operator_type()){
520 if(se->is_handle_ref()){
521 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
525 if(se->get_literal()->is_cpx_lit()){
526 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
530 return(se->get_literal()->to_C_code("")); // not complex, no constructor
532 if(se->is_handle_ref()){
533 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
538 ret += se->get_param_name();
// Unary operator: complex operator function if the operand type has one.
541 ldt = se->get_left_se()->get_data_type();
542 if(ldt->complex_operator(se->get_op()) ){
543 ret += ldt->get_complex_operator(se->get_op());
545 ret += generate_se_code(se->get_left_se(),schema);
550 ret += generate_se_code(se->get_left_se(),schema);
// Binary operator: complex operator function if the type pair has one.
555 ldt = se->get_left_se()->get_data_type();
556 rdt = se->get_right_se()->get_data_type();
558 if(ldt->complex_operator(rdt, se->get_op()) ){
559 ret += ldt->get_complex_operator(rdt, se->get_op());
561 ret += generate_se_code(se->get_left_se(),schema);
563 ret += generate_se_code(se->get_right_se(),schema);
567 ret += generate_se_code(se->get_left_se(),schema);
569 ret += generate_se_code(se->get_right_se(),schema);
574 if(se->is_gb()){ // OK to ref gb attrs, but they're not yet unpacked ...
575 // so return the defining code.
576 ret = generate_se_code(gb_tbl->get_def(se->get_gb_ref()), schema );
579 sprintf(tmpstr,"unpack_var_%s_%d",
580 se->get_colref()->get_field().c_str(), se->get_colref()->get_tablevar_ref() );
585 // Should not be ref'ing any aggr here.
586 if(se->get_aggr_ref() >= 0){
587 fprintf(stderr,"INTERNAL ERROR, UDAF reference in generate_se_code.\n");
588 return("ERROR in generate_se_code");
591 if(se->is_partial()){
592 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
// Function call: each operand is generated recursively; buffer-typed
// operands that are not handle references get special treatment.
596 operands = se->get_operands();
597 for(o=0;o<operands.size();o++){
599 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
601 ret += generate_se_code(operands[o], schema);
607 fprintf(stderr,"INTERNAL ERROR in generate_se_code (lfta), line %d, character %d: unknown operator type %d\n",
608 se->get_lineno(), se->get_charno(),se->get_operator_type());
609 return("ERROR in generate_se_code");
613 // generate code that refers only to aggregate data and constants.
// Returns the C expression text for `se`, reading group-by and aggregate
// values from an aggregate struct instead of from the input tuple.
//   se     - expression to translate.
//   var    - prefix prepended to gb_var<N> / aggr_var<N>; callers in this
//            file pass e.g. "t->aggr_table[<idx>].".
//   schema - forwarded to recursive calls.
// Visible translations: handle references, literals, parameters, operators
// and partial functions as in generate_se_code; group-by reference ->
// <var>gb_var<N>; aggregate -> <var>aggr_var<N>; UDAF -> udaf_ret<N>.
// A column reference that is not a group-by attribute is reported as an
// error. Uses the shared tmpstr scratch buffer.
614 static string generate_se_code_fm_aggr(scalarexp_t *se, string var, table_list *schema){
617 data_type *ldt, *rdt;
619 vector<scalarexp_t *> operands;
622 switch(se->get_operator_type()){
624 if(se->is_handle_ref()){
625 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
629 if(se->get_literal()->is_cpx_lit()){
630 sprintf(tmpstr,"t->complex_literal_%d",se->get_literal()->get_cpx_lit_ref() );
634 return(se->get_literal()->to_C_code("")); // not complex no constructor
636 if(se->is_handle_ref()){
637 sprintf(tmpstr,"t->handle_param_%d",se->get_handle_ref() );
642 ret += se->get_param_name();
645 ldt = se->get_left_se()->get_data_type();
646 if(ldt->complex_operator(se->get_op()) ){
647 ret += ldt->get_complex_operator(se->get_op());
649 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
654 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
659 ldt = se->get_left_se()->get_data_type();
660 rdt = se->get_right_se()->get_data_type();
662 if(ldt->complex_operator(rdt, se->get_op()) ){
663 ret += ldt->get_complex_operator(rdt, se->get_op());
665 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
667 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
671 ret += generate_se_code_fm_aggr(se->get_left_se(),var,schema);
673 ret += generate_se_code_fm_aggr(se->get_right_se(),var,schema);
678 if(se->is_gb()){ // group-by attrs are read from the aggregate
679 // struct as <var>gb_var<N>.
680 sprintf(tmpstr,"%sgb_var%d",var.c_str(),se->get_gb_ref());
684 fprintf(stderr,"ERROR reference to non-GB column ref not permitted here,"
685 "error in generate_se_code_fm_aggr, line %d, character %d.\n",
686 se->get_lineno(), se->get_charno());
// Built-in aggregate value stored in the aggregate struct.
692 sprintf(tmpstr,"%saggr_var%d",var.c_str(),se->get_aggr_ref());
// UDAF result: read from the udaf_ret<N> temporary.
697 if(se->get_aggr_ref() >= 0){
698 sprintf(tmpstr,"udaf_ret%d",se->get_aggr_ref());
703 if(se->is_partial()){
704 sprintf(tmpstr,"partial_fcn_result_%d",se->get_partial_ref());
708 operands = se->get_operands();
709 for(o=0;o<operands.size();o++){
711 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
713 ret += generate_se_code_fm_aggr(operands[o], var, schema);
// NOTE(review): the returned error string names generate_se_code rather than
// generate_se_code_fm_aggr.
719 fprintf(stderr,"INTERNAL ERROR in generate_lfta_code.cc::generate_se_code_fm_aggr, line %d, character %d: unknown operator type %d\n",
720 se->get_lineno(), se->get_charno(),se->get_operator_type());
721 return("ERROR in generate_se_code");
// Emits the C statement that evaluates a partial function whose operands
// come from aggregate data: "retval = <fn>( &partial_fcn_result_<pfn_id>, ...".
//   se     - must be a function expression (SE_FUNC) that is not a UDAF
//            reference; anything else is reported on stderr and yields an
//            error string.
//   pfn_id - index used to name the result variable.
//   var    - aggregate-struct prefix forwarded to generate_se_code_fm_aggr.
//   schema - forwarded to generate_se_code_fm_aggr.
// Uses the shared tmpstr scratch buffer.
727 static string unpack_partial_fcn_fm_aggr(scalarexp_t *se, int pfn_id, string var, table_list *schema){
730 vector<scalarexp_t *> operands;
733 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
734 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn_fm_aggr. line %d, character %d\n",
735 se->get_lineno(), se->get_charno());
736 return("ERROR in generate_se_code");
739 ret = "\tretval = " + se->get_op() + "( ";
740 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
// Operands are generated from aggregate data.
743 operands = se->get_operands();
744 for(o=0;o<operands.size();o++){
746 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
748 ret += generate_se_code_fm_aggr(operands[o], var, schema);
// Emits the C call expression "<fn>( <operands>" for a cacheable function,
// with operands generated from the input tuple via generate_se_code.
//   se     - must be a function expression (SE_FUNC) that is not a UDAF
//            reference; anything else is reported on stderr and yields an
//            error string.
//   schema - forwarded to generate_se_code.
755 static string generate_cached_fcn(scalarexp_t *se, table_list *schema){
758 vector<scalarexp_t *> operands;
760 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
761 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to generate_cached_fcn. line %d, character %d\n",
762 se->get_lineno(), se->get_charno());
763 return("ERROR in generate_se_code");
766 ret = se->get_op() + "( ";
768 operands = se->get_operands();
769 for(o=0;o<operands.size();o++){
771 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
773 ret += generate_se_code(operands[o], schema);
// Emits the C statement that evaluates a partial function on tuple data:
// "retval = <fn>( &partial_fcn_result_<pfn_id>, ...".
//   se     - must be a function expression (SE_FUNC) that is not a UDAF
//            reference; anything else is reported on stderr and yields an
//            error string.
//   pfn_id - index used to name the result variable.
//   schema - forwarded to generate_se_code.
// Uses the shared tmpstr scratch buffer.
782 static string unpack_partial_fcn(scalarexp_t *se, int pfn_id, table_list *schema){
785 vector<scalarexp_t *> operands;
788 if(se->get_operator_type() != SE_FUNC || se->get_aggr_ref() >= 0){
789 fprintf(stderr,"INTERNAL ERROR, non-function SE passed to unpack_partial_fcn. line %d, character %d\n",
790 se->get_lineno(), se->get_charno());
791 return("ERROR in generate_se_code");
// NOTE(review): the next statement ends with ',' instead of ';', so it is
// joined to the following sprintf by the comma operator. The effect is the
// same, but it is almost certainly a typo (compare
// unpack_partial_fcn_fm_aggr, which uses ';').
794 ret = "\tretval = " + se->get_op() + "( ",
795 sprintf(tmpstr, "&partial_fcn_result_%d",pfn_id);
798 operands = se->get_operands();
799 for(o=0;o<operands.size();o++){
801 if(operands[o]->get_data_type()->is_buffer_type() && (! (operands[o]->is_handle_ref()) ) )
803 ret += generate_se_code(operands[o], schema);
// Translates a query-language comparison operator into its C spelling:
// "=" becomes "==" and "<>" becomes "!=".
814 static string generate_C_comparison_op(string op){
815 if(op == "=") return("==");
816 if(op == "<>") return("!=");
// Translates a query-language boolean operator into C. AND, OR and NOT are
// each recognized in upper-case, capitalized and lower-case spellings only
// (e.g. "aNd" is not matched). Any other operator is reported on stderr and
// an "ERROR UNKNOWN BOOLEAN OPERATOR :<op>" string is returned.
820 static string generate_C_boolean_op(string op){
821 if( (op == "AND") || (op == "And") || (op == "and") ){
824 if( (op == "OR") || (op == "Or") || (op == "or") ){
827 if( (op == "NOT") || (op == "Not") || (op == "not") ){
831 fprintf(stderr,"INTERNAL ERROR: unknown boolean operator %s\n",op.c_str());
832 return("ERROR UNKNOWN BOOLEAN OPERATOR :"+op);
// Returns the C boolean expression text for predicate `pr`, evaluated
// against the input tuple (scalar operands go through generate_se_code).
//   pr     - predicate to translate.
//   schema - forwarded to generate_se_code.
// Visible translations:
//   IN-list     - one equality test per literal, joined with " || ".
//   comparison  - the type's comparison function when the operand types need
//                 one, otherwise the C comparison operator.
//   unary/binary boolean - generate_C_boolean_op around the sub-predicates.
//   function    - either a bit test on a shared combinable-predicate value,
//                 or a call "<op>(...)" with the operands as arguments.
// Unknown predicate types are reported on stderr and yield
// "ERROR in generate_predicate_code". Uses the shared tmpstr scratch buffer
// and the file-scope pred_class / pred_pos vectors.
836 static string generate_predicate_code(predicate_t *pr,table_list *schema){
838 vector<literal_t *> litv;
840 data_type *ldt, *rdt;
841 vector<scalarexp_t *> op_list;
843 unsigned int bitmask;
845 switch(pr->get_operator_type()){
// IN-list predicate.
847 ldt = pr->get_left_se()->get_data_type();
850 litv = pr->get_lit_vec();
851 for(i=0;i<litv.size();i++){
852 if(i>0) ret += " || ";
855 if(ldt->complex_comparison(ldt) ){
856 ret += ldt->get_comparison_fcn(ldt) ;
// Buffer-typed values are passed to the comparison function by address.
858 if(ldt->is_buffer_type() ) ret += "&";
859 ret += generate_se_code(pr->get_left_se(), schema);
861 if(ldt->is_buffer_type() ) ret += "&";
862 if(litv[i]->is_cpx_lit()){
863 sprintf(tmpstr,"t->complex_literal_%d",litv[i]->get_cpx_lit_ref() );
866 ret += litv[i]->to_C_code("");
870 ret += generate_se_code(pr->get_left_se(), schema);
872 ret += litv[i]->to_C_code("");
// Comparison predicate.
881 ldt = pr->get_left_se()->get_data_type();
882 rdt = pr->get_right_se()->get_data_type();
885 if(ldt->complex_comparison(rdt) ){
886 ret += ldt->get_comparison_fcn(rdt);
888 if(ldt->is_buffer_type() ) ret += "&";
889 ret += generate_se_code(pr->get_left_se(),schema);
891 if(rdt->is_buffer_type() ) ret += "&";
892 ret += generate_se_code(pr->get_right_se(),schema);
894 ret += generate_C_comparison_op(pr->get_op());
897 ret += generate_se_code(pr->get_left_se(),schema);
898 ret += generate_C_comparison_op(pr->get_op());
899 ret += generate_se_code(pr->get_right_se(),schema);
// Unary boolean operator.
905 ret += generate_C_boolean_op(pr->get_op());
906 ret += generate_predicate_code(pr->get_left_pr(),schema);
// Binary boolean operator.
911 ret += generate_predicate_code(pr->get_left_pr(),schema);
912 ret += generate_C_boolean_op(pr->get_op());
913 ret += generate_predicate_code(pr->get_right_pr(),schema);
// Function predicate.
917 op_list = pr->get_op_list();
918 cref = pr->get_combinable_ref();
919 if(cref >= 0){ // predicate is a combinable pred reference
// NOTE(review): off-by-one -- "size() >= cref" admits cref == size(), in
// which case pred_class[cref] reads one past the end of the vector. The
// bounds test should be "pred_class.size() > cref".
921 if(pred_class.size() >= cref && pred_class[cref] >= 0){
922 ppos = pred_pos[cref];
// '%' binds tighter than '<<', so this is 1 << (ppos % 32): the bit for this
// predicate inside 32-bit word number ppos/32.
923 bitmask = 1 << ppos % 32;
924 sprintf(tmpstr,"(pref_common_pred_val_%d_%d & %u)",pred_class[cref],ppos/32,bitmask);
930 ret = pr->get_op() + "(";
// Sampling functions receive t->sampling_rate as their first argument.
931 if (pr->is_sampling_fcn) {
932 ret += "t->sampling_rate";
933 if (!op_list.empty())
936 for(o=0;o<op_list.size();++o){
938 if(op_list[o]->get_data_type()->is_buffer_type() && (! (op_list[o]->is_handle_ref()) ) )
940 ret += generate_se_code(op_list[o],schema);
945 fprintf(stderr,"INTERNAL ERROR in generate_predicate_code, line %d, character %d, unknown predicate operator type %d\n",
946 pr->get_lineno(), pr->get_charno(), pr->get_operator_type() );
947 return("ERROR in generate_predicate_code");
// Builds the C text of a test between two already-generated operand
// expressions of type `dt`. When the type needs a comparison function, that
// function is used and buffer-typed operands are passed by address ('&').
//   lhs_op, rhs_op - C expression text of the two operands.
//   dt             - data type shared by both operands.
952 static string generate_equality_test(string &lhs_op, string &rhs_op, data_type *dt){
955 if(dt->complex_comparison(dt) ){
956 ret += dt->get_comparison_fcn(dt);
958 if(dt->is_buffer_type() ) ret += "&";
961 if(dt->is_buffer_type() ) ret += "&";
// Builds the C text of a comparison between two already-generated operand
// expressions of type `dt`. When the type needs a comparison function, that
// function is used and buffer-typed operands are passed by address ('&').
//   lhs_op, rhs_op - C expression text of the two operands.
//   dt             - data type shared by both operands.
973 static string generate_comparison(string &lhs_op, string &rhs_op, data_type *dt){
976 if(dt->complex_comparison(dt) ){
977 ret += dt->get_comparison_fcn(dt);
979 if(dt->is_buffer_type() ) ret += "&";
982 if(dt->is_buffer_type() ) ret += "&";
994 // Here I assume that only MIN and MAX aggregates can be computed
995 // over BUFFER data types.
// Emits the C statement(s) that fold the current tuple into one aggregate.
//   var    - C lvalue text of the aggregate's storage.
//   atbl   - aggregate table.
//   aidx   - index of the aggregate within atbl.
//   schema - forwarded to generate_se_code for the aggregated expression.
// User-defined aggregates become a call to <op>_LFTA_AGGR_UPDATE_. The
// visible built-in forms are "++", "+=", two compare-and-replace forms
// through the aggr_tmp_<aidx> temporary ('<' and '>'), and "&=", "|=", "^="
// for AND_AGGR / OR_AGGR / XOR_AGGR. An unrecognized aggregate is reported
// on stderr and an error string is returned.
// Uses the shared tmpstr scratch buffer.
997 static string generate_aggr_update(string var, aggregate_table *atbl,int aidx, table_list *schema){
998 string retval = "\t\t";
999 string op = atbl->get_op(aidx);
// User-defined aggregate: storage is passed by address unless its storage
// type is fstring_t, followed by the generated operands.
1002 if(! atbl->is_builtin(aidx)) {
1004 retval += op+"_LFTA_AGGR_UPDATE_(";
1005 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1006 retval+="("+var+")";
1007 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1008 for(o=0;o<opl.size();++o){
1010 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1012 retval += generate_se_code(opl[o], schema);
1019 // Built-in aggregate processing.
1021 data_type *dt = atbl->get_data_type(aidx);
1025 retval.append("++;\n");
1030 retval.append(" += ");
1031 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1032 retval.append(";\n");
// Replace the stored value when the new value compares lower. The new value
// is first stored in aggr_tmp_<aidx>; buffer types are replaced through the
// type's buffer-replace function rather than by assignment.
1036 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1037 retval.append(tmpstr);
1038 if(dt->complex_comparison(dt)){
1039 if(dt->is_buffer_type())
1040 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1042 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) < 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1044 sprintf(tmpstr,"\t\tif(aggr_tmp_%d < %s)\n",aidx,var.c_str());
1046 retval.append(tmpstr);
1047 if(dt->is_buffer_type()){
1048 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1050 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1052 retval.append(tmpstr);
// Same shape as above, replacing when the new value compares higher.
1057 sprintf(tmpstr,"aggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1058 retval.append(tmpstr);
1059 if(dt->complex_comparison(dt)){
1060 if(dt->is_buffer_type())
1061 sprintf(tmpstr,"\t\tif(%s(&aggr_tmp_%d,&(%s)) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1063 sprintf(tmpstr,"\t\tif(%s(aggr_tmp_%d,%s) > 0)\n",dt->get_comparison_fcn(dt).c_str(), aidx, var.c_str());
1065 sprintf(tmpstr,"\t\tif(aggr_tmp_%d > %s)\n",aidx,var.c_str());
1067 retval.append(tmpstr);
1068 if(dt->is_buffer_type()){
1069 sprintf(tmpstr,"\t\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_replace().c_str(),var.c_str(),aidx);
1071 sprintf(tmpstr,"\t\t\t%s = aggr_tmp_%d;\n",var.c_str(),aidx);
1073 retval.append(tmpstr);
// Bitwise aggregates.
1078 if(op == "AND_AGGR"){
1080 retval.append(" &= ");
1081 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1082 retval.append(";\n");
1085 if(op == "OR_AGGR"){
1087 retval.append(" |= ");
1088 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1089 retval.append(";\n");
1092 if(op == "XOR_AGGR"){
1094 retval.append(" ^= ");
1095 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema) );
1096 retval.append(";\n");
1099 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_update.\n",op.c_str());
1100 return("ERROR: aggregate not recognized: "+op);
// Emits the C statement(s) that initialize one aggregate from the first
// tuple of a group.
//   var    - C lvalue text of the aggregate's storage.
//   atbl   - aggregate table.
//   aidx   - index of the aggregate within atbl.
//   schema - forwarded to generate_se_code for the aggregated expression.
// User-defined aggregates emit <op>_LFTA_AGGR_INIT_ followed by
// <op>_LFTA_AGGR_UPDATE_ with the generated operands. For built-ins the
// visible forms are "<var> = 1;" and, for SUM / MIN / MAX / AND_AGGR /
// OR_AGGR / XOR_AGGR, an assignment from the aggregated expression (via the
// type's buffer assign-copy function for buffer types). An unrecognized
// aggregate is reported on stderr and an error string is returned.
// Uses the shared tmpstr scratch buffer.
1106 static string generate_aggr_init(string var, aggregate_table *atbl,int aidx, table_list *schema){
1108 string op = atbl->get_op(aidx);
// User-defined aggregate: storage is passed by address unless its storage
// type is fstring_t.
1111 if(! atbl->is_builtin(aidx)) {
1113 retval += "\t\t"+op+"_LFTA_AGGR_INIT_(";
1114 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1115 retval+="("+var+"));\n";
1117 retval += "\t"+atbl->get_op(aidx)+"_LFTA_AGGR_UPDATE_(";
1118 if(atbl->get_storage_type(aidx)->get_type() != fstring_t) retval+="&";
1119 retval+="("+var+")";
1120 vector<scalarexp_t *> opl = atbl->get_operand_list(aidx);
1121 for(o=0;o<opl.size();++o){
1123 if(opl[o]->get_data_type()->is_buffer_type() && (! (opl[o]->is_handle_ref()) ) )
1125 retval += generate_se_code(opl[o],schema);
1131 // Built-in aggregate processing.
1134 data_type *dt = atbl->get_data_type(aidx);
1137 retval = "\t\t"+var;
1138 retval.append(" = 1;\n");
1142 if(op == "SUM" || op == "MIN" || op == "MAX" || op == "AND_AGGR" ||
1143 op == "OR_AGGR" || op == "XOR_AGGR"){
// Buffer types are copied through aggr_tmp_<aidx> with the type's
// assign-copy function; other types are assigned directly.
1144 if(dt->is_buffer_type()){
1145 sprintf(tmpstr,"\t\taggr_tmp_%d = %s;\n",aidx,generate_se_code(atbl->get_aggr_se(aidx), schema ).c_str() );
1146 retval.append(tmpstr);
1147 sprintf(tmpstr,"\t\t%s(f,&(%s),&aggr_tmp_%d);\n",dt->get_buffer_assign_copy().c_str(),var.c_str(),aidx);
1148 retval.append(tmpstr);
1150 retval = "\t\t"+var;
1152 retval.append(generate_se_code(atbl->get_aggr_se(aidx), schema));
1153 retval.append(";\n");
1158 fprintf(stderr,"INTERNAL ERROR : aggregate %s not recognized in generate_aggr_init.\n",op.c_str());
1159 return("ERROR: aggregate not recognized: "+op);
1163 ////////////////////////////////////////////////////////////
// Emits the text that opens a generated LFTA source section: an
// "#ifndef LFTA_IN_NIC" guard, the embedded schema string variable for the
// node, and the standard / rdtsc includes.
//   schema           - schema table (not referenced by the visible lines).
//   node_name        - query node name; determines the schema-string
//                      variable name via generate_schema_string_name.
//   schema_embed_str - C expression text used as that variable's initializer.
1166 string generate_preamble(table_list *schema, //map<string,string> &int_fcn_defs,
1167 std::string &node_name, std::string &schema_embed_str){
1168 // Include these only once, not once per lfta
1169 // string ret = "#include \"rts.h\"\n";
1170 // ret += "#include \"fta.h\"\n\n");
1172 string ret = "#ifndef LFTA_IN_NIC\n";
1173 ret += "char *"+generate_schema_string_name(node_name)+" = " +schema_embed_str+";\n";
1174 ret += "#include<stdio.h>\n";
1175 ret += "#include <limits.h>\n";
1176 ret += "#include <float.h>\n";
1177 ret += "#include \"rdtsc.h\"\n";
// generate_tuple_from_aggr: emits the C text that turns one aggregate-table
// slot into an output tuple and then releases the slot.
//   node_name : used for generate_tuple_name(node_name) in the emitted sizeof.
//   schema    : forwarded to the expression and partial-function generators.
//   idx       : C expression text used as the index into t->aggr_table[...].
// Emitted code, in order:
//   1. lfta_bailout is zeroed and incremented by <op>_LFTA_AGGR_BAILOUT_ for
//      every aggregate that reports has_bailout.
//   2. Inside if(! lfta_bailout): UDAF outputs are unpacked into udaf_retN,
//      partial functions of the select clause are unpacked (setting
//      unpack_failed on a nonzero retval), buffer-type select items are
//      evaluated into selvar_N, tuple_size is computed, the tuple is allocated
//      with allocate_tuple, filled, posted with post_tuple, and the LFTA_STATS
//      counters are updated.
//   3. Buffer-type group-by and builtin aggregate storage is destroyed, the
//      UDAF destroy hook is emitted, and t->n_aggrs is decremented.
// aggr_tbl, gb_tbl, sl_list, partial_fcns, is_partial_fcn, sl_fcns_start,
// sl_fcns_end and tmpstr are not declared in this block.
// NOTE(review): tmpstr is filled with sprintf, which does not bound the write;
// its size is not visible here -- TODO confirm capacity.
1186 string generate_tuple_from_aggr(string node_name, table_list *schema, string idx){
1188 // need to create and output the tuple.
1189 string ret = "/*\t\tCreate an output tuple for the aggregate being kicked out \t*/\n";
1190 // Check for any UDAFs with LFTA_BAILOUT
1191 ret += "\tlfta_bailout = 0;\n";
1192 for(a=0;a<aggr_tbl->size();a++){
1193 if(aggr_tbl->has_bailout(a)){
1194 ret += "\tlfta_bailout+="+aggr_tbl->get_op(a)+"_LFTA_AGGR_BAILOUT_(";
// Every storage type except fstring_t is passed by address (leading &).
1195 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1196 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1199 ret += "\tif(! lfta_bailout){\n";
1201 // First, compute the size of the tuple.
1203 // Unpack UDAF return values
1204 for(a=0;a<aggr_tbl->size();a++){
1205 if(! aggr_tbl->is_builtin(a)){
1206 ret += "\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_OUTPUT_(&(udaf_ret"+int_to_string(a)+"),";
1207 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1208 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1214 // Unpack partial fcns ref'd by the select clause.
1215 if(sl_fcns_start != sl_fcns_end){
1216 ret += "\t\tunpack_failed = 0;\n";
1217 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1218 if(is_partial_fcn[p]){
1219 ret += "\t" + unpack_partial_fcn_fm_aggr(partial_fcns[p], p,
1220 "t->aggr_table["+idx+"].",schema);
1221 ret += "\t\tif(retval) unpack_failed = 1;\n";
1224 // BEGIN don't allocate tuple if
1225 ret += "\t\tif( unpack_failed == 0 ){\n"; // unpack failed.
1228 // Unpack any BUFFER type selections into temporaries
1229 // so that I can compute their size and not have
1230 // to recompute their value during tuple packing.
1231 // I can use regular assignment here because
1232 // these temporaries are non-persistent.
1234 for(s=0;s<sl_list.size();s++){
1235 data_type *sdt = sl_list[s]->get_data_type();
1236 if(sdt->is_buffer_type()){
1237 sprintf(tmpstr,"\t\t\tselvar_%d = ",s);
1239 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1245 // The size of the tuple is the size of the tuple struct plus the
1246 // size of the buffers to be copied in.
1248 ret += "\t\t\ttuple_size = sizeof( struct ";
1249 ret += generate_tuple_name(node_name);
1251 for(s=0;s<sl_list.size();s++){
1252 data_type *sdt = sl_list[s]->get_data_type();
1253 if(sdt->is_buffer_type()){
1254 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
1261 ret += "\t\t\ttuple = allocate_tuple(f, tuple_size );\n";
1262 ret += "\t\t\tif( tuple != NULL){\n";
1265 // Test passed, make assignments to the tuple.
1267 ret += "\t\t\t\ttuple_pos = sizeof( struct ";
1268 ret += generate_tuple_name(node_name) ;
1271 // Mark tuple as REGULAR_TUPLE
1272 ret += "\n\t\t\t\ttuple->tuple_type = REGULAR_TUPLE;\n";
1274 for(s=0;s<sl_list.size();s++){
1275 data_type *sdt = sl_list[s]->get_data_type();
1276 if(sdt->is_buffer_type()){
// Buffer items: copy the payload at tuple_pos, then advance tuple_pos by its size.
1277 sprintf(tmpstr,"\t\t\t\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
1279 sprintf(tmpstr,"\t\t\t\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
1282 sprintf(tmpstr,"\t\t\t\ttuple->tuple_var%d = ",s);
1284 // if(sdt->needs_hn_translation())
1285 // ret += sdt->hton_translation() +"( ";
1286 ret += generate_se_code_fm_aggr(sl_list[s],"t->aggr_table["+idx+"].",schema);
1287 // if(sdt->needs_hn_translation())
1294 ret += "\t\t\t\tpost_tuple(tuple);\n";
1295 ret += "\t\t\t\t#ifdef LFTA_STATS\n";
1296 ret+="\t\t\t\tt->out_tuple_cnt++;\n";
1297 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
1298 ret += "\t\t\t\t#endif\n\n";
1301 if(sl_fcns_start != sl_fcns_end) // END don't allocate tuple if
1302 ret += "\t\t}\n"; // unpack failed.
1305 // Need to release memory held by BUFFER types.
1308 for(g=0;g<gb_tbl->size();g++){
1309 data_type *gdt = gb_tbl->get_data_type(g);
1310 if(gdt->is_buffer_type()){
1311 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].gb_var%d));\n",gdt->get_buffer_destroy().c_str(),idx.c_str(),g);
1315 for(a=0;a<aggr_tbl->size();a++){
1316 if(aggr_tbl->is_builtin(a)){
1317 data_type *adt = aggr_tbl->get_data_type(a);
1318 if(adt->is_buffer_type()){
1319 sprintf(tmpstr,"\t\t\t%s(&(t->aggr_table[%s].aggr_var%d));\n",adt->get_buffer_destroy().c_str(),idx.c_str(),a);
1323 ret += "\t\t"+aggr_tbl->get_op(a)+"_LFTA_AGGR_DESTROY_(";
1324 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1325 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"));\n";
1329 ret += "\t\tt->n_aggrs--;\n";
// generate_gb_match_test: emits the head of a C if-condition that decides
// whether aggregate slot idx holds the current group.
//   idx : C expression text used both as the bitmap index and as the index
//         into t->aggr_table[...].
// The condition starts with IS_FILLED and IS_NEW on t->aggr_table_bitmap.
// When gb_tbl is non-empty, one equality test per group-by attribute is
// appended, comparing gb_attr_N with t->aggr_table[idx].gb_varN through
// generate_equality_test with that attribute's data type.
1335 string generate_gb_match_test(string idx){
1337 string ret="\tif (IS_FILLED(t->aggr_table_bitmap, "+idx+") && IS_NEW(t->aggr_table_bitmap,"+idx+")";
1338 if(gb_tbl->size()>0){
1339 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
1342 // Next, scan list for a match on the group-by attributes.
1343 string rhs_op, lhs_op;
1344 for(g=0;g<gb_tbl->size();g++){
1347 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
1348 sprintf(tmpstr,"t->aggr_table[%s].gb_var%d",idx.c_str(),g); rhs_op = tmpstr;
1349 ret += generate_equality_test(lhs_op, rhs_op, gb_tbl->get_data_type(g) );
// generate_gb_update: emits the C text run when the incoming tuple matches an
// existing group in slot idx.
//   node_name : names the flush routine fta_aggr_flush_old_<node_name> and is
//               forwarded to generate_tuple_from_aggr.
//   schema    : forwarded to generate_aggr_update and generate_tuple_from_aggr.
//   idx       : C expression text indexing t->aggr_table and
//               t->aggr_table_hashmap.
//   has_udaf  : passed by value; set to true below when any aggregate is not
//               builtin, so that assignment is not visible to the caller.
// Emitted code: in-place update of every aggregate, destruction of the copied
// buffer-type gb_attr_N values, an ||-joined list of <op>_LFTA_AGGR_FLUSHME_
// calls for the non-builtin aggregates, a call to the flush routine, the
// tuple emission for slot idx, and clearing SLOT_FILLED in the hashmap entry.
1359 string generate_gb_update(string node_name, table_list *schema, string idx, bool has_udaf){
1363 ret += "/*\t\tMatch found : update in place.\t*/\n";
1366 for(a=0;a<aggr_tbl->size();a++){
1367 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1368 ret += generate_aggr_update(tmpstr,aggr_tbl,a, schema);
1369 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
1372 // garbage collect copied buffer type gb attrs.
1373 for(g=0;g<gb_tbl->size();g++){
1374 data_type *gdt = gb_tbl->get_data_type(g);
1375 if(gdt->is_buffer_type()){
1376 sprintf(tmpstr,"\t\t\t%s(&(gb_attr_%d));\n",gdt->get_buffer_destroy().c_str(),g);
1383 bool first_udaf = true;
1386 for(a=0;a<aggr_tbl->size();a++){
1387 if(! aggr_tbl->is_builtin(a)){
// Separate successive FLUSHME calls with a logical OR.
1388 if(! first_udaf)ret += " || ";
1389 else first_udaf = false;
1390 ret += aggr_tbl->get_op(a)+"_LFTA_AGGR_FLUSHME_(";
1391 if(aggr_tbl->get_storage_type(a)->get_type() != fstring_t) ret+="&";
1392 ret+="(t->aggr_table["+idx+"].aggr_var"+int_to_string(a)+"))";
1396 ret+=" fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1397 ret += generate_tuple_from_aggr(node_name,schema,idx);
1398 ret += "\t\tt->aggr_table_hashmap["+idx+"] &= ~SLOT_FILLED;\n";
// generate_init_group: emits the C text that installs a new group in slot idx.
//   schema : forwarded to generate_aggr_init.
//   idx    : C expression text indexing t->aggr_table and t->aggr_table_hashmap.
// Emitted code: the hashmap entry is set to hash2 | SLOT_FILLED | gen_val,
// each gb_varN is assigned from gb_attr_N, each aggregate is initialised by
// generate_aggr_init, and t->n_aggrs is incremented.
1405 string generate_init_group( table_list *schema, string idx){
1407 string ret="\t\t\tt->aggr_table_hashmap["+idx+"] = hash2 | SLOT_FILLED | gen_val;\n";
1408 // Fill up the aggregate block.
1409 for(g=0;g<gb_tbl->size();g++){
1410 sprintf(tmpstr,"\t\t\tt->aggr_table[%s].gb_var%d = gb_attr_%d;\n",idx.c_str(),g,g);
1413 for(a=0;a<aggr_tbl->size();a++){
1414 sprintf(tmpstr,"t->aggr_table[%s].aggr_var%d",idx.c_str(),a);
1415 ret += generate_aggr_init(tmpstr, aggr_tbl,a, schema);
1417 ret+="\t\tt->n_aggrs++;\n";
// generate_fta_flush: emits the C function
//   static void fta_aggr_flush_old_<node_name>(struct FTA *f, unsigned int nflush)
// which evicts groups from the aggregate table.
//   node_name : used in the emitted function, tuple and FTA struct names.
//   schema    : forwarded to generate_tuple_from_aggr.
//   Ext_fcns  : supplies the return data type of each UDAF (get_fcn_dt) for
//               the udaf_retN temporaries.
// Emitted function, in order:
//   - local declarations (tuple_size, tuple_pos, tuple, i, lfta_bailout,
//     gen_val, the cast of f to the node FTA struct, selvar_N for buffer-type
//     select items, partial_fcn_result_N, retval, unpack_failed, udaf_retN);
//   - gen_val = t->generation & SLOT_GEN_BITS;
//   - a loop from t->flush_pos while i < t->max_aggrs, t->n_aggrs is nonzero
//     and nflush > 0; a filled slot is evicted when its generation bits differ
//     from gen_val or when a temporal group-by value is below t->last_gb_N;
//     eviction clears the hashmap entry, emits the tuple for slot i and
//     decrements nflush;
//   - t->flush_pos is saved, forced to t->max_aggrs when no groups remain, and
//     when it equals t->max_aggrs each temporal non-buffer group-by copies
//     flush_start_gb_N into last_flushed_gb_N.
1422 string generate_fta_flush(string node_name, table_list *schema,
1423 ext_fcn_list *Ext_fcns){
1426 string select_var_defs ;
1429 // Flush from previous epoch
1431 ret+="static void fta_aggr_flush_old_"+node_name+"(struct FTA *f, unsigned int nflush){\n";
1433 ret += "\tgs_int32_t tuple_size, tuple_pos;\n";
1434 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1435 ret += "\tint i, lfta_bailout;\n";
1436 ret += "\tunsigned int gen_val;\n";
1438 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1439 ret += generate_fta_name(node_name)+" *) f;\n";
1444 // Variables needed to store selected attributes of BUFFER type
1445 // temporarily, in order to compute their size for storage
1446 // in an output tuple.
1448 select_var_defs = "";
1449 for(s=0;s<sl_list.size();s++){
1450 data_type *sdt = sl_list[s]->get_data_type();
1451 if(sdt->is_buffer_type()){
1452 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
1453 select_var_defs.append(tmpstr);
1456 if(select_var_defs != ""){
1457 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
1458 ret += select_var_defs;
1462 // Variables to store results of partial functions.
1463 if(sl_fcns_start != sl_fcns_end){
1464 ret += "/*\t\tVariables to store the results of partial functions.\t*/\n";
1465 for(p=sl_fcns_start;p<sl_fcns_end;p++){
1466 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
1467 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
// NOTE(review): the literal below ends with a semicolon after the final
// newline, so the emitted C contains a lone empty statement -- confirm intended.
1470 ret += "\tgs_retval_t retval = 0;\n\tint unpack_failed = 0;\n;";
1473 // Variables for udaf output temporaries
1474 bool no_udaf = true;
1476 for(a=0;a<aggr_tbl->size();a++){
1477 if(! aggr_tbl->is_builtin(a)){
1479 ret+="/*\t\tUDAF output vars.\t*/\n";
1482 int afcn_id = aggr_tbl->get_fcn_id(a);
1483 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
1484 sprintf(tmpstr,"udaf_ret%d", a);
1485 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
1490 // ret+="\tt->flush_finished=1; /* flush will be completed */\n";
1492 ret+="\tgen_val = t->generation & SLOT_GEN_BITS;\n";
1493 ret+="\tfor (i=t->flush_pos; (i < t->max_aggrs) && t->n_aggrs && nflush>0; ++i){\n";
1494 ret+="\t\tif ( (t->aggr_table_hashmap[i] & SLOT_FILLED) && (((t->aggr_table_hashmap[i] & SLOT_GEN_BITS) != gen_val ) || (";
1497 for(g=0;g<gb_tbl->size();g++){
1498 data_type *gdt = gb_tbl->get_data_type(g);
1499 if(gdt->is_temporal()){
1500 if(first_g) first_g=false; else ret+=" || ";
1501 ret += "t->last_gb_"+int_to_string(g)+" > t->aggr_table[i].gb_var"+int_to_string(g)+" ";
1505 ret+="\t\t\tt->aggr_table_hashmap[i] = 0;\n";
1507 "#ifdef LFTA_STATS\n"
1508 "\t\t\tt->eviction_cnt++;\n"
1513 ret+=generate_tuple_from_aggr(node_name,schema,"i");
1515 // ret+="\t\t\tt->n_aggrs--;\n"; // done in generate_tuple_from_aggr
1516 ret+="\t\t\tnflush--;\n";
1519 ret+="\tt->flush_pos=i;\n";
1520 ret+="\tif(t->n_aggrs == 0) {\n";
1521 ret+="\t\tt->flush_pos = t->max_aggrs;\n";
1524 ret+="\tif(t->flush_pos == t->max_aggrs) {\n";
1526 for(int g=0;g<gb_tbl->size();g++){
1527 data_type *dt = gb_tbl->get_data_type(g);
1528 if(dt->is_temporal()){
1529 data_type *gdt = gb_tbl->get_data_type(g);
1530 if(!gdt->is_buffer_type()){
1531 sprintf(tmpstr,"\t\tt->last_flushed_gb_%d = t->flush_start_gb_%d;\n",g,g);
1536 ret += "\t}\n}\n\n";
1541 // TODO Remove sprintf to perform string catenation
// generate_fta_load_params: emits the C function
//   static int load_params_<node_name>(struct <fta> *t, int sz, void *value,
//                                      int initial_call)
// which copies query parameters from the block at value into the FTA struct.
//   node_name : used in the emitted function name and FTA struct name.
// Emitted function, in order:
//   - declarations of pos, data_pos and, for buffer-type parameters,
//     tmp_var_<name> and access_var_<name>;
//   - data_pos computed as a sum with one term per parameter (built from
//     get_tuple_cvar_type), followed by: if(data_pos > sz) return 1;
//   - per parameter: buffer types are read through access_var_<name>, checked
//     against sz, pointed into value, the old t->param_<name> is destroyed
//     unless initial_call, and the new value is installed with the type's
//     buffer_assign_copy routine; other types are assigned directly from
//     value+pos; pos then advances by sizeof the parameter's cvar type;
//   - per pass-by-handle parameter: the old handle is deregistered unless
//     initial_call and a new handle is registered;
//   - return 0.
// Unsupported buffered types and unknown handle val_type values are reported
// on stderr at generation time.
// NOTE(review): the emitted bounds test adds offset and length before
// comparing with sz; whether that sum can wrap depends on the field types,
// which are not visible here -- TODO confirm.
1542 string generate_fta_load_params(string node_name){
1544 vector<string> param_names = param_tbl->get_param_names();
1546 string ret = "static int load_params_"+node_name+"(struct "+generate_fta_name(node_name);
1547 ret += " *t, int sz, void *value, int initial_call){\n";
1548 ret += "\tint pos=0;\n";
1549 ret += "\tint data_pos;\n";
1551 for(p=0;p<param_names.size();p++){
1552 data_type *dt = param_tbl->get_data_type(param_names[p]);
1553 if(dt->is_buffer_type()){
1554 sprintf(tmpstr,"\t%s tmp_var_%s;\n",dt->get_cvar_type().c_str(), param_names[p].c_str() );
1556 sprintf(tmpstr,"\t%s access_var_%s;\n",dt->get_tuple_cvar_type().c_str(), param_names[p].c_str() );
1563 ret += "\n\tdata_pos = ";
1564 for(p=0;p<param_names.size();p++){
1565 if(p>0) ret += " + ";
1566 data_type *dt = param_tbl->get_data_type(param_names[p]);
1568 ret += dt->get_tuple_cvar_type();
1572 ret += "\tif(data_pos > sz) return 1;\n\n";
1575 for(p=0;p<param_names.size();p++){
1576 data_type *dt = param_tbl->get_data_type(param_names[p]);
1577 if(dt->is_buffer_type()){
1578 sprintf(tmpstr,"\taccess_var_%s = *( (%s *)((char *)value+pos) );\n",param_names[p].c_str(), dt->get_tuple_cvar_type().c_str() );
1580 switch( dt->get_type() ){
1582 // ret += "\ttmp_var_"+param_names[p]+".data = ntohl( tmp_var_"+param_names[p]+".data );\n"; // ntoh conversion
1583 // ret += "\ttmp_var_"+param_names[p]+".length = ntohl( tmp_var_"+param_names[p]+".length );\n"; // ntoh conversion
1584 sprintf(tmpstr,"\tif( (access_var_%s.offset) + access_var_%s.length > sz) return 1;\n",param_names[p].c_str(), param_names[p].c_str() );
1586 sprintf(tmpstr,"\ttmp_var_%s.data = (gs_sp_t)(value) + access_var_%s.offset ;\n",param_names[p].c_str(), param_names[p].c_str() );
1588 sprintf(tmpstr,"\ttmp_var_%s.length = access_var_%s.length ;\n",param_names[p].c_str(), param_names[p].c_str() );
1592 fprintf(stderr,"ERROR: parameter %s is of type %s, a buffered type, but I don't know how to unpack it as a parameter.\n",param_names[p].c_str(), dt->get_type_str().c_str() );
1596 // First, destroy the old
1597 ret += "\tif(! initial_call)\n";
1598 sprintf(tmpstr,"\t\t%s(&(t->param_%s));\n",dt->get_buffer_destroy().c_str(),param_names[p].c_str());
1600 // Next, create the new.
1601 sprintf(tmpstr,"\t%s((struct FTA *)t, &(t->param_%s), &tmp_var_%s);\n", dt->get_buffer_assign_copy().c_str(), param_names[p].c_str(), param_names[p].c_str() );
1604 // if(dt->needs_hn_translation()){
1605 // sprintf(tmpstr,"\tt->param_%s = %s( *( (%s *)( (char *)value+pos) ) );\n",
1606 // param_names[p].c_str(), dt->ntoh_translation().c_str(), dt->get_cvar_type().c_str() );
1608 sprintf(tmpstr,"\tt->param_%s = *( (%s *)( (char *)value+pos) );\n",
1609 param_names[p].c_str(), dt->get_cvar_type().c_str() );
1613 sprintf(tmpstr,"\tpos += sizeof( %s );\n",dt->get_cvar_type().c_str() );
1617 // Register the pass-by-handle parameters
1619 ret += "/* register and de-register the pass-by-handle parameters */\n";
1622 for(ph=0;ph<param_handle_table.size();++ph){
1623 data_type pdt(param_handle_table[ph]->type_name);
1624 switch(param_handle_table[ph]->val_type){
1630 ret += "\tif(! initial_call)\n";
1631 sprintf(tmpstr, "\t\t%s(t->handle_param_%d);\n",
1632 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1634 sprintf(tmpstr,"\tt->handle_param_%d = %s((struct FTA *)t,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
// Buffer-type parameters are handed to the registration function by address.
1637 if(pdt.is_buffer_type()) ret += "&(";
1638 ret += "t->param_"+param_handle_table[ph]->param_name;
1639 if(pdt.is_buffer_type()) ret += ")";
1643 sprintf(tmpstr, "INTERNAL ERROR unknown case (%d) found when processing pass-by-handle parameter table.",param_handle_table[ph]->val_type);
1644 fprintf(stderr,"%s\n",tmpstr);
1649 ret+="\treturn 0;\n";
// generate_fta_free: emits the C function
//   static gs_retval_t free_fta_<node_name>(struct FTA *f, gs_uint32_t recursive)
//   node_name     : used in the emitted function, struct and flush-routine names.
//   is_aggr_query : not referenced by the statements below; presumably it
//                   guards the flush sequence -- TODO confirm.
// Emitted function: casts f to the node FTA struct, declares i, calls the
// flush routine, advances t->generation, resets t->flush_pos to 0 and flushes
// again, calls the deregistration function of every pass-by-handle parameter,
// and returns 0.
1658 string generate_fta_free(string node_name, bool is_aggr_query){
1660 string ret="static gs_retval_t free_fta_"+node_name+"(struct FTA *f, gs_uint32_t recursive){\n";
1661 ret+= "\tstruct "+generate_fta_name(node_name)+
1662 " * t = (struct "+generate_fta_name(node_name)+" *) f;\n";
1663 ret += "\tint i;\n";
1666 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1667 ret+="\t/* \t\tmark all groups as old */\n";
1668 ret+="\tt->generation++;\n";
1669 ret+="\tt->flush_pos = 0;\n";
1670 ret+="\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1673 // Deregister the pass-by-handle parameters
1674 ret += "/* de-register the pass-by-handle parameters */\n";
1676 for(ph=0;ph<param_handle_table.size();++ph){
1677 sprintf(tmpstr, "\t%s(t->handle_param_%d);\n",
1678 param_handle_table[ph]->lfta_deregistration_fcn().c_str(),ph);
1683 ret += "\treturn 0;\n}\n\n";
// generate_fta_control: emits the C function
//   static gs_retval_t control_fta_<node_name>(struct FTA *f, gs_int32_t command,
//                                              gs_int32_t sz, void *value)
//   node_name     : used in the emitted function, struct and helper names.
//   schema        : not referenced by the statements below.
//   is_aggr_query : not referenced by the statements below; presumably it
//                   guards the aggregate-only parts -- TODO confirm.
// Commands handled by the emitted function:
//   FTA_COMMAND_FLUSH             : with no groups, posts a zero-size tuple;
//                                   otherwise flushes, advances t->generation,
//                                   resets t->flush_pos and flushes again.
//   FTA_COMMAND_LOAD_PARAMS       : emitted only when param_tbl is non-empty;
//                                   calls load_params_<node_name> with
//                                   initial_call 0 and warns on failure.
//   FTA_COMMAND_SET_SAMPLING_RATE : memcpy of a gs_float_t into t->sampling_rate.
//   FTA_COMMAND_FILE_DONE         : flushes remaining groups as above, then
//                                   allocates a tuple, marks it EOF_TUPLE and
//                                   posts it; returns 1 if allocation fails.
// The emitted function returns 0 otherwise.
1688 string generate_fta_control(string node_name, table_list *schema, bool is_aggr_query){
1689 string ret="static gs_retval_t control_fta_"+node_name+"(struct FTA *f, gs_int32_t command, gs_int32_t sz, void *value){\n";
1690 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1691 ret += generate_fta_name(node_name)+" *) f;\n\n";
1695 ret += "\t/* temp status tuple */\n";
1696 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1697 ret += "\tgs_int32_t tuple_size;\n";
1701 ret+="\tif(command == FTA_COMMAND_FLUSH){\n";
1703 ret+="\t\tif (!t->n_aggrs) {\n";
1704 ret+="\t\t\ttuple = allocate_tuple(f, 0);\n";
1705 ret+="\t\t\tif( tuple != NULL)\n";
1706 ret+="\t\t\t\tpost_tuple(tuple);\n";
1708 ret+="\t\t}else{\n";
1710 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1711 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1712 ret +="\t\tt->generation++;\n";
1713 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1714 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1715 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1716 ret+="\t\t\tt->flush_pos = 0;\n";
1717 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1722 if(param_tbl->size() > 0){
1724 "\tif(command == FTA_COMMAND_LOAD_PARAMS){\n"
1725 "\t\tif(load_params_"+node_name+"(t, sz, value, 0))\n"
1726 "#ifndef LFTA_IN_NIC\n"
1727 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small, ignored.\\n\");\n"
1734 "\tif(command == FTA_COMMAND_SET_SAMPLING_RATE){\n"
1735 "\t\tmemcpy(&t->sampling_rate, value, sizeof(gs_float_t));\n"
1739 ret += "\tif(command == FTA_COMMAND_FILE_DONE ){\n";
1742 ret+="\t\tif (t->n_aggrs) {\n";
1743 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1744 ret+="\t\t\t/* \t\tmark all groups as old */\n";
1745 ret +="\t\tt->generation++;\n";
1746 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1747 ret+="//\t\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1748 ret+="//\t\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1749 ret+="\t\t\tt->flush_pos = 0;\n";
1750 ret+="\t\t\tfta_aggr_flush_old_" + node_name+"(f,t->max_aggrs);\n";
1754 ret += "\t\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+");\n";
1755 ret += "\t\ttuple = allocate_tuple(f, tuple_size );\n";
1756 ret += "\t\tif( tuple == NULL)\n\t\treturn 1;\n";
1758 /* mark tuple as EOF_TUPLE */
1759 ret += "\n\t\t/* Mark tuple as eof_tuple */\n";
1760 ret += "\t\ttuple->tuple_type = EOF_TUPLE;\n";
1761 ret += "\t\tpost_tuple(tuple);\n";
1764 ret += "\treturn 0;\n}\n\n";
// generate_fta_clock: emits the C function
//   static gs_retval_t clock_fta_<node_name>(struct FTA *f)
// which advances time, flushes aggregates when the temporal group-by values
// change, and posts a temporal status tuple carrying runtime statistics.
//   node_name     : used in the emitted function, struct and helper names.
//   schema        : used to look up field types and to generate expressions.
//   time_corr     : number subtracted from cur_time in the emitted staleness
//                   tests and assignments for time, timestamp and timestamp_ms.
//   is_aggr_query : enables the gb_attr_N declarations and the flush test.
// Emitted function, in order:
//   - declarations (tuple, tuple_size, i, cur_time, time_advanced, stats, one
//     unpack_var_<field>_<tblref> per temporal column referenced by the select
//     list, and gb_attr_N temporaries for aggregate queries);
//   - under PREFILTER_DEFINED, t->last_<field>_<tblref> is raised to the
//     prefilter value and time_advanced is set;
//   - for time, timestamp and timestamp_ms fields, when gscp_blocking_mode()
//     is false and the last seen value lags cur_time - time_corr, the last
//     value is moved forward and time_advanced is set; otherwise the unpack
//     variable takes the last seen value;
//   - for aggregate queries, when time advanced and a temporal group-by value
//     differs from t->last_gb_N, pending groups are flushed, t->generation is
//     advanced, t->flush_pos is reset and flush_start_gb_N and last_gb_N are
//     updated;
//   - a tuple sized for the tuple struct plus a gs_uint64_t trace id plus a
//     struct fta_stat is allocated (return 1 on failure), temporal select
//     items are filled in, the tuple is marked TEMPORAL_TUPLE, trace id and
//     stats are appended with memcpy, the tuple is posted, fta_heartbeat is
//     called and the counters in t are reset to 0.
1769 string generate_fta_clock(string node_name, table_list *schema, unsigned time_corr, bool is_aggr_query){
1770 string ret="static gs_retval_t clock_fta_"+node_name+"(struct FTA *f){\n";
1771 ret += "\tstruct "+generate_fta_name(node_name)+" * t = (struct ";
1772 ret += generate_fta_name(node_name)+" *) f;\n\n";
1774 ret += "\t/* Create a temp status tuple */\n";
1775 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
1776 ret += "\tgs_int32_t tuple_size;\n";
1777 ret += "\tunsigned int i;\n";
1778 ret += "\ttime_t cur_time;\n";
1779 ret += "\tint time_advanced;\n";
1780 ret += "\tstruct fta_stat stats;\n";
1784 /* copy the last seen values of temporal attributes */
1785 col_id_set temp_cids; // col ids of temp attributes in select clause
1788 /* HACK: in order to reuse the SE generation code, we need to copy
1789 * the last values of the temp attributes into new variables
1790 * which have names unpack_var_XXX_XXX
1794 col_id_set::iterator csi;
1796 for(s=0;s<sl_list.size();s++){
1797 data_type *sdt = sl_list[s]->get_data_type();
1798 if (sdt->is_temporal()) {
1799 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
1803 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1804 int tblref = (*csi).tblvar_ref;
1805 int schref = (*csi).schema_ref;
1806 string field = (*csi).field;
1807 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1808 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n", dt.get_cvar_type().c_str(), field.c_str(), tblref);
1812 if (is_aggr_query) {
1813 for(g=0;g<gb_tbl->size();g++){
1814 data_type *gdt = gb_tbl->get_data_type(g);
1815 if(gdt->is_temporal()){
1816 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1818 data_type *gdt = gb_tbl->get_data_type(g);
1819 if(gdt->is_buffer_type()){
1820 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
1828 ret += "\ttime_advanced = 0;\n";
1830 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
1831 int tblref = (*csi).tblvar_ref;
1832 int schref = (*csi).schema_ref;
1833 string field = (*csi).field;
1834 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
1836 // update last seen value with the value seen
1837 ret += "\t#ifdef PREFILTER_DEFINED\n";
1838 sprintf(tmpstr,"\tif (prefilter_temp_vars.unpack_var_%s_%d > t->last_%s_%d) {\n\t\tt->last_%s_%d = prefilter_temp_vars.unpack_var_%s_%d;\n",
1839 field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref, field.c_str(), tblref);
1841 ret += "\t\ttime_advanced = 1;\n\t}\n";
1842 ret += "\t#endif\n";
1844 // we need to pay special attention to time fields
1845 if (field == "time" || field == "timestamp" || field == "timestamp_ms"){
1846 ret += "\tcur_time = time(&cur_time);\n";
1848 if (field == "time") {
1849 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && (t->last_time_%d < (cur_time - %d))) {\n",
1852 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = cur_time - %d;\n",
1853 field.c_str(), tblref, field.c_str(), tblref, time_corr);
1854 } else if (field == "timestamp_ms") {
1855 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((t->last_timestamp_ms_%d/1000) < (cur_time - %d))) {\n",
1858 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = (cur_time - %d) * 1000;\n",
1859 field.c_str(), tblref, field.c_str(), tblref, time_corr);
1861 sprintf(tmpstr,"\tif (!gscp_blocking_mode() && ((gs_uint32_t)(t->last_%s_%d>>32) < (cur_time - %d))) {\n",
1862 field.c_str(), tblref, time_corr);
1864 sprintf(tmpstr,"\t\tunpack_var_%s_%d = t->last_%s_%d = ((gs_uint64_t)(cur_time - %d))<<32;\n",
1865 field.c_str(), tblref, field.c_str(), tblref, time_corr);
1869 ret += "\t\ttime_advanced = 1;\n";
1872 sprintf(tmpstr,"\telse\n\t\tunpack_var_%s_%d = t->last_%s_%d;\n",
1873 field.c_str(), tblref, field.c_str(), tblref);
1876 sprintf(tmpstr,"\tunpack_var_%s_%d = t->last_%s_%d;\n",
1877 field.c_str(), tblref, field.c_str(), tblref);
1882 // for aggregation lftas we need to check if the time was advanced beyond the current epoch
1883 if (is_aggr_query) {
1886 bool first_one = true;
1887 for(g=0;g<gb_tbl->size();g++){
1888 data_type *gdt = gb_tbl->get_data_type(g);
1889 if(gdt->is_temporal()){
1890 // To perform the test, first need to compute the value
1891 // of the temporal gb attrs.
1892 if(gdt->is_buffer_type()){
1893 // NOTE : if the SE defining the gb is anything
1894 // other than a ref to a variable, this will generate
1895 // illegal code. To be resolved with Spatch.
1896 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
1897 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
1899 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
1900 gdt->get_buffer_assign_copy().c_str(), g, g);
1902 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
1906 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
1907 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
1908 if(first_one){first_one = false;} else {change_test.append(") && (");}
1909 change_test.append(generate_equality_test(lhs_op, rhs_op, gdt));
1913 ret += "\n\tif( time_advanced && !( (";
1917 ret += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
1918 ret += "\t\tif(t->flush_pos<t->max_aggrs) \n";
1919 ret += "\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
1921 ret += "\t\t/* \t\tmark all groups as old */\n";
1922 ret +="\t\tt->generation++;\n";
1923 ret += "//\tmarking groups old should happen implicitly by advancing the generation.\n";
1924 ret += "//\t\tfor (i = 0; i < t->bitmap_size; ++i)\n";
1925 ret += "//\t\t\tt->aggr_table_bitmap[i] &= 0xAAAAAAAA;\n";
1926 ret += "\t\tt->flush_pos = 0;\n";
1928 for(g=0;g<gb_tbl->size();g++){
1929 data_type *gdt = gb_tbl->get_data_type(g);
1930 if(gdt->is_temporal()){
1931 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
1932 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g); ret += tmpstr;
1939 ret += "\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t) + sizeof(struct fta_stat);\n";
1940 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
1941 ret += "\tif( tuple == NULL)\n\t\treturn 1;\n";
1944 for(s=0;s<sl_list.size();s++){
1945 data_type *sdt = sl_list[s]->get_data_type();
1946 if(sdt->is_temporal()){
1948 if (sl_list[s]->is_gb()) {
// While groups are still held, keep the previous last_flushed_gb value;
// otherwise take the freshly computed expression.
1949 sprintf(tmpstr,"\tt->last_flushed_gb_%d = (t->n_aggrs) ? t->last_flushed_gb_%d : %s;\n",sl_list[s]->get_gb_ref(), sl_list[s]->get_gb_ref(), generate_se_code(sl_list[s],schema).c_str());
1953 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
1955 // if(sdt->needs_hn_translation())
1956 // ret += sdt->hton_translation() +"( ";
1957 if (sl_list[s]->is_gb()) {
1958 sprintf(tmpstr, "t->last_flushed_gb_%d",sl_list[s]->get_gb_ref());
1961 ret += generate_se_code(sl_list[s],schema);
1963 // if(sdt->needs_hn_translation())
1969 /* mark tuple as temporal */
1970 ret += "\n\t/* Mark tuple as temporal */\n";
1971 ret += "\ttuple->tuple_type = TEMPORAL_TUPLE;\n";
1973 ret += "\n\t/* Copy trace id */\n";
1974 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+"), &t->trace_id, sizeof(gs_uint64_t));\n\n";
1976 ret += "\n\t/* Populate runtime stats */\n";
1977 ret += "\tstats.ftaid = f->ftaid;\n";
1978 ret += "\tstats.in_tuple_cnt = t->in_tuple_cnt;\n";
1979 ret += "\tstats.out_tuple_cnt = t->out_tuple_cnt;\n";
1980 ret += "\tstats.out_tuple_sz = t->out_tuple_sz;\n";
1981 ret += "\tstats.accepted_tuple_cnt = t->accepted_tuple_cnt;\n";
1982 ret += "\tstats.cycle_cnt = t->cycle_cnt;\n";
1983 ret += "\tstats.collision_cnt = t->collision_cnt;\n";
1984 ret += "\tstats.eviction_cnt = t->eviction_cnt;\n";
1985 ret += "\tstats.sampling_rate = t->sampling_rate;\n";
1987 ret += "\n#ifdef LFTA_PROFILE\n";
1988 ret += "\n\t/* Print stats */\n";
1989 ret += "\tfprintf(stderr, \"STATS " + node_name + " \");\n";
1990 ret += "\tfprintf(stderr, \"in_tuple_cnt= %u \", t->in_tuple_cnt);\n";
1991 ret += "\tfprintf(stderr, \"out_tuple_cnt= %u \", t->out_tuple_cnt);\n";
1992 ret += "\tfprintf(stderr, \"out_tuple_sz= %u \", t->out_tuple_sz);\n";
1993 ret += "\tfprintf(stderr, \"accepted_tuple_cnt= %u \", t->accepted_tuple_cnt);\n";
1994 ret += "\tfprintf(stderr, \"cycle_cnt= %llu \", t->cycle_cnt);\n";
1995 ret += "\tfprintf(stderr, \"cycle_per_in_tuple= %lf \", ((double)t->cycle_cnt)/((double)t->in_tuple_cnt));\n";
// NOTE(review): the collision_cnt format ends with a newline, so the emitted
// eviction_cnt lands on its own line, separate from the STATS line.
1996 ret += "\tfprintf(stderr, \"collision_cnt= %d\\n\", t->collision_cnt);\n\n";
1997 ret += "\tfprintf(stderr, \"eviction_cnt= %d\\n\", t->eviction_cnt);\n\n";
1998 ret += "\n#endif\n";
2001 ret += "\n\t/* Copy stats */\n";
// NOTE(review): this emits sizeof(fta_stat) while the tuple size above uses
// sizeof(struct fta_stat); confirm fta_stat is also a typedef in the emitted C.
2002 ret += "\tmemcpy((gs_sp_t)tuple+sizeof( struct "+generate_tuple_name(node_name)+") + sizeof(gs_uint64_t), &stats, sizeof(fta_stat));\n\n";
2003 ret+="\tpost_tuple(tuple);\n";
2005 ret += "\n\t/* Send a heartbeat message to clearinghouse */\n";
2006 ret += "\tfta_heartbeat(f->ftaid, t->trace_id++, 1, &stats);\n";
2008 ret += "\n\t/* Reset runtime stats */\n";
2009 ret += "\tt->in_tuple_cnt = 0;\n";
2010 ret += "\tt->out_tuple_cnt = 0;\n";
2011 ret += "\tt->out_tuple_sz = 0;\n";
2012 ret += "\tt->accepted_tuple_cnt = 0;\n";
2013 ret += "\tt->cycle_cnt = 0;\n";
2014 ret += "\tt->collision_cnt = 0;\n";
2015 ret += "\tt->eviction_cnt = 0;\n";
2017 ret += "\treturn 0;\n}\n\n";
2023 // accept processing before the where clause,
2024 // do flush processing.
// generate_aggr_accept_prelim: emits the part of an aggregate query accept
// routine that runs before the WHERE clause.
//   fs             : query plan node; its slow_flush and real_time definitions
//                    are read with get_val_of_def.
//   node_name      : names the flush routine and is forwarded to
//                    generate_unpack_code.
//   schema         : used for field lookups and expression generation.
//   unpacked_cids  : in/out set of column ids already unpacked; every column
//                    unpacked here is inserted.
//   temporal_flush : out; reset to empty, then filled with the C text that
//                    computes the temporal group-by values and flushes when
//                    they differ from t->last_gb_N.
// Text accumulated in ret:
//   - slow flush: slow_flush is parsed with atoi, values <= 0 become 2; for
//     values above 1 the one-group flush call is emitted behind a t->flush_ctr
//     counter, and an unguarded one-group flush call is emitted as well
//     (presumably the alternative branch -- TODO confirm);
//   - unpacking of every temporal column referenced by the select list that is
//     not yet in unpacked_cids, with return 1 on failure and an update of
//     t->last_<field>_<tblref>;
//   - when real_time is defined and temporal_flush is non-empty, unpacking of
//     the columns the flush test needs followed by temporal_flush itself.
// Temporal group-by attributes of buffer type are reported on stderr and left
// out of the flush-start bookkeeping.
2025 string generate_aggr_accept_prelim(qp_node *fs, string node_name, table_list *schema, col_id_set &unpacked_cids, string &temporal_flush){
2029 string ret="\n/*\tslow flush\t*/\n";
2030 string slow_flush_str = fs->get_val_of_def("slow_flush");
2031 int n_slow_flush = atoi(slow_flush_str.c_str());
// A missing, unparsable or non-positive slow_flush setting becomes 2.
2032 if(n_slow_flush <= 0) n_slow_flush = 2;
2033 if(n_slow_flush > 1){
2034 ret += "\tt->flush_ctr++;\n";
2035 ret += "\tif(t->flush_ctr >= "+int_to_string(n_slow_flush)+"){\n";
2036 ret += "\t\tt->flush_ctr = 0;\n";
2037 ret+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n";
2040 ret+="\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,1);\n\n";
2045 bool first_one = true;
2047 col_id_set flush_cids; // col ids accessed when computing flush variables.
2048 // unpack them at temporal flush test time.
2049 temporal_flush = "";
2052 for(g=0;g<gb_tbl->size();g++){
2053 data_type *gdt = gb_tbl->get_data_type(g);
2054 if(gdt->is_temporal()){
2055 gather_se_col_ids(gb_tbl->get_def(g), flush_cids, gb_tbl);
2057 // To perform the test, first need to compute the value
2058 // of the temporal gb attrs.
2059 if(gdt->is_buffer_type()){
2060 // NOTE : if the SE defining the gb is anything
2061 // other than a ref to a variable, this will generate
2062 // illegal code. To be resolved with Spatch.
2063 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2064 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2065 temporal_flush += tmpstr;
2066 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2067 gdt->get_buffer_assign_copy().c_str(), g, g);
2069 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2071 temporal_flush += tmpstr;
2072 // END computing the value of the temporal GB attr.
2075 sprintf(tmpstr,"t->last_gb_%d",g); string lhs_op = tmpstr;
2076 sprintf(tmpstr,"gb_attr_%d",g); string rhs_op = tmpstr;
2077 if(first_one){first_one = false;} else {change_test.append(") && (");}
2078 change_test += generate_equality_test(lhs_op, rhs_op, gdt);
2081 if(!first_one){ // will be false iff. there is a temporal GB attribute
2082 temporal_flush += "\n/*\t\tFlush the aggregates if the temporal gb attrs have changed.\t*/\n";
2083 temporal_flush += "\tif( !( (";
2084 temporal_flush += change_test;
2085 temporal_flush += ") ) ){\n";
2087 // temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs) fta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2088 temporal_flush+="\t\tif(t->flush_pos<t->max_aggrs){ \n";
2089 temporal_flush+="\t\t\tfta_aggr_flush_old_"+node_name+"(f,t->max_aggrs);\n";
2090 temporal_flush+="\t\t}\n";
2091 temporal_flush+="\t\t/* \t\tmark all groups as old */\n";
2092 temporal_flush+="\t\tt->generation++;\n";
2093 temporal_flush+="\t\tt->flush_pos = 0;\n";
2096 // Now set the saved temporal value of the gb to the
2097 // current value of the gb. Only for simple types,
2098 // not for buffer types -- but the strings are not
2099 // temporal in any case.
2101 for(g=0;g<gb_tbl->size();g++){
2102 data_type *gdt = gb_tbl->get_data_type(g);
2103 if(gdt->is_temporal()){
2104 if(gdt->is_buffer_type()){
2106 fprintf(stderr,"ERROR : can't handle temporal buffer types, ignoring in buffer flush control.\n");
2108 sprintf(tmpstr,"\t\tt->flush_start_gb_%d = gb_attr_%d;\n",g,g);
2109 temporal_flush += tmpstr;
2110 sprintf(tmpstr,"\t\tt->last_gb_%d = gb_attr_%d;\n",g,g);
2111 temporal_flush += tmpstr;
2115 temporal_flush += "\t}\n\n";
2118 // Unpack all the temporal attributes referenced in select clause
2119 // and update the last value of the attribute
2120 col_id_set temp_cids; // col ids of temp attributes in select clause
2121 col_id_set::iterator csi;
2123 for(s=0;s<sl_list.size();s++){
2124 data_type *sdt = sl_list[s]->get_data_type();
2125 if (sdt->is_temporal()) {
2126 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
2130 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
// Skip columns that an earlier stage has already unpacked.
2131 if(unpacked_cids.count((*csi)) == 0){
2132 int tblref = (*csi).tblvar_ref;
2133 int schref = (*csi).schema_ref;
2134 string field = (*csi).field;
2135 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2137 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
2138 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2139 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2141 ret += "\tif(retval) return 1;\n";
2143 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
2146 unpacked_cids.insert( (*csi) );
2151 // Do the flush here if this is a real_time query
2152 string rt_level = fs->get_val_of_def("real_time");
2153 if(rt_level != "" && temporal_flush != ""){
2154 for(csi=flush_cids.begin(); csi != flush_cids.end();++csi){
2155 if(unpacked_cids.count((*csi)) == 0){
2156 int tblref = (*csi).tblvar_ref;
2157 int schref = (*csi).schema_ref;
2158 string field = (*csi).field;
2159 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2161 sprintf(tmpstr,"\tretval = %s(p, &unpack_var_%s_%d);\n",
2162 schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
2164 ret += "\tif(retval) return 1;\n";
2166 unpacked_cids.insert( (*csi) );
2169 ret += temporal_flush;
// generate_sel_accept_body: emits the C text that builds and posts an output
// tuple for a filter-only (selection) query once the tuple has been accepted.
//   fs        : query plan node; not referenced by the statements below.
//   node_name : used for generate_tuple_name(node_name) in the emitted sizeof.
//   schema    : forwarded to the expression and partial-function generators.
// Emitted code, in order:
//   - evaluation of the functions referenced by the select clause: results
//     referenced more than once are guarded by fcn_ref_cnt_N so they are
//     computed once, and a failing partial function jumps to the end label;
//   - t->accepted_tuple_cnt++ under LFTA_STATS;
//   - buffer-type select items evaluated into selvar_N;
//   - tuple_size = sizeof the tuple struct plus the buffer sizes, then
//     allocate_tuple, with a jump to end on NULL;
//   - tuple_pos initialised, tuple marked REGULAR_TUPLE, each field assigned
//     (buffer items copied at tuple_pos, which then advances);
//   - post_tuple, then out_tuple_cnt and out_tuple_sz updated under LFTA_STATS.
2175 string generate_sel_accept_body(qp_node *fs, string node_name, table_list *schema){
2180 /////////////// Processing for filter-only query
2182 // test passed : create the tuple, then assign to it.
2183 ret += "/*\t\tCreate and post the tuple\t*/\n";
2185 // Unpack partial fcns ref'd by the select clause.
2186 // Its a kind of a WHERE clause ...
2187 for(p=sl_fcns_start;p<sl_fcns_end;p++){
// Functions referenced more than once are evaluated only while their
// fcn_ref_cnt_N flag is still 0; the flag is set to 1 afterwards.
2188 if(fcn_ref_cnt[p] > 1){
2189 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2191 if(is_partial_fcn[p]){
2192 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2193 ret += "\tif(retval) goto end;\n";
2195 if(fcn_ref_cnt[p] > 1){
2196 if(!is_partial_fcn[p]){
2197 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2199 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2204 // increment the counter of accepted tuples
2205 ret += "\n\t#ifdef LFTA_STATS\n";
2206 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2207 ret += "\t#endif\n\n";
2209 // First, compute the size of the tuple.
2211 // Unpack any BUFFER type selections into temporaries
2212 // so that I can compute their size and not have
2213 // to recompute their value during tuple packing.
2214 // I can use regular assignment here because
2215 // these temporaries are non-persistent.
2217 for(s=0;s<sl_list.size();s++){
2218 data_type *sdt = sl_list[s]->get_data_type();
2219 if(sdt->is_buffer_type()){
2220 sprintf(tmpstr,"\tselvar_%d = ",s);
2222 ret += generate_se_code(sl_list[s],schema);
2228 // The size of the tuple is the size of the tuple struct plus the
2229 // size of the buffers to be copied in.
2231 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2232 for(s=0;s<sl_list.size();s++){
2233 data_type *sdt = sl_list[s]->get_data_type();
2234 if(sdt->is_buffer_type()){
2235 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
2242 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2243 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2245 // Test passed, make assignments to the tuple.
2247 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2249 // Mark tuple as REGULAR_TUPLE
2250 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2253 for(s=0;s<sl_list.size();s++){
2254 data_type *sdt = sl_list[s]->get_data_type();
2255 if(sdt->is_buffer_type()){
2256 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2258 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
2261 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2263 // if(sdt->needs_hn_translation())
2264 // ret += sdt->hton_translation() +"( ";
2265 ret += generate_se_code(sl_list[s],schema);
2266 // if(sdt->needs_hn_translation())
2274 ret += "\tpost_tuple(tuple);\n";
2276 // Increment the counter of posted tuples
2277 ret += "\n\t#ifdef LFTA_STATS\n";
2278 ret += "\tt->out_tuple_cnt++;\n";
2279 ret+="\tt->out_tuple_sz+=tuple_size;\n";
2280 ret += "\t#endif\n\n";
// generate_fj_accept_body
// Builds the C source text for the accept path of a filter join. Following
// the section comments below, the emitted code:
//   1. reads the current window timestamp into curr_fj_ts,
//   2. clears bloom filters for bins that have passed,
//   3. evaluates the S (filtering stream) predicates and records the S join
//      key in the bloom filters / the join hash table,
//   4. evaluates the cheap R (main stream) predicates,
//   5. probes the bloom filters / join hash table with the R join key,
//   6. evaluates the expensive R predicates,
//   7. builds and posts the output tuple.
// Parameters:
//   fs            - filter-join plan node; supplies temporal_range,
//                   temporal_var, pred_t1 (S preds), pred_t0 (R preds),
//                   hash_eq, and the "num_bloom", "bloom_size" and
//                   "aggregate_slots" defines.
//   node_name     - used for unpack code and the generated tuple struct name.
//   unpacked_cids - in/out: columns whose unpack code has already been
//                   emitted; every column unpacked here is inserted.
//   Ext_fcns      - passed to compute_cnf_cost.
//   schema        - passed through to the code generators.
// NOTE(review): `ret`, `tmpstr`, `i`, `p`, `s`, `w`, `bf_mask`, `cheap_rpos`,
// `pfcn_refs`, `gb_tbl`, `sl_list` and the partial-function tables are
// declared outside the lines shown here. The embedded line numbers skip, so
// several statements (closing braces, `ret +=` heads, appends of tmpstr, the
// bloom-vs-hash-table branch conditions, the return) are not visible in this
// listing -- confirm against the full file before relying on these notes.
2287 string generate_fj_accept_body(filter_join_qpn *fs, string node_name,col_id_set &unpacked_cids,ext_fcn_list *Ext_fcns, table_list *schema){
// ---- Bloom filter sizing ------------------------------------------------
// n_bloom defaults to 11 and may be replaced by ("num_bloom" define)+1.
// NOTE(review): the guard in front of the override is not visible here.
2293 unsigned int window_len = fs->temporal_range;
2294 unsigned int n_bloom = 11;
2295 string n_bloom_str = fs->get_val_of_def("num_bloom");
2296 int tmp_n_bloom = atoi(n_bloom_str.c_str());
2298 n_bloom = tmp_n_bloom+1;
// Width of one bin in window-time units: (window_len+1)/(n_bloom-1),
// rendered with "%f" for substitution into the emitted code.
2299 float bloom_width = (window_len+1.0)/(1.0*n_bloom-1);
2300 sprintf(tmpstr,"%f",bloom_width);
2301 string bloom_width_str = tmpstr;
// A window shorter than the filter count gets window_len+1 filters of
// width 1.
2303 if(window_len < n_bloom){
2304 n_bloom = window_len+1;
2305 bloom_width_str = "1";
2309 // Grab the current window time
2310 scalarexp_t winvar(fs->temporal_var);
2311 ret += "\tcurr_fj_ts = "+generate_se_code(&winvar,schema)+";\n";
// The "bloom_size" define overrides the default exponent when it lies
// strictly between 3 and 32.
// NOTE(review): an exponent of 31 makes `1 << bf_exp_size` exceed the range
// of a signed int -- confirm whether the upper bound should be lower.
2313 int bf_exp_size = 12; // base-2 log of number of bits
2314 string bloom_len_str = fs->get_val_of_def("bloom_size");
2315 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
2316 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
2317 bf_exp_size = tmp_bf_exp_size;
2319 int bf_bit_size = 1 << bf_exp_size;
2320 int bf_byte_size = bf_bit_size / (8*sizeof(char));
// ---- Join hash table sizing ---------------------------------------------
// ht_size defaults to 4096; an "aggregate_slots" define above 1024 is
// processed by the (partially visible) power-of-2 loop below.
2322 unsigned int ht_size = 4096;
2323 string ht_size_s = fs->get_val_of_def("aggregate_slots");
2324 int tmp_ht_size = atoi(ht_size_s.c_str());
2325 if(tmp_ht_size > 1024){
2326 unsigned int hs = 1; // make it power of 2
2329 tmp_ht_size = tmp_ht_size >> 1;
// bf_mask is built as a run of low-order 1 bits: one bit per bloom exponent
// step in the first loop, one bit per halving of ht_size in the second.
// NOTE(review): the two loops presumably sit in alternative branches (bloom
// filter vs. join table) whose conditions are not visible here.
2336 for(i=0;i<bf_exp_size;i++)
2337 bf_mask = (bf_mask << 1) | 1;
2339 for(i=ht_size;i>1;i=i>>1)
2340 bf_mask = (bf_mask << 1) | 1;
// Diagnostic dump of the chosen parameters; printf runs when this generator
// executes, it is not part of the emitted code.
2344 printf("n_bloom=%d, window_len=%d, bloom_width=%s, bf_exp_size=%d, bf_bit_size=%d, bf_byte_size=%d, ht_size=%d, ht_size_s=%s, bf_mask=%d\n",
2347 bloom_width_str.c_str(),
2359 // If this is a bloom-filter fj, first test if the
2360 // bloom filter needs to be advanced.
2361 // SET_BF_EMPTY(table,number of bloom filters,bloom filter index,bit index)
2362 // t->bf_size : number of bits in bloom filter
2364 // TODO: Don't iterate more than n_bloom times!
2365 // As written, it's possible to wrap around many times.
// The emitted code initialises last_bin / last_bloom_pos on first execution;
// afterwards, for every bin between last_bin+1 and curr_bin it advances
// last_bloom_pos (wrapping at n_bloom) and clears every bit of that filter.
2368 "// Clean out old bloom filters if needed.\n"
2369 "// TODO vectorize this ? \n"
2370 " if(t->first_exec){\n"
2371 " t->first_exec = 0;\n"
2372 " t->last_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2373 " t->last_bloom_pos = t->last_bin % "+int_to_string(n_bloom)+";\n"
2375 " curr_bin = (long long int)(curr_fj_ts/"+bloom_width_str+");\n"
2376 " if(curr_bin != t->last_bin){\n"
2377 " for(the_bin=t->last_bin+1;the_bin<=curr_bin;the_bin++){\n"
2378 " t->last_bloom_pos++;\n"
2379 " if(t->last_bloom_pos >= "+int_to_string(n_bloom)+")\n"
2380 " t->last_bloom_pos = 0;\n"
2381 " tmp_i = t->last_bloom_pos;\n"
2382 " for(j=0;j<"+int_to_string(bf_bit_size)+";j++){\n"
2383 " SET_BF_EMPTY(t->bf_table, "+int_to_string(n_bloom)+", tmp_i,j);\n"
2387 " t->last_bin = curr_bin;\n"
2393 //-----------------------------------------------------------------
2394 // First, determine whether to do S (filter stream) processing.
2397 "// S (filtering stream) predicate, should it be processed?\n"
2400 // Sort S preds based on cost.
2401 vector<cnf_elem *> s_filt = fs->pred_t1;
2402 col_id_set::iterator csi;
2403 if(s_filt.size() > 0){
2405 // Unpack fields ref'd in the S pred
// Each column is unpacked at most once (tracked in unpacked_cids); the
// unpack code for the S side is generated with the label "end_s".
2406 for(w=0;w<s_filt.size();++w){
2407 col_id_set this_pred_cids;
2408 gather_pr_col_ids(s_filt[w]->pr, this_pred_cids, gb_tbl);
2409 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2410 if(unpacked_cids.count( (*csi) ) == 0){
2411 int tblref = (*csi).tblvar_ref;
2412 int schref = (*csi).schema_ref;
2413 string field = (*csi).field;
2414 ret += generate_unpack_code(tblref,schref,field,schema,node_name,"end_s");
2415 unpacked_cids.insert( (*csi) );
2421 // Sort by evaluation cost.
2422 // First, estimate evaluation costs
2423 // Eliminate predicates covered by the prefilter (those in s_pids).
2424 // I need to do it before the sort because the indices refer
2425 // to the position in the unsorted list.
// NOTE(review): tmp_wh is filled here but the sort below operates on s_filt;
// no use of tmp_wh is visible in this listing.
2426 vector<cnf_elem *> tmp_wh;
2427 for(w=0;w<s_filt.size();++w){
2428 compute_cnf_cost(s_filt[w],Ext_fcns);
2429 tmp_wh.push_back(s_filt[w]);
2433 sort(s_filt.begin(), s_filt.end(), compare_cnf_cost());
2435 // Now generate the predicates.
2436 for(w=0;w<s_filt.size();++w){
2437 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,s_filt[w]->cost);
2440 // Find partial fcns ref'd in this cnf element
2442 collect_partial_fcns_pr(s_filt[w]->pr, pfcn_refs);
2443 // Since set<..> is a "Sorted Associative Container",
2444 // we can walk through it in sorted order by walking from
2445 // begin() to end(). (and the partial fcns must be
2446 // evaluated in this order).
2447 set<int>::iterator si;
2449 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2450 if(fcn_ref_cnt[(*si)] > 1){
2451 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2453 if(is_partial_fcn[(*si)]){
2454 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
// A failing partial function on the S side skips only the S processing.
2455 ret += "\t\tif(retval) goto end_s;\n";
2457 if(fcn_ref_cnt[(*si)] > 1){
2458 if(!is_partial_fcn[(*si)]){
2459 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2460 // Testing for S is a side branch.
2461 // I don't want a cacheable partial function to be
2462 // marked as evaluated. Therefore I mark the function
2463 // as evaluated ONLY IF it is not partial.
2464 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
// A failed S predicate jumps to end_s, i.e. the record is not added as S.
2470 ret += "\tif( !("+generate_predicate_code(s_filt[w]->pr,schema)+
2471 ") ) goto end_s;\n";
2474 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
// The S-side join key of each equality predicate is its right-hand scalar
// expression; it is stored in s_equijoin_<p>.
2477 for(p=0;p<fs->hash_eq.size();++p)
2478 ret += "\t\ts_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_right_se(),schema)+";\n";
2481 // First, generate the S scalar expressions in the hash_eq
2483 // Iterate over the bloom filters
// The bucket is the XOR over all key parts of
// ((hash constant * lfta_<type>_to_hash(key)) >> 32), masked by bf_mask.
// The hash constant is chosen by (i*hash_eq.size()+p) % NRANDS.
// NOTE(review): the loop that supplies `i` is not visible in this listing.
2485 ret += "\t\tbucket=0;\n";
2486 for(p=0;p<fs->hash_eq.size();++p){
2488 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2489 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2490 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2492 // SET_BF_BIT(table,number of bloom filters,bloom filter index,bit index)
2494 " bucket &= "+int_to_string(bf_mask)+";\n"
2495 " SET_BF_BIT(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket);\n"
// ---- Join hash table variant: store the S key ----------------------------
// Two candidate slots are considered: bucket and bucket1 = bucket+1 (masked).
// NOTE(review): `i` is used in the hash constant index here as well; its
// value on this path is not established by the visible lines.
2500 ret += "// Add the S record to the hash table, choose a position\n";
2501 ret += "\t\tbucket=0;\n";
2502 for(p=0;p<fs->hash_eq.size();++p){
2504 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2505 fs->hash_eq[p]->pr->get_right_se()->get_data_type()->get_type_str()+
2506 +"_to_hash(s_equijoin_"+int_to_string(p)+"))>>32);\n";
2509 " bucket &= "+int_to_string(bf_mask)+";\n"
2510 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2512 // Try the first bucket
// Slot choice in the emitted code: a slot already holding the same key wins
// (bucket first, then bucket1); otherwise the slot with the smaller ts is
// chosen (bucket on a tie).
2514 for(p=0;p<fs->hash_eq.size();++p){
2515 if(p>0) ret += " && ";
2516 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2517 // " == s_equijoin_"+int_to_string(p);
2518 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2519 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2520 string rhs_op = "s_equijoin_"+int_to_string(p);
2521 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2523 ret += "){\n\t\t\tthe_bucket = bucket;\n";
2524 ret += "\t\t}else{\n\t\t\tif(";
2525 for(p=0;p<fs->hash_eq.size();++p){
2526 if(p>0) ret += " && ";
2527 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2528 // " == s_equijoin_"+int_to_string(p);
2529 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2530 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2531 string rhs_op = "s_equijoin_"+int_to_string(p);
2532 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2534 ret += "){\n\t\t\t\tthe_bucket = bucket1;\n";
2535 ret += "\t\t\t}else{ \n\t\t\t\tif(t->join_table[bucket].ts <= t->join_table[bucket1].ts)\n";
2536 ret+="\t\t\t\t\tthe_bucket = bucket;\n\t\t\t\telse\n\t\t\t\t\tthe_bucket=bucket1;\n";
2537 ret += "\t\t\t}\n\t\t}\n";
// Write the key into the chosen slot: buffer-typed keys go through the
// type's buffer-assign-copy function, other types use plain assignment.
2538 for(p=0;p<fs->hash_eq.size();++p){
2539 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2540 if(hdt->is_buffer_type()){
2541 sprintf(tmpstr,"\t\t%s(f, &(t->join_table[the_bucket].key_var%d), &s_equijoin_%d);\n", hdt->get_buffer_assign_copy().c_str(), p, p);
2544 ret += "\t\tt->join_table[the_bucket].key_var"+int_to_string(p)+
2545 " = s_equijoin_"+int_to_string(p)+";\n";
// Stamp the slot with the current window time.
2548 ret+="\t\tt->join_table[the_bucket].ts = curr_fj_ts;\n";
// Label reached when S processing is skipped or finished.
2550 ret += "\tend_s:\n";
2552 // ------------------------------------------------------------
2553 // Next, determine if the R record should be processed.
2557 "// R (main stream) cheap predicate\n"
2561 // Unpack r_filt fields
// (R-side unpack code is generated without a label argument, unlike the S
// side, which passes "end_s".)
2562 vector<cnf_elem *> r_filt = fs->pred_t0;
2563 for(w=0;w<r_filt.size();++w){
2564 col_id_set this_pred_cids;
2565 gather_pr_col_ids(r_filt[w]->pr, this_pred_cids, gb_tbl);
2566 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
2567 if(unpacked_cids.count( (*csi) ) == 0){
2568 int tblref = (*csi).tblvar_ref;
2569 int schref = (*csi).schema_ref;
2570 string field = (*csi).field;
2571 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2572 unpacked_cids.insert( (*csi) );
2577 // Sort R preds based on cost.
2579 vector<cnf_elem *> tmp_wh;
2580 for(w=0;w<r_filt.size();++w){
2581 compute_cnf_cost(r_filt[w],Ext_fcns);
2582 tmp_wh.push_back(r_filt[w]);
2586 sort(r_filt.begin(), r_filt.end(), compare_cnf_cost());
2588 // WARNING! the constant 20 below is a wild-ass guess.
// After the sort, cheap_rpos is the index of the first R predicate whose
// cost exceeds 20; predicates [0, cheap_rpos) are tested before the join,
// the remainder after it.
2590 for(cheap_rpos=0;cheap_rpos<r_filt.size() && r_filt[cheap_rpos]->cost <= 20;cheap_rpos++)
2592 // Test the cheap filters on R.
2595 // Now generate the predicates.
2596 for(w=0;w<cheap_rpos;++w){
2597 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2600 // Find partial fcns ref'd in this cnf element
2602 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2603 // Since set<..> is a "Sorted Associative Container",
2604 // we can walk through it in sorted order by walking from
2605 // begin() to end(). (and the partial fcns must be
2606 // evaluated in this order).
2607 set<int>::iterator si;
2608 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2609 if(fcn_ref_cnt[(*si)] > 1){
2610 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2612 if(is_partial_fcn[(*si)]){
2613 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
// On the R side a failing partial function abandons the whole record.
2614 ret += "\t\tif(retval) goto end;\n";
2616 if(fcn_ref_cnt[(*si)] > 1){
2617 if(!is_partial_fcn[(*si)]){
2618 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2620 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2625 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2629 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
// The R-side join key of each equality predicate is its left-hand scalar
// expression; it is stored in r_equijoin_<p>.
2632 ret += "\n// Do the join\n\n";
2633 for(p=0;p<fs->hash_eq.size();++p)
2634 ret += "\tr_equijoin_"+int_to_string(p)+" = "+generate_se_code(fs->hash_eq[p]->pr->get_left_se(),schema)+";\n";
2637 // Passed the cheap pred, now test the join with S.
// Bloom variant: compute bucket<i> from the R key with the same hash recipe
// as on the S side.
// NOTE(review): the enclosing loop over `i` is not visible in this listing.
2640 ret += "\t\tbucket"+int_to_string(i)+"=0;\n";
2641 for(p=0;p<fs->hash_eq.size();++p){
2643 " bucket"+int_to_string(i)+
2644 " ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2645 fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2646 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2649 " bucket"+int_to_string(i)+" &= "+int_to_string(bf_mask)+";\n";
// The emitted probe requires bucket0, bucket1 and bucket2 all to be set.
// NOTE(review): the emitted loop iterates b over the filters, but the
// visible IS_BF_SET calls index with t->last_bloom_pos rather than b; the
// rest of the loop body is not visible here -- confirm the intended index.
2651 ret += "\tfound = 0;\n";
2652 ret += "\tfor(b=0;b<"+int_to_string(n_bloom)+" && !found; b++){\n";
2654 "\t\tif(IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket0) && "
2655 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket1) && "
2656 "IS_BF_SET(t->bf_table,"+int_to_string(n_bloom)+",t->last_bloom_pos,bucket2))\n "
// ---- Join hash table variant: probe with the R key -----------------------
2665 ret += "\tfound = 0;\n";
2666 ret += "\t\tbucket=0;\n";
2667 for(p=0;p<fs->hash_eq.size();++p){
2669 " bucket ^= (("+hash_nums[(i*fs->hash_eq.size()+p)%NRANDS]+" * lfta_"+
2670 fs->hash_eq[p]->pr->get_left_se()->get_data_type()->get_type_str()+
2671 +"_to_hash(r_equijoin_"+int_to_string(p)+"))>>32);\n";
2674 " bucket &= "+int_to_string(bf_mask)+";\n"
2675 " bucket1 = (bucket + 1) & "+int_to_string(bf_mask)+";\n"
2677 // Try the first bucket
// NOTE(review): this is the R-side probe and the commented-out code names
// r_equijoin_<p>, yet the live code compares the stored key against
// s_equijoin_<p> and takes the data type from get_right_se(). This looks
// like a copy of the S-side block -- confirm whether r_equijoin_<p> /
// get_left_se() was intended. The same applies to the bucket1 test below.
2679 for(p=0;p<fs->hash_eq.size();++p){
2680 if(p>0) ret += " && ";
2681 // ret += "t->join_table[bucket].key_var"+int_to_string(p)+
2682 // " == r_equijoin_"+int_to_string(p);
2683 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2684 string lhs_op = "t->join_table[bucket].key_var"+int_to_string(p);
2685 string rhs_op = "s_equijoin_"+int_to_string(p);
2686 ret += generate_equality_test(lhs_op,rhs_op,hdt);
// Here p equals hash_eq.size() (the loop has finished), so " && " is added
// whenever there is at least one equality predicate.
// NOTE(review): as emitted, the slot matches when
// ts + temporal_range <= curr_fj_ts; confirm that this comparison direction
// is the intended window test.
2688 if(p>0) ret += " && ";
2689 ret += "t->join_table[bucket].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2690 ret += "){\n\t\t\tfound = 1;\n";
2691 ret += "\t\t}else {if(";
2692 for(p=0;p<fs->hash_eq.size();++p){
2693 if(p>0) ret += " && ";
2694 // ret += "t->join_table[bucket1].key_var"+int_to_string(p)+
2695 // " == r_equijoin_"+int_to_string(p);
2696 data_type *hdt = fs->hash_eq[p]->pr->get_right_se()->get_data_type();
2697 string lhs_op = "t->join_table[bucket1].key_var"+int_to_string(p);
2698 string rhs_op = "s_equijoin_"+int_to_string(p);
2699 ret += generate_equality_test(lhs_op,rhs_op,hdt);
2701 if(p>0) ret += " && ";
2702 ret += "t->join_table[bucket1].ts+"+int_to_string(fs->temporal_range)+" <= curr_fj_ts";
2703 ret += ")\n\t\t\tfound=1;\n";
2712 // Test the expensive filters on R.
// Predicates [cheap_rpos, r_filt.size()) are generated after the join test,
// with the same partial-function handling as the cheap ones.
2713 if(cheap_rpos < r_filt.size()){
2715 // Now generate the predicates.
2716 for(w=cheap_rpos;w<r_filt.size();++w){
2717 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,r_filt[w]->cost);
2720 // Find partial fcns ref'd in this cnf element
2722 collect_partial_fcns_pr(r_filt[w]->pr, pfcn_refs);
2723 // Since set<..> is a "Sorted Associative Container",
2724 // we can walk through it in sorted order by walking from
2725 // begin() to end(). (and the partial fcns must be
2726 // evaluated in this order).
2727 set<int>::iterator si;
2728 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
2729 if(fcn_ref_cnt[(*si)] > 1){
2730 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
2732 if(is_partial_fcn[(*si)]){
2733 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
2734 ret += "\t\tif(retval) goto end;\n";
2736 if(fcn_ref_cnt[(*si)] > 1){
2737 if(!is_partial_fcn[(*si)]){
2738 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
2740 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
2745 ret += "\tif( !("+generate_predicate_code(r_filt[w]->pr,schema)+
2749 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
2754 /////////////// post the tuple
2756 // test passed : create the tuple, then assign to it.
2757 ret += "/*\t\tCreate and post the tuple\t*/\n";
2759 // Unpack the fields referenced by the select list
2760 for(s=0;s<sl_list.size();++s){
2761 col_id_set this_se_cids;
2762 gather_se_col_ids(sl_list[s], this_se_cids, gb_tbl);
2763 for(csi=this_se_cids.begin();csi!=this_se_cids.end();++csi){
2764 if(unpacked_cids.count( (*csi) ) == 0){
2765 int tblref = (*csi).tblvar_ref;
2766 int schref = (*csi).schema_ref;
2767 string field = (*csi).field;
2768 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
2769 unpacked_cids.insert( (*csi) );
2775 // Unpack partial fcns ref'd by the select clause.
2776 // It's a kind of a WHERE clause ...
// From here on the code mirrors the tuple construction of the filter-only
// query (generate_sel_accept_body).
2777 for(p=sl_fcns_start;p<sl_fcns_end;p++){
2778 if(fcn_ref_cnt[p] > 1){
2779 ret += "\tif(fcn_ref_cnt_"+int_to_string(p)+"==0){\n";
2781 if(is_partial_fcn[p]){
2782 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2783 ret += "\tif(retval) goto end;\n";
2785 if(fcn_ref_cnt[p] > 1){
2786 if(!is_partial_fcn[p]){
2787 ret += "\t\tpartial_fcn_result_"+int_to_string(p)+"="+generate_cached_fcn(partial_fcns[p],schema)+";\n";
2789 ret += "\t\tfcn_ref_cnt_"+int_to_string(p)+"=1;\n";
2794 // increment the counter of accepted tuples
2795 ret += "\n\t#ifdef LFTA_STATS\n";
2796 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2797 ret += "\t#endif\n\n";
2799 // First, compute the size of the tuple.
2801 // Unpack any BUFFER type selections into temporaries
2802 // so that I can compute their size and not have
2803 // to recompute their value during tuple packing.
2804 // I can use regular assignment here because
2805 // these temporaries are non-persistent.
2807 for(s=0;s<sl_list.size();s++){
2808 data_type *sdt = sl_list[s]->get_data_type();
2809 if(sdt->is_buffer_type()){
2810 sprintf(tmpstr,"\tselvar_%d = ",s);
2812 ret += generate_se_code(sl_list[s],schema);
2818 // The size of the tuple is the size of the tuple struct plus the
2819 // size of the buffers to be copied in.
2821 ret+="\ttuple_size = sizeof( struct "+generate_tuple_name(node_name)+")";
2822 for(s=0;s<sl_list.size();s++){
2823 data_type *sdt = sl_list[s]->get_data_type();
2824 if(sdt->is_buffer_type()){
2825 sprintf(tmpstr," + %s(&selvar_%d)", sdt->get_buffer_size().c_str(),s);
// Emitted allocation; a NULL tuple makes the emitted code jump to "end".
2832 ret += "\ttuple = allocate_tuple(f, tuple_size );\n";
2833 ret += "\tif( tuple == NULL)\n\t\tgoto end;\n";
2835 // Test passed, make assignments to the tuple.
2837 ret += "\ttuple_pos = sizeof( struct "+generate_tuple_name(node_name)+");\n";
2839 // Mark tuple as REGULAR_TUPLE
2840 ret += "\ttuple->tuple_type = REGULAR_TUPLE;\n";
2843 for(s=0;s<sl_list.size();s++){
2844 data_type *sdt = sl_list[s]->get_data_type();
2845 if(sdt->is_buffer_type()){
// Buffer-typed column: copy from selvar_<s> to ((char *)tuple)+tuple_pos and
// advance tuple_pos by the buffer size.
2846 sprintf(tmpstr,"\t%s(&(tuple->tuple_var%d), &selvar_%d, (char *)tuple, ((char *)tuple)+tuple_pos);\n", sdt->get_buffer_tuple_copy().c_str(),s, s);
2848 sprintf(tmpstr,"\ttuple_pos += %s(&selvar_%d);\n", sdt->get_buffer_size().c_str(), s);
// Non-buffer column: plain assignment "tuple->tuple_var<s> = <expr>".
2851 sprintf(tmpstr,"\ttuple->tuple_var%d = ",s);
2853 // if(sdt->needs_hn_translation())
2854 // ret += sdt->hton_translation() +"( ";
2855 ret += generate_se_code(sl_list[s],schema);
2856 // if(sdt->needs_hn_translation())
2864 ret += "\tpost_tuple(tuple);\n";
2866 // Increment the counter of posted tuples
2867 ret += "\n\t#ifdef LFTA_STATS\n";
2868 ret += "\n\tt->out_tuple_cnt++;\n\n";
2869 ret+="\t\t\t\tt->out_tuple_sz+=tuple_size;\n";
2870 ret += "\t#endif\n\n";
// generate_aggr_accept_body
// Builds the C source text for the accept path of an aggregation query:
// evaluate the partial functions used by the group-by and aggregate
// definitions, compute the group-by values, hash them, probe the aggregate
// table for the group, then either update the matching slot or emit / flush
// the occupant of the best candidate slot and initialise a new group there.
// Parameters:
//   fs             - plan node; read for the "real_time" define and
//                    node_name, and cast to sgah_qpn* to read lfta_disorder.
//   node_name      - used in generated function / tuple names.
//   schema         - passed through to the code generators.
//   temporal_flush - previously generated flush code; appended here only
//                    when the "real_time" define is empty. An empty string
//                    means there is no temporal flush.
// NOTE(review): `ret`, `tmpstr`, `p`, `g`, `a`, `gb_tbl`, `aggr_tbl` and the
// partial-function tables are declared outside the lines shown here. The
// embedded line numbers skip, so several statements (closing braces,
// `ret +=` heads, appends of tmpstr, the return) are not visible in this
// listing -- confirm against the full file.
2876 string generate_aggr_accept_body(qp_node *fs,string node_name,table_list *schema, string &temporal_flush){
2880 ////////////// Processing for aggregation query
2882 // First, search for a match. Start by unpacking the group-by attributes.
2884 // One complication : if a real-time aggregate flush occurs,
2885 // the GB attr has already been calculated. So don't compute
2886 // it again if 1) it's temporal and 2) it will be computed in the
2887 // aggregate flush code.
2889 // Unpack the partial fcns ref'd by the gb's and the aggr defs.
// A failing partial function makes the emitted code jump to "end".
2890 for(p=gb_fcns_start;p<gb_fcns_end;p++){
2891 if(is_partial_fcn[p]){
2892 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2893 ret += "\tif(retval) goto end;\n";
2896 for(p=ag_fcns_start;p<ag_fcns_end;p++){
2897 if(is_partial_fcn[p]){
2898 ret += unpack_partial_fcn(partial_fcns[p], p, schema);
2899 ret += "\tif(retval) goto end;\n";
2903 // increment the counter of accepted tuples
2904 ret += "\n\t#ifdef LFTA_STATS\n";
2905 ret += "\n\tt->accepted_tuple_cnt++;\n\n";
2906 ret += "\t#endif\n\n";
2908 ret += "/*\t\tTest if the group is in the hash table \t*/\n";
2909 // Compute the values of the group-by variables.
// A group-by attribute is computed here unless it is temporal and flush code
// exists (temporal_flush non-empty).
2910 for(g=0;g<gb_tbl->size();g++){
2911 data_type *gdt = gb_tbl->get_data_type(g);
2912 if((! gdt->is_temporal()) || temporal_flush == ""){
2914 if(gdt->is_buffer_type()){
2915 // NOTE : if the SE defining the gb is anything
2916 // other than a ref to a variable, this will generate
2917 // illegal code. To be resolved with Spatch.
// Buffer-typed attribute: assign the expression to gb_attr_tmp<g>, then copy
// it into gb_attr_<g> with the type's buffer-assign-copy function.
2918 sprintf(tmpstr,"\tgb_attr_tmp%d = %s;\n",
2919 g, generate_se_code(gb_tbl->get_def(g),schema).c_str() );
2921 sprintf(tmpstr,"\t%s(f, &gb_attr_%d, &gb_attr_tmp%d);\n",
2922 gdt->get_buffer_assign_copy().c_str(), g, g);
// Other types: direct assignment to gb_attr_<g>.
2924 sprintf(tmpstr,"\tgb_attr_%d = %s;\n",g,generate_se_code(gb_tbl->get_def(g),schema).c_str());
2931 // A quick aside : if any of the GB attrs are temporal,
2932 // test for change and flush if any change occurred.
2933 // We've already computed the flush code,
2934 // Put it here if this is not a real time query.
2935 // We've already unpacked all column refs, so no need to
2936 // do it again here.
2938 string rt_level = fs->get_val_of_def("real_time");
2939 if(rt_level == "" && temporal_flush != ""){
2940 ret += temporal_flush;
2943 // Compute the hash bucket
// hashval is the XOR over all group-by attributes of
// (hash constant * lfta_<type>_to_hash(gb_attr_<g>)).
// NOTE(review): the statement below ends with a stray line-continuation
// backslash, which splices the next source line onto it.
2944 if(gb_tbl->size() > 0){
2945 ret += "\thashval = ";\
2946 for(g=0;g<gb_tbl->size();g++){
2947 if(g>0) ret += " ^ ";
2948 data_type *gdt = gb_tbl->get_data_type(g);
// NOTE(review): both branches of the buffer-type check emit the same text.
2949 if(gdt->is_buffer_type()){
2950 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2951 gdt->get_type_str().c_str(), g);
2953 sprintf(tmpstr,"((%s * lfta_%s_to_hash(gb_attr_%d)))",hash_nums[g%NRANDS].c_str(),
2954 gdt->get_type_str().c_str(), g);
// hash2 is a secondary hash kept in the SLOT_HASH_BITS of the hashmap entry;
// probe is the starting slot, masked with (max_aggrs-1).
// NOTE(review): `g` is used here after its loop has finished, so the
// constant picked is hash_nums[gb_tbl->size() % NRANDS]; the mask presumably
// assumes max_aggrs is a power of two -- confirm both.
2959 ret += "\thash2 = ((hashval * "+hash_nums[g%NRANDS]+") >> 32) & SLOT_HASH_BITS;\n";
2960 ret+="\tprobe = (hashval >> 32) & (t->max_aggrs-1);\n";
// No group-by attributes: a single group at slot 0.
2962 ret+="\tprobe = 0;\n";
2963 ret+="\thash2 = 0;\n\n";
2966 // Does the lfta reference a udaf?
2967 bool has_udaf = false;
2968 for(a=0;a<aggr_tbl->size();a++){
2969 if(! aggr_tbl->is_builtin(a)) has_udaf = true;
2972 // Scan for a match, or alternatively the best slot.
2973 // Currently, hardcode 5 tests.
// A slot is a candidate match when it is filled, belongs to the current
// generation and carries the same hash2 bits; the group-by values are then
// compared for equality.
2975 " gen_val = t->generation & SLOT_GEN_BITS;\n"
2976 " match_found = 0;\n"
2977 " best_slot = probe;\n"
2978 " for(i=0;i<5 && match_found == 0;i++){\n"
2979 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED) && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_HASH_BITS) == hash2 ){\n"
2981 if(gb_tbl->size()>0){
2982 ret+="\n\t/* \t\tcheck if the grouping variables are equal */\n";
2984 string rhs_op, lhs_op;
2985 for(g=0;g<gb_tbl->size();g++){
2986 if(g>0) ret += " && ";
2988 sprintf(tmpstr,"gb_attr_%d",g); lhs_op = tmpstr;
2989 sprintf(tmpstr,"t->aggr_table[probe].gb_var%d",g); rhs_op = tmpstr;
2990 ret += generate_equality_test(lhs_op,rhs_op,gb_tbl->get_data_type(g));
2995 " match_found = 1;\n"
2996 " best_slot = probe;\n"
2999 "// Rate slots in case no match found: prefer empty, then full but old slots\n"
3000 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
3001 " if((t->aggr_table_hashmap[probe] & SLOT_FILLED)==0)\n"
3002 " best_slot = probe;\n"
3004 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val && (t->aggr_table_hashmap[probe] & SLOT_GEN_BITS) != gen_val){\n"
3005 " best_slot = probe;\n"
3009 " if(probe >= t->max_aggrs)\n"
// Match found: emit the aggregate update for best_slot.
3012 " if(match_found){\n"
3014 ret += generate_gb_update(node_name, schema, "best_slot",has_udaf);
// No match and the best slot is occupied: its current content has to be
// dealt with before the slot is reused.
3017 " if(t->aggr_table_hashmap[best_slot] & SLOT_FILLED){\n"
// Diagnostic output; printf runs when this generator executes, it is not
// part of the emitted code.
// NOTE(review): fs is cast to sgah_qpn* without a visible type check --
// presumably callers only pass sgah_qpn nodes; verify.
3019 printf("sgah_qpn name is %s, disorder is %d\n",fs->node_name.c_str(),((sgah_qpn *)fs)->lfta_disorder);
3020 if(((sgah_qpn *)fs)->lfta_disorder <= 1){
3022 " if((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS)==gen_val){\n"
// Builds a sum of (gb_attr_<g> - stored gb_var<g>) over the temporal
// group-by attributes and compares the sum with 0.
3024 bool first_g = true;
3025 for(int g=0;g<gb_tbl->size();g++){
3026 data_type *gdt = gb_tbl->get_data_type(g);
3027 if(gdt->is_temporal()){
3028 if(first_g) first_g = false; else ret+=" + ";
3029 ret += "(gb_attr_"+int_to_string(g)+" - t->aggr_table[best_slot].gb_var"+int_to_string(g)+")";
3032 ret += ") == 0 ){\n";
3035 " fta_aggr_flush_old_"+ node_name+"(f,t->max_aggrs);\n"
// Emit the evicted group as an output tuple.
3041 ret += generate_tuple_from_aggr(node_name,schema,"best_slot");
// Under LFTA_STATS, evicting a slot of the current generation is counted as
// a collision.
3043 "\t\t\t#ifdef LFTA_STATS\n"
3044 "\t\t\tif((t->aggr_table_hashmap[best_slot] & SLOT_GEN_BITS) == gen_val)\n"
3045 "\t\t\t\tt->collision_cnt++;\n\n"
// Finally initialise the new group in best_slot.
3049 ret += generate_init_group(schema,"best_slot");
3059 string generate_fta_accept(qp_node *fs, string node_name, table_list *schema, ext_fcn_list *Ext_fcns, bool is_aggr_query, bool is_fj, set<unsigned int> &s_pids){
3061 string ret="static gs_retval_t accept_packet_"+node_name+
3062 "(struct FTA *f, FTAID * ftaid, void *pkt, gs_int32_t sz){\n";
3063 ret += "\tstruct packet *p = (struct packet *)pkt;\n";
3067 // Define all of the variables needed by this
3071 // Gather all column references, need to define unpacking variables.
3074 col_id_set::iterator csi;
3076 // If its a filter join, rebind all colrefs
3077 // to the first range var, to avoid double unpacking.
3080 for(w=0;w<where.size();++w)
3081 reset_pr_col_ids_tblvars(where[w]->pr, gb_tbl);
3082 for(s=0;s<sl_list.size();s++)
3083 reset_se_col_ids_tblvars(sl_list[s], gb_tbl);
3086 for(w=0;w<where.size();++w){
3087 if(is_fj || s_pids.count(w) == 0)
3088 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3090 for(s=0;s<sl_list.size();s++){
3091 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
3096 for(g=0;g<gb_tbl->size();g++)
3097 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3100 // Variables for unpacking attributes.
3101 ret += "/*\t\tVariables for unpacking attributes\t*/\n";
3102 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3103 int schref = (*csi).schema_ref;
3104 int tblref = (*csi).tblvar_ref;
3105 string field = (*csi).field;
3106 data_type dt(schema->get_type_name(schref,field));
3107 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
3108 field.c_str(), tblref);
3114 // Variables that are always needed
3115 ret += "/*\t\tVariables which are always needed\t*/\n";
3116 ret += "\tgs_retval_t retval;\n";
3117 ret += "\tgs_int32_t tuple_size, tuple_pos, lfta_bailout;\n";
3118 ret += "\tstruct "+generate_tuple_name(node_name)+" *tuple;\n";
3120 ret+="\tstruct "+generate_fta_name(node_name)+" *t = (struct "+generate_fta_name(node_name)+"*) f;\n\n";
3123 // Variables needed for aggregation queries.
3125 ret += "\n/*\t\tVariables for aggregation\t*/\n";
3126 ret+="\tunsigned int i, probe;\n";
3127 ret+="\tunsigned int gen_val, match_found, best_slot;\n";
3128 ret+="\tgs_uint64_t hashval, hash2;\n";
3129 // Variables for storing group-by attribute values.
3130 if(gb_tbl->size() > 0)
3131 ret += "/*\t\tGroup-by attributes\t*/\n";
3132 for(g=0;g<gb_tbl->size();g++){
3133 sprintf(tmpstr,"\t%s gb_attr_%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3135 data_type *gdt = gb_tbl->get_data_type(g);
3136 if(gdt->is_buffer_type()){
3137 sprintf(tmpstr,"\t%s gb_attr_tmp%d;\n",gb_tbl->get_data_type(g)->get_cvar_type().c_str(),g);
3142 // Temporaries for min/max
3143 string aggr_tmp_str = "";
3144 for(a=0;a<aggr_tbl->size();a++){
3145 string aggr_op = aggr_tbl->get_op(a);
3146 if(aggr_op == "MIN" || aggr_op == "MAX"){
3147 sprintf(tmpstr,"\t%s aggr_tmp_%d;\n",aggr_tbl->get_data_type(a)->get_cvar_type().c_str(),a);
3148 aggr_tmp_str.append(tmpstr);
3151 if(aggr_tmp_str != ""){
3152 ret += "/*\t\tTemp vars for BUFFER aggregates\t*/\n";
3153 ret += aggr_tmp_str;
3156 // Variables for udaf output temporaries
3157 bool no_udaf = true;
3158 for(a=0;a<aggr_tbl->size();a++){
3159 if(! aggr_tbl->is_builtin(a)){
3161 ret+="/*\t\tUDAF output vars.\t*/\n";
3164 int afcn_id = aggr_tbl->get_fcn_id(a);
3165 data_type *adt = Ext_fcns->get_fcn_dt(afcn_id);
3166 sprintf(tmpstr,"udaf_ret%d", a);
3167 ret+="\t"+adt->make_cvar(tmpstr)+";\n";
3172 // Variables needed for a filter join query
3173 if(fs->node_type() == "filter_join"){
3174 filter_join_qpn *fjq = (filter_join_qpn *)fs;
3175 bool uses_bloom = fjq->use_bloom;
3176 ret += "/*\t\tJoin fields\t*/\n";
3177 for(g=0;g<fjq->hash_eq.size();g++){
3178 sprintf(tmpstr,"\t%s s_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_right_se()->get_data_type()->get_cvar_type().c_str(),g);
3180 sprintf(tmpstr,"\t%s r_equijoin_%d;\n",fjq->hash_eq[g]->pr->get_left_se()->get_data_type()->get_cvar_type().c_str(),g);
3185 " /* Variables for fj bloom filter */ \n"
3186 "\tunsigned int i=0,j=0,k=0, b, bf_clean = 0, tmp_i, found; \n"
3187 "\tunsigned int bucket, bucket0, bucket1, bucket2;\n"
3188 "\tlong long int curr_fj_ts;\n"
3189 "\tlong long int curr_bin, the_bin;\n"
3194 " /* Variables for fj join table */ \n"
3195 "\tunsigned int i, bucket, found; \n"
3196 "\tunsigned int bucket1, the_bucket;\n"
3197 " long long int curr_fj_ts;\n"
3204 // Variables needed to store selected attributes of BUFFER type
3205 // temporarily, in order to compute their size for storage
3206 // in an output tuple.
3208 string select_var_defs = "";
3209 for(s=0;s<sl_list.size();s++){
3210 data_type *sdt = sl_list[s]->get_data_type();
3211 if(sdt->is_buffer_type()){
3212 sprintf(tmpstr,"\t%s selvar_%d;\n",sdt->get_cvar_type().c_str(),s);
3213 select_var_defs.append(tmpstr);
3216 if(select_var_defs != ""){
3217 ret += "/*\t\tTemporaries for computing buffer sizes.\t*/\n";
3218 ret += select_var_defs;
3221 // Variables to store results of partial functions.
3223 if(partial_fcns.size()>0){
3224 ret += "/*\t\tVariables for storing results of partial functions. \t*/\n";
3225 for(p=0;p<partial_fcns.size();++p){
3226 if(is_partial_fcn[p] || (!is_aggr_query && fcn_ref_cnt[p] >1)){
3227 sprintf(tmpstr,"\t%s partial_fcn_result_%d;\n",
3228 partial_fcns[p]->get_data_type()->get_cvar_type().c_str(), p);
3230 if(!is_aggr_query && fcn_ref_cnt[p] >1){
3231 ret += "\tint fcn_ref_cnt_"+int_to_string(p)+" = 0;\n";
3236 if(is_aggr_query) ret += "\tint unpack_failed = 0;\n";
3240 // variable to hold packet struct //
3242 ret += "\t struct "+node_name+"_input_struct *"+node_name+"_input_struct_var;\n";
3246 ret += "\t#ifdef LFTA_STATS\n";
3247 // variable to store counter of cpu cycles spend in accept_tuple
3248 ret += "\tgs_uint64_t start_cycle = rdtsc();\n";
3249 // increment counter of received tuples
3250 ret += "\tt->in_tuple_cnt++;\n";
3251 ret += "\t#endif\n";
3254 // -------------------------------------------------
3255 // If the packet is "packet", test if its for this lfta,
3256 // and if so load it into its struct
3259 ret+="\n/* packed tuple : test and load. \t*/\n";
3260 ret+="\t"+node_name+"_input_struct_var = (struct "+node_name+"_input_struct *) pkt;\n";
3261 ret+="\tif("+node_name+"_input_struct_var->__lfta_id_fm_nic__ != "+int_to_string(global_id) + ")\n";
3262 ret+="\t\tgoto end;\n\n";
3267 col_id_set unpacked_cids; // Keep track of the cols that have been unpacked.
3269 string temporal_flush;
3271 ret += generate_aggr_accept_prelim(fs, node_name, schema, unpacked_cids, temporal_flush);
3272 else { // non-aggregation operators
3274 // Unpack all the temporal attributes referenced in select clause
3275 // and update the last value of the attribute
3276 col_id_set temp_cids; // col ids of temp attributes in select clause
3278 for(s=0;s<sl_list.size();s++){
3279 data_type *sdt = sl_list[s]->get_data_type();
3280 if (sdt->is_temporal()) {
3281 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
3284 // If this is a filter join,
3285 // ensure that the temporal range field is unpacked.
3287 col_id window_var_cid(((filter_join_qpn *)fs)->temporal_var);
3288 if(temp_cids.count(window_var_cid)==0)
3289 temp_cids.insert(window_var_cid);
3292 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3293 if(unpacked_cids.count((*csi)) == 0){
3294 int tblref = (*csi).tblvar_ref;
3295 int schref = (*csi).schema_ref;
3296 string field = (*csi).field;
3297 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3298 sprintf(tmpstr,"\tt->last_%s_%d = unpack_var_%s_%d;\n", field.c_str(), tblref, field.c_str(), tblref);
3301 unpacked_cids.insert( (*csi) );
3307 vector<cnf_elem *> filter = fs->get_filter_clause();
3308 // Test the filter predicate (some query types have additional preds).
3309 if(filter.size() > 0){
3311 // Sort by evaluation cost.
3312 // First, estimate evaluation costs
3313 // Eliminate predicates covered by the prefilter (those in s_pids).
3314 // I need to do it before the sort becuase the indices refer
3315 // to the position in the unsorted list./
3316 vector<cnf_elem *> tmp_wh;
3317 for(w=0;w<filter.size();++w){
3318 if(s_pids.count(w) == 0){
3319 compute_cnf_cost(filter[w],Ext_fcns);
3320 tmp_wh.push_back(filter[w]);
3325 sort(filter.begin(), filter.end(), compare_cnf_cost());
3327 // Now generate the predicates.
3328 for(w=0;w<filter.size();++w){
3329 sprintf(tmpstr,"//\t\tPredicate clause %d.(cost %d)\n",w,filter[w]->cost);
3331 // Find the set of variables accessed in this CNF elem,
3332 // but in no previous element.
3333 col_id_set this_pred_cids;
3334 gather_pr_col_ids(filter[w]->pr, this_pred_cids, gb_tbl);
3335 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3336 if(unpacked_cids.count( (*csi) ) == 0){
3337 int tblref = (*csi).tblvar_ref;
3338 int schref = (*csi).schema_ref;
3339 string field = (*csi).field;
3340 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3341 unpacked_cids.insert( (*csi) );
3344 // Find partial fcns ref'd in this cnf element
3346 collect_partial_fcns_pr(filter[w]->pr, pfcn_refs);
3347 // Since set<..> is a "Sorted Associative Container",
3348 // we can walk through it in sorted order by walking from
3349 // begin() to end(). (and the partial fcns must be
3350 // evaluated in this order).
3351 set<int>::iterator si;
3352 for(si=pfcn_refs.begin();si!=pfcn_refs.end();++si){
3353 if(fcn_ref_cnt[(*si)] > 1){
3354 ret += "\tif(fcn_ref_cnt_"+int_to_string((*si))+"==0){\n";
3356 if(is_partial_fcn[(*si)]){
3357 ret += "\t"+unpack_partial_fcn(partial_fcns[(*si)], (*si), schema);
3358 ret += "\t\tif(retval) goto end;\n";
3360 if(fcn_ref_cnt[(*si)] > 1){
3361 if(!is_partial_fcn[(*si)]){
3362 ret += "\t\tpartial_fcn_result_"+int_to_string((*si))+"="+generate_cached_fcn(partial_fcns[(*si)],schema)+";\n";
3364 ret += "\t\tfcn_ref_cnt_"+int_to_string((*si))+"=1;\n";
3369 ret += "\tif( !("+generate_predicate_code(filter[w]->pr,schema)+
3373 ret += "\n\n/*\t\t (no predicate to test)\t*/\n\n";
3377 // We've passed the WHERE clause,
3378 // unpack the remainder of the accessed fields.
3380 ret += "\n/*\tPassed the WHERE clause, unpack the hash fields. */\n";
3381 vector<cnf_elem *> h_eq = ((filter_join_qpn *)fs)-> hash_eq;
3382 for(w=0;w<h_eq.size();++w){
3383 col_id_set this_pred_cids;
3384 gather_pr_col_ids(h_eq[w]->pr, this_pred_cids, gb_tbl);
3385 for(csi=this_pred_cids.begin();csi!=this_pred_cids.end();++csi){
3386 if(unpacked_cids.count( (*csi) ) == 0){
3387 int tblref = (*csi).tblvar_ref;
3388 int schref = (*csi).schema_ref;
3389 string field = (*csi).field;
3390 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3391 unpacked_cids.insert( (*csi) );
3396 ret += "\n/*\tPassed the WHERE clause, unpack the rest of the accessed fields. */\n";
3398 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
3399 if(unpacked_cids.count( (*csi) ) == 0){
3400 int schref = (*csi).schema_ref;
3401 int tblref = (*csi).tblvar_ref;
3402 string field = (*csi).field;
3403 ret += generate_unpack_code(tblref,schref,field,schema,node_name);
3404 unpacked_cids.insert( (*csi) );
3411 ////////////////// After this, the query types
3412 ////////////////// are processed differently.
3414 if(!is_aggr_query && !is_fj)
3415 ret += generate_sel_accept_body(fs, node_name, schema);
3416 else if(is_aggr_query)
3417 ret += generate_aggr_accept_body(fs, node_name, schema, temporal_flush);
3419 ret += generate_fj_accept_body((filter_join_qpn *)fs, node_name, unpacked_cids, Ext_fcns, schema);
3424 ret += "\n\tend:\n";
3425 ret += "\t#ifdef LFTA_STATS\n";
3426 ret+= "\tt->cycle_cnt += rdtsc() - start_cycle;\n";
3427 ret += "\t#endif\n";
3428 ret += "\n\treturn 1;\n}\n\n";
// Builds the C source text of the allocation routine for one lfta.
// The emitted function is named by generate_alloc_name(node_name), takes
// (struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command,
//  gs_int32_t sz, void *value), allocates the FTA struct with fta_alloc and
// returns it cast to struct FTA * (the emitted code returns 0 whenever one of
// its allocations fails).
//   fs         - query plan node; supplies the "aggregate_slots", "num_bloom"
//                and "bloom_size" defines and is cast to filter_join_qpn *
//                where the join fields (temporal_range) are read.
//   node_name  - used to form every emitted struct / function name.
//   schema     - used to look up the types of temporal select attributes.
//   is_aggr_query, is_fj, uses_bloom - query-kind flags from the caller.
// Also reads the file-level globals gb_tbl, sl_list, complex_literals,
// param_tbl and param_handle_table.
// NOTE(review): this listing omits some lines of the function (the embedded
// line numbers are not contiguous), so the guards that select the aggregate,
// bloom-filter and join sections are not visible here - presumably they test
// is_aggr_query / is_fj / uses_bloom; confirm against the full file.
3434 string generate_fta_alloc(qp_node *fs, string node_name, table_list *schema, bool is_aggr_query, bool is_fj, bool uses_bloom){
3437 string ret = "struct FTA * "+generate_alloc_name(node_name) +
3438 "(struct FTAID ftaid, gs_uint32_t reusable, gs_int32_t command, gs_int32_t sz, void *value){\n";
3440 ret+="\tstruct "+generate_fta_name(node_name)+"* f;\n";
// Emitted code: allocate the FTA struct itself; return 0 on failure.
3443 ret+="\tif((f=fta_alloc(0,sizeof(struct "+generate_fta_name(node_name)+")))==0){\n\t\treturn(0);\n\t}\n";
3445 // assign a streamid to fta instance
3446 ret+="\t/* assign a streamid */\n";
3447 ret+="\tf->f.ftaid = ftaid;\n";
3448 ret+="\tf->f.ftaid.streamid = (gs_p_t)f;\n";
3449 ret+="\tgslog(LOG_INFO,\"Lfta "+node_name+" has FTAID {ip=%u,port=%u,index=%u,streamid=%u}\\n\",f->f.ftaid.ip,f->f.ftaid.port,f->f.ftaid.index,f->f.ftaid.streamid);\n";
// Aggregate table sizing: f->max_aggrs is emitted as a literal chosen below.
3452 ret += "\tf->n_aggrs = 0;\n";
3454 ret += "\tf->max_aggrs = ";
3456 // Computing the number of aggregate blocks is a little
3457 // tricky. If there are no GB attrs, or if all GB attrs
3458 // are temporal, then use a single aggregate block, else
3459 // use a default value (10). A user specification overrides
// (The default emitted by the visible code is DEFAULT_LFTA_HASH_TABLE_SIZE.)
3461 bool single_group = true;
3462 for(g=0;g<gb_tbl->size();g++){
3463 data_type *gdt = gb_tbl->get_data_type(g);
3464 if(! gdt->is_temporal() ){
3465 single_group = false;
// A positive "aggregate_slots" define overrides the default slot count.
3468 string max_aggr_str = fs->get_val_of_def("aggregate_slots");
3469 int max_aggr_i = atoi(max_aggr_str.c_str());
3470 if(max_aggr_i <= 0){
3474 ret += int_to_string(DEFAULT_LFTA_HASH_TABLE_SIZE);
// User-specified count: naggrs is doubled while max_aggr_i is halved, and
// nones is consulted afterwards to handle an exact power of two.
3476 unsigned int naggrs = 1; // make it power of 2
3477 unsigned int nones = 0;
3481 naggrs = naggrs << 1;
3482 max_aggr_i = max_aggr_i >> 1;
3484 if(nones==1) // in case it was already a power of 2.
3486 ret += int_to_string(naggrs);
// Emitted code: allocate the aggregate table and its hashmap (one
// gs_uint32_t per slot); each allocation returns 0 on failure.
3490 ret+="\tif ((f->aggr_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_aggr_struct_name(node_name)+") * f->max_aggrs))==0) {\n";
3491 ret+="\t\treturn(0);\n";
3493 // ret+="/* compute how many integers we need to store the hashmap */\n";
3494 // ret+="\tf->bitmap_size = (f->max_aggrs % (sizeof(gs_uint32_t) * 4)) ? (f->max_aggrs / (sizeof(gs_uint32_t) * 4) + 1) : (f->max_aggrs / (sizeof(gs_uint32_t) * 4));\n\n";
3495 ret+="\tif ((f->aggr_table_hashmap = sp_fta_alloc((struct FTA *)f,sizeof(gs_uint32_t) * f->max_aggrs))==0) {\n";
3496 ret+="\t\treturn(0);\n";
3498 ret+="/*\t\tfill bitmap with zero \t*/\n";
3499 ret+="\tfor (i = 0; i < f->max_aggrs; ++i)\n";
3500 ret+="\t\tf->aggr_table_hashmap[i] = 0;\n";
3501 ret+="\tf->generation=0;\n";
3502 ret+="\tf->flush_pos = f->max_aggrs;\n";
3504 ret += "\tf->flush_ctr = 0;\n";
3510 ret+="\tf->first_exec = 1;\n";
// Bloom filter count: default 11, replaced by num_bloom+1 when the
// "num_bloom" define is accepted (the accepting condition is not visible).
3511 unsigned int n_bloom = 11;
3512 string n_bloom_str = fs->get_val_of_def("num_bloom");
3513 int tmp_n_bloom = atoi(n_bloom_str.c_str());
3515 n_bloom = tmp_n_bloom+1;
// A window shorter than the filter count reduces it to temporal_range+1.
3517 unsigned int window_len = ((filter_join_qpn *)fs)->temporal_range;
3518 if(window_len < n_bloom){
3519 n_bloom = window_len+1;
// Per-filter size: "bloom_size" is accepted when it lies in 4..31.
3522 int bf_exp_size = 12; // base-2 log of number of bits
3523 string bloom_len_str = fs->get_val_of_def("bloom_size");
3524 int tmp_bf_exp_size = atoi(bloom_len_str.c_str());
3525 if(tmp_bf_exp_size > 3 && tmp_bf_exp_size < 32){
3526 bf_exp_size = tmp_bf_exp_size;
// NOTE(review): bf_exp_size is validated above but the bit size below is
// computed from the literal 12, so in the visible code "bloom_size" has no
// effect on bf_tot. Confirm whether 1 << bf_exp_size was intended, and that
// the size agrees with the code that indexes bf_table.
3528 int bf_bit_size = 1 << 12;
3529 int bf_byte_size = bf_bit_size / (8*sizeof(char));
// Emitted code: allocate n_bloom filters in one buffer and zero it.
3531 int bf_tot = n_bloom*bf_byte_size;
3532 ret+="\tif ((f->bf_table = sp_fta_alloc((struct FTA *)f,"+int_to_string(bf_tot)+"))==0) {\n";
3533 ret+="\t\treturn(0);\n";
3536 " for(i=0;i<"+int_to_string(bf_tot)+";i++)\n"
3537 " f->bf_table[i] = 0;\n"
// Join hash table sizing: default 4096 entries. The same "aggregate_slots"
// define used for the aggregate table is consulted; values above 1024 go
// through the power-of-two computation on hs.
3540 unsigned int ht_size = 4096;
3541 string ht_size_s = fs->get_val_of_def("aggregate_slots");
3542 int tmp_ht_size = atoi(ht_size_s.c_str());
3543 if(tmp_ht_size > 1024){
3544 unsigned int hs = 1; // make it power of 2
3547 tmp_ht_size = tmp_ht_size >> 1;
// Emitted code: allocate the join table and set every entry's ts to 0.
3551 ret+="\tif ((f->join_table = sp_fta_alloc((struct FTA *)f,sizeof(struct "+generate_fj_struct_name(node_name)+") * "+int_to_string(ht_size)+"))==0) {\n";
3552 ret+="\t\treturn(0);\n";
3555 " for(i=0;i<"+int_to_string(ht_size)+";i++)\n"
3556 " f->join_table[i].ts = 0;\n"
3561 // Initialize the complex literals (which might be handles).
3563 for(cl=0;cl<complex_literals->size();cl++){
3564 literal_t *l = complex_literals->get_literal(cl);
3565 // sprintf(tmpstr,"\tf->complex_literal_%d = ",cl);
3566 // ret += tmpstr + l->to_C_code() + ";\n";
// to_C_code is given the address expression of the struct member to fill.
3567 sprintf(tmpstr,"&(f->complex_literal_%d)",cl);
3568 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
3573 // Initialize the last seen values of temporal attributes to min(max) value of
3574 // their respective type
3575 // Create places to hold the last values of temporal attributes referenced in select clause
3578 col_id_set temp_cids; // col ids of temp attributes in select clause
3581 col_id_set::iterator csi;
3583 for(s=0;s<sl_list.size();s++){
3584 data_type *sdt = sl_list[s]->get_data_type();
3585 if (sdt->is_temporal()) {
3586 gather_se_col_ids(sl_list[s],temp_cids, gb_tbl);
// Increasing attributes start at the type's minimum literal, decreasing
// ones at its maximum literal.
3590 for(csi=temp_cids.begin(); csi != temp_cids.end();++csi){
3591 int tblref = (*csi).tblvar_ref;
3592 int schref = (*csi).schema_ref;
3593 string field = (*csi).field;
3594 data_type dt(schema->get_type_name(schref,field), schema->get_modifier_list(schref,field));
3595 if (dt.is_increasing()) {
3596 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_min_literal().c_str());
3598 } else if (dt.is_decreasing()) {
3599 sprintf(tmpstr,"\tf->last_%s_%d = %s;\n", field.c_str(), tblref, dt.get_max_literal().c_str());
3604 // initialize last seen values of temporal groupby variables
3606 for(g=0;g<gb_tbl->size();g++){
3607 data_type *dt = gb_tbl->get_data_type(g);
3608 if(dt->is_temporal()){
3610 fprintf(stderr,"group by attribute %s is temporal, ",
3611 gb_tbl->get_name(g).c_str());
// Both last_gb_<g> and last_flushed_gb_<g> receive the same initial literal.
3613 if(dt->is_increasing()){
3614 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_min_literal().c_str());
3616 sprintf(tmpstr,"\tf->last_gb_%d = f->last_flushed_gb_%d = %s;\n",g, g, dt->get_max_literal().c_str());
// Emitted code: install the per-lfta callbacks in the FTA header.
3623 ret += "\tf->f.alloc_fta="+generate_alloc_name(node_name)+";\n";
3624 ret+="\tf->f.free_fta=free_fta_"+node_name+";\n";
3625 ret+="\tf->f.control_fta=control_fta_"+node_name+";\n";
3626 ret+="\tf->f.accept_packet=accept_packet_"+node_name+";\n";
3627 ret+="\tf->f.clock_fta=clock_fta_"+node_name+";\n\n";
3629 // Initialize runtime stats
3630 ret+="\tf->in_tuple_cnt = 0;\n";
3631 ret+="\tf->out_tuple_cnt = 0;\n";
3632 ret+="\tf->out_tuple_sz = 0;\n";
3633 ret+="\tf->accepted_tuple_cnt = 0;\n";
3634 ret+="\tf->cycle_cnt = 0;\n";
3635 ret+="\tf->collision_cnt = 0;\n";
3636 ret+="\tf->eviction_cnt = 0;\n";
3637 ret+="\tf->sampling_rate = 1.0;\n";
3639 ret+="\tf->trace_id = 0;\n\n";
// Emitted code: when the query has parameters, load them through
// load_params_<node_name>; a non-zero result takes the WARNING path below.
3640 if(param_tbl->size() > 0){
3642 "\tif(load_params_"+node_name+"(f, sz, value, 1)){\n"
3643 "#ifndef LFTA_IN_NIC\n"
3644 "\t\t\tfprintf(stderr,\"WARNING: parameter passed to lfta "+node_name+" is too small (%d). This query does not have valid parameters, bailing out.\\n\",sz);\n"
3652 // Register the pass-by-handle parameters
3654 for(ph=0;ph<param_handle_table.size();++ph){
3655 data_type pdt(param_handle_table[ph]->type_name);
3656 sprintf(tmpstr,"\tf->handle_param_%d = %s((struct FTA *)f,",ph,param_handle_table[ph]->lfta_registration_fcn().c_str());
// The argument handed to the registration function depends on val_type
// (the case labels are not visible in this listing).
3657 switch(param_handle_table[ph]->val_type){
// Buffer-typed complex literals are passed by address: &(f->complex_literal_N).
3660 if(pdt.is_buffer_type()) ret += "&(";
3661 sprintf(tmpstr,"f->complex_literal_%d",param_handle_table[ph]->complex_literal_idx);
3663 if(pdt.is_buffer_type()) ret += ")";
3667 // not complex, no constructor
3669 ret += param_handle_table[ph]->litval->to_C_code("") + ");\n";
3672 // query parameter handles are registered/deregistered in the
3673 // load_params function.
3674 // ret += "t->param_"+param_handle_table[ph]->param_name;
3677 fprintf(stderr, "INTERNAL ERROR unknown case found when processing pass-by-handle parameter table.\n");
3682 ret += "\treturn (struct FTA *) f;\n";
3691 //////////////////////////////////////////////////////////////////
// Generates the C source for one lfta from query plan node fs.
// Resets the file-level code-generation globals, loads the select list and
// where clause (plus group-by / aggregate tables for sgah_qpn) from the
// node, collects partial functions, then appends to retval the preamble,
// the struct definitions and the flush / load_params / free / control /
// accept / clock / alloc routines.
//   fs       - plan node; node_type() is compared against "spx_qpn",
//              "sgah_qpn" and "filter_join"; anything else is reported as
//              an INTERNAL ERROR.
//   schema   - table schema, forwarded to the generators.
//   gid      - NOTE(review): presumably stored into global_id; the
//              assignment is not visible in this listing.
//   Ext_fcns - external function list, used to gather complex literals,
//              handle parameters and partial functions.
//   schema_embed_str - forwarded to generate_preamble.
//   ifdb     - interface database, queried for "Time_Correlation".
//   nicp     - NIC properties, may be NULL; option Return=Packed turns on
//              packed_return.
//   s_pids   - forwarded to generate_fta_accept.
// NOTE(review): the declaration and return of retval are not visible here;
// presumably the accumulated string is returned.
3693 string generate_lfta_block(qp_node *fs, table_list *schema, int gid,
3694 // map<string,string> &int_fcn_defs,
3695 ext_fcn_list *Ext_fcns, string &schema_embed_str, ifq_t *ifdb, nic_property *nicp, set<unsigned int> &s_pids){
3700 /////////////////////////////////////////////////////////////
3701 /// Do operator-generic processing, such as
3702 /// gathering the set of referenced columns,
3703 /// generating structures, etc.
3705 // Initialize globals to empty.
3706 gb_tbl = NULL; aggr_tbl = NULL;
3707 global_id = -1; nicprop = NULL;
3708 param_tbl = fs->get_param_tbl();
3709 sl_list.clear(); where.clear();
3710 partial_fcns.clear();
3711 fcn_ref_cnt.clear(); is_partial_fcn.clear();
3712 pred_class.clear(); pred_pos.clear();
3713 sl_fcns_start=sl_fcns_end=wh_fcns_start=wh_fcns_end=0;
3714 gb_fcns_start=gb_fcns_end=ag_fcns_start=ag_fcns_end=0;
3717 // Does the lfta read packed results from the NIC?
3718 nicprop = nicp; // load into global
// Only the value "Packed" is recognised; any other value draws the warning.
3720 packed_return = false;
3721 if(nicp && nicp->option_exists("Return")){
3722 if(nicp->option_value("Return") == "Packed"){
3723 packed_return = true;
3725 fprintf(stderr,"Warning, nic option value of Return=%s is not recognized, ignoring\n",nicp->option_value("Return").c_str());
3730 // Extract data which defines the query.
3731 // complex literals gathered now.
3732 complex_literals = fs->get_cplx_lit_tbl(Ext_fcns);
3733 param_handle_table = fs->get_handle_param_tbl(Ext_fcns);
3734 string node_name = fs->get_node_name();
3735 bool is_fj = false, uses_bloom = false;
// Dispatch on the node type and copy its clauses into the globals.
3738 if(fs->node_type() == "spx_qpn"){
3739 is_aggr_query = false;
3740 spx_qpn *spx_node = (spx_qpn *)fs;
3741 sl_list = spx_node->get_select_se_list();
3742 where = spx_node->get_where_clause();
// Aggregation node: also load the group-by and aggregate tables.
3746 if(fs->node_type() == "sgah_qpn"){
3747 is_aggr_query = true;
3748 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3749 sl_list = sgah_node->get_select_se_list();
3750 where = sgah_node->get_where_clause();
3751 gb_tbl = sgah_node->get_gb_tbl();
3752 aggr_tbl = sgah_node->get_aggr_tbl();
// A HAVING clause is not generated at the lfta; only a warning is printed.
3754 if((sgah_node->get_having_clause()).size() > 0){
3755 fprintf(stderr,"Warning in LFTA %s: HAVING clause will be ignored.\n", fs->get_node_name().c_str());
// Filter join: treated as a non-aggregate query with optional bloom filters.
3758 if(fs->node_type() == "filter_join"){
3759 is_aggr_query = false;
3761 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3762 sl_list = fj_node->get_select_se_list();
3763 where = fj_node->get_where_clause();
3764 uses_bloom = fj_node->use_bloom;
3768 fprintf(stderr,"INTERNAL ERROR, unrecognized node type %s in generate_lfta_block\n", fs->node_type().c_str());
3772 // Build list of "partial functions", by clause.
3773 // NOTE : partial fcns are not handled well.
3774 // The act of searching for them associates the fcn call
3775 // in the SE with an index to an array. Refs to the
3776 // fcn value are replaced with refs to the variable they are
3777 // unpacked into. A more general tagging mechanism would be better.
3780 vector<bool> *pfunc_ptr = NULL;
3781 vector<int> *ref_cnt_ptr = NULL;
3782 if(!is_aggr_query){ // don't collect cacheable fcns on aggr query.
3783 ref_cnt_ptr = &fcn_ref_cnt;
3784 pfunc_ptr = &is_partial_fcn;
// The *_fcns_start / *_fcns_end globals record, per clause, the index range
// of partial_fcns contributed by that clause (select, where, group-by,
// aggregate, in that order).
3788 for(i=0;i<sl_list.size();i++){
3789 find_partial_fcns(sl_list[i], &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3791 wh_fcns_start = sl_fcns_end = partial_fcns.size();
3792 for(i=0;i<where.size();i++){
3793 find_partial_fcns_pr(where[i]->pr, &partial_fcns, ref_cnt_ptr, pfunc_ptr, Ext_fcns);
3795 gb_fcns_start = wh_fcns_end = partial_fcns.size();
// NOTE(review): gb_tbl is NULL for non-aggregate nodes; the guard for this
// loop is not visible in this listing - confirm one exists.
3797 for(i=0;i<gb_tbl->size();i++){
3798 find_partial_fcns(gb_tbl->get_def(i), &partial_fcns, NULL, NULL, Ext_fcns);
3801 ag_fcns_start = gb_fcns_end = partial_fcns.size();
3802 if(aggr_tbl != NULL){
3803 for(i=0;i<aggr_tbl->size();i++){
// NOTE(review): unlike the calls above, this one passes NULL as the
// function list and &is_partial_fcn as the fourth argument - confirm
// that this is intended.
3804 find_partial_fcns(aggr_tbl->get_aggr_se(i), NULL, NULL, &is_partial_fcn, Ext_fcns);
3807 ag_fcns_end = partial_fcns.size();
3809 // Fill up the is_partial_fcn and fcn_ref_cnt arrays.
// NOTE(review): the condition guarding this fill is not visible; presumably
// it applies to aggregate queries, where the arrays were not passed to
// find_partial_fcns above.
3811 for(i=0; i<partial_fcns.size();i++){
3812 fcn_ref_cnt.push_back(1);
3813 is_partial_fcn.push_back(true);
3817 // Unmark non-partial expensive functions referenced only once.
3818 for(i=0; i<partial_fcns.size();i++){
3819 if(!is_partial_fcn[i] && fcn_ref_cnt[i] <= 1){
3820 partial_fcns[i]->set_partial_ref(-1);
// From here on node_name is the normalized form used in emitted identifiers.
3824 node_name = normalize_name(node_name);
3826 retval += generate_preamble(schema, /*int_fcn_defs,*/ node_name, schema_embed_str);
// The unpack struct holds __lfta_id_fm_nic__ followed by one
// unpack_var_<col> member per column collected by gather_nicsafe_cols,
// emitted in sorted column-name order. The schema of the first input
// table is used for the member types.
3828 if(packed_return){ // generate unpack struct
3829 vector<tablevar_t *> input_tbls = fs->get_input_tbls();
3830 int schref = input_tbls[0]->get_schema_ref();
3831 vector<string> refd_cols;
3832 for(s=0;s<sl_list.size();++s){
3833 gather_nicsafe_cols(sl_list[s],refd_cols, nicp, gb_tbl);
3835 for(p=0;p<where.size();++p){
3836 // I'm not disabling these preds ...
3837 gather_nicsafe_cols(where[p]->pr,refd_cols, nicp, gb_tbl);
3840 for(g=0;g<gb_tbl->size();++g){
3841 gather_nicsafe_cols(gb_tbl->get_def(g),refd_cols, nicp, gb_tbl);
3844 sort(refd_cols.begin(), refd_cols.end());
3845 retval += "struct "+node_name+"_input_struct{\n";
3846 retval += "\tint __lfta_id_fm_nic__;\n";
3848 for(vsi=0;vsi<refd_cols.size();++vsi){
3849 data_type dt(schema->get_type_name(schref,refd_cols[vsi]));
3850 retval+="\t"+dt.get_cvar_type()+" unpack_var_"+refd_cols[vsi]+";\n";
3856 /////////////////////////////////////////////////////
3857 // Common stuff unpacked, do some generation
3860 retval += generate_aggr_struct(node_name, gb_tbl, aggr_tbl);
3862 retval += generate_fj_struct((filter_join_qpn *)fs, node_name);
3864 retval += generate_fta_struct(node_name, gb_tbl, aggr_tbl, param_tbl, complex_literals, param_handle_table, is_aggr_query, is_fj, uses_bloom, schema);
3865 retval += generate_tuple_struct(node_name, sl_list) ;
3868 retval += generate_fta_flush(node_name, schema, Ext_fcns) ;
// load_params is only generated for queries that have parameters.
3869 if(param_tbl->size() > 0)
3870 retval += generate_fta_load_params(node_name) ;
3871 retval += generate_fta_free(node_name, is_aggr_query) ;
3872 retval += generate_fta_control(node_name, schema, is_aggr_query) ;
3873 retval += generate_fta_accept(fs, node_name, schema, Ext_fcns, is_aggr_query, is_fj, s_pids) ;
3876 /* extract the value of Time_Correlation from interface definition */
// The interface of the first input table is consulted; DEFAULT_TIME_CORR
// is used when it defines no Time_Correlation value.
3880 vector<tablevar_t *> tvec = fs->get_input_tbls();
3881 vector<string> time_corr_vec = ifdb->get_iface_vals(tvec[0]->get_machine(), tvec[0]->get_interface(),"Time_Correlation",e,es);
3882 if (time_corr_vec.empty())
3883 time_corr = DEFAULT_TIME_CORR;
3885 time_corr = atoi(time_corr_vec[0].c_str());
3887 retval.append( generate_fta_clock(node_name, schema, time_corr, is_aggr_query) );
3888 retval.append( generate_fta_alloc(fs, node_name, schema, is_aggr_query, is_fj, uses_bloom) );
// Computes a snap length for the query at plan node fs.
// Gathers every column referenced by the where clause, the select list and
// the group-by definitions, then takes the largest numeric "snap_len"
// modifier found among those fields.
//   fs     - plan node; node_type() must be "spx_qpn", "sgah_qpn" or
//            "filter_join" (anything else is reported as INTERNAL ERROR).
//   schema - supplies each field's modifier list.
// Side effect: overwrites the file-level globals sl_list and where (and
// gb_tbl for sgah_qpn nodes).
// NOTE(review): the return statements are not visible in this listing. The
// final test compares n_snap with the number of referenced columns,
// presumably so that a length is only reported when every referenced
// column declares a snap_len - confirm against the full file.
3895 int compute_snap_len(qp_node *fs, table_list *schema){
3897 // Initialize global vars
3899 sl_list.clear(); where.clear();
3901 if(fs->node_type() == "spx_qpn"){
3902 spx_qpn *spx_node = (spx_qpn *)fs;
3903 sl_list = spx_node->get_select_se_list();
3904 where = spx_node->get_where_clause();
3906 else if(fs->node_type() == "sgah_qpn"){
3907 sgah_qpn *sgah_node = (sgah_qpn *)fs;
3908 sl_list = sgah_node->get_select_se_list();
3909 where = sgah_node->get_where_clause();
3910 gb_tbl = sgah_node->get_gb_tbl();
3912 else if(fs->node_type() == "filter_join"){
3913 filter_join_qpn *fj_node = (filter_join_qpn *)fs;
3914 sl_list = fj_node->get_select_se_list();
3915 where = fj_node->get_where_clause();
3917 fprintf(stderr,"INTERNAL ERROR, node type %s not recognized in compute_snap_len\n",fs->node_type().c_str());
3921 // Gather all column references, need to define unpacking variables.
3924 col_id_set::iterator csi;
3926 for(w=0;w<where.size();++w)
3927 gather_pr_col_ids(where[w]->pr,cid_set, gb_tbl);
3928 for(s=0;s<sl_list.size();s++){
3929 gather_se_col_ids(sl_list[s],cid_set, gb_tbl);
// NOTE(review): gb_tbl is only assigned above for sgah_qpn; the guard for
// this loop is not visible in this listing - confirm one exists.
3934 for(g=0;g<gb_tbl->size();g++)
3935 gather_se_col_ids(gb_tbl->get_def(g),cid_set, gb_tbl);
3938 // compute snap length
3941 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
3942 int schref = (*csi).schema_ref;
3943 int tblref = (*csi).tblvar_ref;
3944 string field = (*csi).field;
// Only fields whose modifier list carries a "snap_len" key contribute.
3946 param_list *field_params = schema->get_modifier_list(schref, field);
3947 if(field_params->contains_key("snap_len")){
3948 string fld_snap_str = field_params->val_of("snap_len");
// Keep the maximum over all fields; a value sscanf cannot parse as an
// integer is reported and ignored.
3950 if(sscanf(fld_snap_str.c_str(),"%d",&fld_snap)>0){
3951 if(fld_snap > snap_len) snap_len = fld_snap;
3954 fprintf(stderr,"CONFIGURATION ERROR: field %s has a non-numeric snap length (%s), ignoring\n",field.c_str(), fld_snap_str.c_str() );
3959 if(n_snap == cid_set.size()){
3968 // Function which computes an optimal
3969 // set of unpacking functions.
// Assigns an unpack function to every column in upref_cids.
//   upref_cids   - columns that need an unpack function.
//   Schema       - supplies each field's candidate unpack functions
//                  (get_unpack_fcns) and each function's cost (get_ufcn_cost).
//   ucol_fcn_map - output: column -> chosen unpack function; columns already
//                  present in the map are left untouched.
// Each pass of the while loop counts, for every candidate function, how many
// still-unassigned columns list it, picks the function with the lowest
// cost-per-column ratio, and assigns it to all unassigned columns that list
// it. The loop ends once every column in upref_cids has an entry.
3971 void find_optimal_unpack_fcns(col_id_set &upref_cids, table_list *Schema, map<col_id, string,lt_col_id> &ucol_fcn_map){
3972 map<string, int> pfcn_count;
3973 map<string, int>::iterator msii;
3974 col_id_set::iterator cisi;
3975 set<string>::iterator ssi;
3978 while(ucol_fcn_map.size() < upref_cids.size()){
3980 // Gather unpack functions referenced by unaccounted-for
3981 // columns, and increment their reference count.
// NOTE(review): no reset of pfcn_count is visible inside the loop in this
// listing; if there is none, counts accumulate across passes - confirm.
3983 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
3984 if(ucol_fcn_map.count((*cisi)) == 0){
3985 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
3986 for(ssi=ufcns.begin();ssi!=ufcns.end();++ssi)
3987 pfcn_count[(*ssi)]++;
3991 // Get the lowest cost per field function.
3992 float min_cost = 0.0;
3993 string best_fcn = "";
3994 for(msii=pfcn_count.begin();msii!=pfcn_count.end();++msii){
3995 int fcost = Schema->get_ufcn_cost((*msii).first);
3997 fprintf(stderr,"CONFIGURATION ERROR, unpack function %s either has negative cost or is not defined.\n",(*msii).first.c_str());
// Cost is shared among the columns the function would cover. The first map
// entry always seeds min_cost, so the 0.0 initial value is never compared.
4000 float this_cost = (1.0*fcost)/(*msii).second;
4001 if(msii == pfcn_count.begin() || this_cost < min_cost){
4002 min_cost = this_cost;
4003 best_fcn = (*msii).first;
4007 fprintf(stderr,"ERROR, could not find a best field unpqacking function.\n");
4011 // Assign this function to the unassigned fcns which use it.
4012 for(cisi=upref_cids.begin();cisi!=upref_cids.end();++cisi){
4013 if(ucol_fcn_map.count((*cisi)) == 0){
4014 set<string> ufcns = Schema->get_field((*cisi).schema_ref, (*cisi).field)->get_unpack_fcns();
4015 if(ufcns.count(best_fcn)>0)
4016 ucol_fcn_map[(*cisi)] = best_fcn;
4024 // Generate an initial test for the lfta
4025 // Assume that the predicate references no external functions,
4026 // and especially no partial functions,
4027 // aggregates, internal functions.
4028 string generate_lfta_prefilter(vector<cnf_set *> &pred_list,
4029 col_id_set &temp_cids, table_list *Schema, ext_fcn_list *Ext_fcns,
4030 vector<col_id_set> &lfta_cols, vector<long long int> &lfta_sigs,
4031 vector<int> &lfta_snap_lens, string iface){
4032 col_id_set tmp_cid_set, cid_set,upref_cids,upall_cids;
4033 col_id_set::iterator csi;
4037 // Gather complex literals in the prefilter.
4038 cplx_lit_table *complex_literals = new cplx_lit_table();
4039 for(p=0;p<pred_list.size();++p){
4040 find_complex_literal_pr(pred_list[p]->pr,Ext_fcns, complex_literals);
4044 // Find the combinable predicates
4045 vector<predicate_t *> pr_list;
4046 for(p=0;p<pred_list.size();++p){
4047 find_combinable_preds(pred_list[p]->pr,&pr_list, Schema, Ext_fcns);
4050 // Analyze the combinable predicates to find the predicate classes.
4051 pred_class.clear(); // idx to equiv pred in equiv_list
4052 pred_pos.clear(); // idx to returned bitmask.
4053 vector<predicate_t *> equiv_list;
4054 vector<int> num_equiv;
4057 for(p=0;p<pr_list.size();++p){
4058 for(q=0;q<equiv_list.size();++q){
4059 if(is_equivalent_class_pred_base(equiv_list[q],pr_list[p],Schema,Ext_fcns))
4062 if(q == equiv_list.size()){ // no equiv : create new
4063 pred_class.push_back(equiv_list.size());
4064 equiv_list.push_back(pr_list[p]);
4065 pred_pos.push_back(0);
4066 num_equiv.push_back(1);
4068 }else{ // pr_list[p] is equivalent to pred q
4069 pred_class.push_back(q);
4070 pred_pos.push_back(num_equiv[q]);
4075 // Generate the variables which hold the common pred handles
4076 ret += "/*\t\tprefilter global vars.\t*/\n";
4077 for(q=0;q<equiv_list.size();++q){
4078 for(p=0;p<=(num_equiv[q]/32);++p){
4079 ret += "void *pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface+";\n";
4083 // Struct to hold prefilter complex literals
4084 ret += "struct prefilter_complex_lit_struct_"+iface+" {\n";
4085 if(complex_literals->size() == 0)
4086 ret += "\tint no_variable;\n";
4088 for(cl=0;cl<complex_literals->size();cl++){
4089 literal_t *l = complex_literals->get_literal(cl);
4090 data_type *dtl = new data_type( l->get_type() );
4091 sprintf(tmpstr,"\t%s complex_literal_%d;\n",dtl->get_cvar_type().c_str(),cl);
4094 ret += "} prefilter_complex_lits_"+iface+";\n\n";
4097 // Generate the prefilter initialziation code
4098 ret += "void init_lfta_prefilter_"+iface+"(){\n";
4100 // First initialize complex literals, if any.
4101 ret += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4102 for(cl=0;cl<complex_literals->size();cl++){
4103 literal_t *l = complex_literals->get_literal(cl);
4104 sprintf(tmpstr,"&(t->complex_literal_%d)",cl);
4105 ret += "\t" + l->to_C_code(tmpstr) + ";\n";
4109 set<int> epred_seen;
4110 for(p=0;p<pr_list.size();++p){
4111 int q = pred_class[p];
4112 //printf("\tq=%d\n",q);
4113 if(epred_seen.count(q)>0){
4114 ret += "\tregister_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4115 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4116 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4117 for(o=0;o<op_list.size();++o){
4119 ret += generate_se_code(op_list[o],Schema)+", ";
4122 ret += "pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+","+int_to_string(pred_pos[p]%32)+");\n";
4123 epred_seen.insert(q);
4125 ret += "\tpref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(pred_pos[p]/32)+"_"+iface+" = (void *)register_commonpred_handles_"+equiv_list[q]->get_op()+"(";
4126 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4127 vector<scalarexp_t *> op_list = pr_list[p]->get_op_list();
4128 for(o=0;o<op_list.size();++o){
4130 ret += generate_se_code(op_list[o],Schema)+", ";
4133 ret += "NULL,"+int_to_string(pred_pos[p]%32)+");\n";
4134 epred_seen.insert(q);
4141 // Start on main body code generation
4142 ret+="gs_uint64_t lfta_prefilter_"+iface+"(void *pkt){\n";
4145 ///--------------------------------------------------------------
4146 /// Generate and store the prefilter body,
4147 /// reuse it for the snap length calculator
4148 ///-------------------------------------------------------------
4151 body += "\tstruct packet *p = (struct packet *)pkt;\n";
4155 // Gather the colids to store unpacked variables.
4156 for(p=0;p<pred_list.size();++p){
4157 gather_pr_col_ids(pred_list[p]->pr,tmp_cid_set, gb_tbl);
4160 // make the col_ids refer to the base tables, and
4161 // grab the col_ids with at least one unpacking function.
4162 for(csi=tmp_cid_set.begin();csi!=tmp_cid_set.end();++csi){
4163 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4165 tmp_col_id.field = (*csi).field;
4166 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4167 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4168 cid_set.insert(tmp_col_id);
4169 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4170 if(fe->get_unpack_fcns().size()>0)
4171 upref_cids.insert(tmp_col_id);
4176 // Find the set of unpacking programs needed for the
4177 // prefilter fields.
4178 map<col_id, string,lt_col_id> ucol_fcn_map;
4179 find_optimal_unpack_fcns(upref_cids, Schema, ucol_fcn_map);
4180 set<string> pref_ufcns;
4181 map<col_id, string,lt_col_id>::iterator mcis;
4182 for(mcis=ucol_fcn_map.begin(); mcis!=ucol_fcn_map.end(); mcis++){
4183 pref_ufcns.insert((*mcis).second);
4188 // Variables for unpacking attributes.
4189 body += "/*\t\tVariables for unpacking attributes\t*/\n";
4190 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4191 int schref = (*csi).schema_ref;
4192 int tblref = (*csi).tblvar_ref;
4193 string field = (*csi).field;
4194 data_type dt(Schema->get_type_name(schref,field));
4195 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4196 field.c_str(), tblref);
4198 sprintf(tmpstr,"\tgs_retval_t ret_%s_%d;\n", field.c_str(),tblref);
4201 // Variables for unpacking temporal attributes.
4202 body += "/*\t\tVariables for unpacking temporal attributes\t*/\n";
4203 for(csi=temp_cids.begin(); csi!=temp_cids.end();++csi){
4204 if (cid_set.count(*csi) == 0) {
4205 int schref = (*csi).schema_ref;
4206 int tblref = (*csi).tblvar_ref;
4207 string field = (*csi).field;
4208 data_type dt(Schema->get_type_name(schref,field));
4209 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4210 field.c_str(), tblref);
4217 // Variables for combinable predicate evaluation
4218 body += "/*\t\tVariables for common prdicate evaluation\t*/\n";
4219 for(q=0;q<equiv_list.size();++q){
4220 for(p=0;p<=(num_equiv[q]/32);++p){
4221 body += "unsigned long int pref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = 0;\n";
4226 // Variables that are always needed
4227 body += "/*\t\tVariables which are always needed\t*/\n";
4228 body += "\tgs_uint64_t retval=0, bitpos=1;\n";
4229 body += "\tstruct prefilter_complex_lit_struct_"+iface+" *t = &prefilter_complex_lits_"+iface+";\n";
4231 // Call the unpacking functions for the prefilter fields
4232 if(pref_ufcns.size() > 0)
4233 body += "\n/*\t\tcall field unpacking functions\t*/\n";
4234 set<string>::iterator ssi;
4235 for(ssi=pref_ufcns.begin(); ssi!=pref_ufcns.end(); ++ssi){
4236 body += "\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4240 // Unpack the accessed attributes
4241 body += "\n/*\t\tUnpack the accessed attributes.\t*/\n";
4242 for(csi=cid_set.begin();csi!=cid_set.end();++csi){
4243 int tblref = (*csi).tblvar_ref;
4244 int schref = (*csi).schema_ref;
4245 string field = (*csi).field;
4246 sprintf(tmpstr,"\tret_%s_%d = (%s(p, &unpack_var_%s_%d) == 0);\n",
4247 field.c_str(),tblref,Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4251 // next unpack the temporal attributes and ignore the errors
4252 // We are assuming here that failed unpack of temporal attributes
4253 // is not going to overwrite the last stored value
4254 // Failed upacks are ignored
4255 for(csi=temp_cids.begin();csi!=temp_cids.end();++csi){
4256 int tblref = (*csi).tblvar_ref;
4257 int schref = (*csi).schema_ref;
4258 string field = (*csi).field;
4259 sprintf(tmpstr,"\t%s(p, &prefilter_temp_vars.unpack_var_%s_%d);\n",
4260 Schema->get_fcn(schref,field).c_str(), field.c_str(), tblref);
4264 // Evaluate the combinable predicates
4265 if(equiv_list.size()>0)
4266 body += "/*\t\tEvaluate the combinable predicates.\t*/\n";
4267 for(q=0;q<equiv_list.size();++q){
4268 for(p=0;p<=(num_equiv[q]/32);++p){
4270 // Only call the common eval fcn if all ref'd fields present.
4271 col_id_set pred_cids;
4272 col_id_set::iterator cpi;
4273 gather_pr_col_ids(equiv_list[q], pred_cids, gb_tbl);
4274 if(pred_cids.size()>0){
4276 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4277 if(cpi != pred_cids.begin())
4279 string field = (*cpi).field;
4280 int tblref = (*cpi).tblvar_ref;
4281 body += "ret_"+field+"_"+int_to_string(tblref);
4286 body += "\t\tpref_common_pred_val_"+int_to_string(q)+"_"+int_to_string(p)+" = eval_commonpred_"+equiv_list[q]->get_op()+"(pref_common_pred_hdl_"+int_to_string(q)+"_"+int_to_string(p)+"_"+iface;
4287 vector<scalarexp_t *> op_list = equiv_list[q]->get_op_list();
4288 vector<bool> cl_op = Ext_fcns->get_class_indicators(equiv_list[q]->get_fcn_id());
4289 for(o=0;o<op_list.size();++o){
4291 body += ","+generate_se_code(op_list[o],Schema);
4299 for(p=0;p<pred_list.size();++p){
4300 col_id_set pred_cids;
4301 col_id_set::iterator cpi;
4302 gather_pr_col_ids(pred_list[p]->pr,pred_cids, gb_tbl);
4303 if(pred_cids.size()>0){
4305 for(cpi=pred_cids.begin();cpi!=pred_cids.end();++cpi){
4306 if(cpi != pred_cids.begin())
4308 string field = (*cpi).field;
4309 int tblref = (*cpi).tblvar_ref;
4310 body += "ret_"+field+"_"+int_to_string(tblref);
4314 body += "\t\tif("+generate_predicate_code(pred_list[p]->pr,Schema)+")\n\t\t\tretval |= bitpos;\n";
4315 body+="\tbitpos = bitpos << 1;\n";
4318 // ---------------------------------------------------------------
4319 // Finished with the body of the prefilter
4320 // --------------------------------------------------------------
4324 // Collect fields referenced by an lfta but not
4325 // already unpacked for the prefilter.
4327 //printf("upref_cids is:\n");
4328 //for(csi=upref_cids.begin();csi!=upref_cids.end();csi++)
4329 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4330 //printf("pref_ufcns is:\n");
4331 //for(ssi=pref_ufcns.begin();ssi!=pref_ufcns.end();++ssi)
4332 //printf("\t%s\n",(*ssi).c_str());
4335 for(l=0;l<lfta_cols.size();++l){
4336 for(csi=lfta_cols[l].begin();csi!=lfta_cols[l].end();++csi){
4337 string c_tbl = Schema->get_basetbl_name((*csi).schema_ref,(*csi).field);
4339 tmp_col_id.field = (*csi).field;
4340 tmp_col_id.tblvar_ref = (*csi).tblvar_ref;
4341 tmp_col_id.schema_ref = Schema->get_table_ref(c_tbl);
4342 field_entry *fe = Schema->get_field(tmp_col_id.schema_ref, tmp_col_id.field);
4343 set<string> fld_ufcns = fe->get_unpack_fcns();
4344 //printf("tmpcol is (%s, %d), ufcns size is %d, upref_cids cnt is %d\n",tmp_col_id.field.c_str(),tmp_col_id.schema_ref,fld_ufcns.size(), upref_cids.count(tmp_col_id));
4345 if(fld_ufcns.size()>0 && upref_cids.count(tmp_col_id) == 0){
4346 // Ensure that this field not already unpacked.
4348 for(ssi=fld_ufcns.begin();ssi!=fld_ufcns.end();++ssi){
4349 //printf("\tField has unpacking fcn %s\n",(*ssi).c_str());
4350 if(pref_ufcns.count((*ssi))){
4351 //printf("Field already unpacked.\n");
4356 //printf("\tadding to unpack list\n");
4357 upall_cids.insert(tmp_col_id);
4363 //printf("upall_cids is:\n");
4364 //for(csi=upall_cids.begin();csi!=upall_cids.end();csi++)
4365 //printf("\t%s %d\n",(*csi).field.c_str(), (*csi).schema_ref);
4367 // Get the set of unpacking programs for these.
4368 map<col_id, string,lt_col_id> uall_fcn_map;
4369 find_optimal_unpack_fcns(upall_cids, Schema, uall_fcn_map);
4370 set<string> pall_ufcns;
4371 for(mcis=uall_fcn_map.begin(); mcis!=uall_fcn_map.end(); mcis++){
4372 //printf("uall_fcn_map[%s %d] = %s\n",(*mcis).first.field.c_str(),(*mcis).first.schema_ref,(*mcis).second.c_str());
4373 pall_ufcns.insert((*mcis).second);
4376 // Iterate through the remaining set of unpacking functions
4377 if(pall_ufcns.size() > 0)
4378 ret += "//\t\tcall all remaining field unpacking functions.\n";
4379 for(ssi=pall_ufcns.begin(); ssi!=pall_ufcns.end(); ++ssi){
4380 // gather the set of columns unpacked by this ufcn
4381 col_id_set fcol_set;
4382 for(csi=upall_cids.begin();csi!=upall_cids.end();++csi){
4383 if(uall_fcn_map[(*csi)] == (*ssi))
4384 fcol_set.insert((*csi));
4387 // gather the set of lftas which access a field unpacked by the fcn
4388 set<long long int> clfta;
4389 for(l=0;l<lfta_cols.size();l++){
4390 for(csi=fcol_set.begin();csi!=fcol_set.end();++csi){
4391 if(lfta_cols[l].count((*csi)) > 0)
4394 if(csi != fcol_set.end())
4395 clfta.insert(lfta_sigs[l]);
4398 // generate the unpacking code
4400 set<long long int>::iterator sii;
4401 for(sii=clfta.begin();sii!=clfta.end();++sii){
4402 if(sii!=clfta.begin())
4404 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sii),(*sii));
4407 ret += ")\n\t\t"+Schema->get_ufcn_fcn((*ssi))+"(p);\n";
4411 ret += "\treturn(retval);\n\n";
4415 // --------------------------------------------------------
4416 // reuse prefilter body for snaplen calculator
4418 // This is dummy code, so I'm commenting it out.
4421 ret+="gs_uint32_t lfta_pkt_snaplen(void *pkt){\n";
4426 vector<int> s_snaps = lfta_snap_lens;
4427 sort(s_snaps.begin(), s_snaps.end());
4429 if(s_snaps[0] == -1){
4430 set<unsigned long long int> sigset;
4431 for(i=0;i<lfta_snap_lens.size();++i){
4432 if(lfta_snap_lens[i] == -1){
4433 sigset.insert(lfta_sigs[i]);
4437 set<unsigned long long int>::iterator sulli;
4438 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4439 if(sulli!=sigset.begin())
4441 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4444 ret += ") return -1;\n";
4447 int nextpos = lfta_snap_lens.size()-1;
4448 int nextval = lfta_snap_lens[nextpos];
4449 while(nextval >= 0){
4450 set<unsigned long long int> sigset;
4451 for(i=0;i<lfta_snap_lens.size();++i){
4452 if(lfta_snap_lens[i] == nextval){
4453 sigset.insert(lfta_sigs[i]);
4457 set<unsigned long long int>::iterator sulli;
4458 for(sulli=sigset.begin();sulli!=sigset.end();++sulli){
4459 if(sulli!=sigset.begin())
4461 sprintf(tmpstr,"((retval & %lluull) == %lluull)",(*sulli),(*sulli));
4464 ret += ") return "+int_to_string(nextval)+";\n";
4466 for(nextpos--;nextpos>0 && lfta_snap_lens[nextpos] == nextval;nextpos--);
4468 nextval = lfta_snap_lens[nextpos];
4472 ret += "\treturn 0;\n";
4483 // Generate the struct which will store the values of
4484 // temporal attributes unpacked by the prefilter
4485 string generate_lfta_prefilter_struct(col_id_set &cid_set, table_list *Schema) {
4487 col_id_set::iterator csi;
4489 // printf("generate_lfta_prefilter_struct : %d vars\n",cid_set.size());
4491 string ret="struct prefilter_unpacked_temp_vars {\n";
4492 ret += "\t/*\tVariables for unpacking temporal attributes\t*/\n";
4496 for(csi=cid_set.begin(); csi!=cid_set.end();++csi){
4497 int schref = (*csi).schema_ref;
4498 int tblref = (*csi).tblvar_ref;
4499 string field = (*csi).field;
4500 data_type dt(Schema->get_type_name(schref,field), Schema->get_modifier_list(schref,field));
4501 sprintf(tmpstr,"\t%s unpack_var_%s_%d;\n",dt.get_cvar_type().c_str(),
4502 field.c_str(), tblref);
4505 if (init_code != "")
4507 if (dt.is_increasing())
4508 init_code += dt.get_min_literal();
4510 init_code += dt.get_max_literal();
4515 ret += "struct prefilter_unpacked_temp_vars prefilter_temp_vars = {" + init_code + "};\n\n";